Files
Atomizer/atomizer-dashboard/backend/api/routes/spec.py
Anto01 ba0b9a1fae feat(dashboard): Enhanced chat, spec management, and Claude integration
Backend:
- spec.py: New AtomizerSpec REST API endpoints
- spec_manager.py: SpecManager service for unified config
- interview_engine.py: Study creation interview logic
- claude.py: Enhanced Claude API with context
- optimization.py: Extended optimization endpoints
- context_builder.py, session_manager.py: Improved services

Frontend:
- Chat components: Enhanced message rendering, tool call cards
- Hooks: useClaudeCode, useSpecWebSocket, improved useChat
- Pages: Updated Dashboard, Analysis, Insights, Setup, Home
- Components: ParallelCoordinatesPlot, ParetoPlot improvements
- App.tsx: Route updates for canvas/studio

Infrastructure:
- vite.config.ts: Build configuration updates
- start/stop-dashboard.bat: Script improvements
2026-01-20 13:10:47 -05:00

647 lines
21 KiB
Python

"""
AtomizerSpec v2.0 API Endpoints
REST API for managing AtomizerSpec configurations.
All spec modifications flow through these endpoints.
Endpoints:
- GET /studies/{study_id}/spec - Get full spec
- PUT /studies/{study_id}/spec - Replace entire spec
- PATCH /studies/{study_id}/spec - Partial update
- POST /studies/{study_id}/spec/validate - Validate spec
- POST /studies/{study_id}/spec/nodes - Add node
- PATCH /studies/{study_id}/spec/nodes/{node_id} - Update node
- DELETE /studies/{study_id}/spec/nodes/{node_id} - Delete node
- POST /studies/{study_id}/spec/custom-functions - Add custom extractor
- WebSocket /studies/{study_id}/spec/sync - Real-time sync
"""
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import json
import sys
import asyncio
# Add project root to path
sys.path.append(str(Path(__file__).parent.parent.parent.parent.parent))
from api.services.spec_manager import (
SpecManager,
SpecManagerError,
SpecNotFoundError,
SpecConflictError,
get_spec_manager,
)
from optimization_engine.config.spec_models import (
AtomizerSpec,
ValidationReport,
)
from optimization_engine.config.spec_validator import SpecValidationError
router = APIRouter(prefix="/studies/{study_id:path}/spec", tags=["spec"])
# Base studies directory
STUDIES_DIR = Path(__file__).parent.parent.parent.parent.parent / "studies"
# ============================================================================
# Request/Response Models
# ============================================================================
class SpecPatchRequest(BaseModel):
"""Request for patching a spec field."""
path: str = Field(..., description="JSONPath to the field (e.g., 'objectives[0].weight')")
value: Any = Field(..., description="New value")
modified_by: str = Field(default="api", description="Who is making the change")
class NodeAddRequest(BaseModel):
"""Request for adding a node."""
type: str = Field(..., description="Node type: designVar, extractor, objective, constraint")
data: Dict[str, Any] = Field(..., description="Node data")
modified_by: str = Field(default="canvas", description="Who is making the change")
class NodeUpdateRequest(BaseModel):
"""Request for updating a node."""
updates: Dict[str, Any] = Field(..., description="Fields to update")
modified_by: str = Field(default="canvas", description="Who is making the change")
class CustomFunctionRequest(BaseModel):
"""Request for adding a custom extractor function."""
name: str = Field(..., description="Function name")
code: str = Field(..., description="Python source code")
outputs: List[str] = Field(..., description="Output names")
description: Optional[str] = Field(default=None, description="Human-readable description")
modified_by: str = Field(default="claude", description="Who is making the change")
class ExtractorValidationRequest(BaseModel):
"""Request for validating custom extractor code."""
function_name: str = Field(default="extract", description="Expected function name")
source: str = Field(..., description="Python source code to validate")
class SpecUpdateResponse(BaseModel):
"""Response for spec modification operations."""
success: bool
hash: str
modified: str
modified_by: str
class NodeAddResponse(BaseModel):
"""Response for node add operation."""
success: bool
node_id: str
message: str
class ValidationResponse(BaseModel):
"""Response for validation endpoint."""
valid: bool
errors: List[Dict[str, Any]]
warnings: List[Dict[str, Any]]
summary: Dict[str, int]
# ============================================================================
# Helper Functions
# ============================================================================
def resolve_study_path(study_id: str) -> Path:
"""Find study folder by scanning all topic directories.
Supports both formats:
- "study_name" - Will scan topic folders to find it
- "Topic/study_name" - Direct nested path (e.g., "M1_Mirror/m1_mirror_v1")
"""
# Handle nested paths (e.g., "M1_Mirror/m1_mirror_cost_reduction_lateral")
if "/" in study_id:
nested_path = STUDIES_DIR / study_id.replace("/", "\\") # Handle Windows paths
if nested_path.exists() and nested_path.is_dir():
return nested_path
# Also try with forward slashes (Path handles both)
nested_path = STUDIES_DIR / study_id
if nested_path.exists() and nested_path.is_dir():
return nested_path
# Direct path (flat structure)
direct_path = STUDIES_DIR / study_id
if direct_path.exists() and direct_path.is_dir():
return direct_path
# Scan topic folders (nested structure)
for topic_dir in STUDIES_DIR.iterdir():
if topic_dir.is_dir() and not topic_dir.name.startswith('.'):
study_dir = topic_dir / study_id
if study_dir.exists() and study_dir.is_dir():
return study_dir
raise HTTPException(status_code=404, detail=f"Study not found: {study_id}")
def get_manager(study_id: str) -> SpecManager:
"""Get SpecManager for a study."""
study_path = resolve_study_path(study_id)
return get_spec_manager(study_path)
# ============================================================================
# REST Endpoints
# ============================================================================
@router.get("", response_model=None)
async def get_spec(study_id: str):
"""
Get the full AtomizerSpec for a study.
Returns the complete spec JSON with all design variables, extractors,
objectives, constraints, and canvas state.
"""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(
status_code=404,
detail=f"No AtomizerSpec found for study '{study_id}'. Use migration or create new spec."
)
try:
spec = manager.load()
return spec.model_dump(mode='json')
except SpecValidationError as e:
# Return spec even if invalid, but include validation info
raw = manager.load_raw()
return JSONResponse(
status_code=200,
content={
**raw,
"_validation_error": str(e)
}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/raw")
async def get_spec_raw(study_id: str):
"""
Get the raw spec JSON without validation.
Useful for debugging or when spec is invalid.
"""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(status_code=404, detail=f"No spec found for study '{study_id}'")
try:
return manager.load_raw()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/hash")
async def get_spec_hash(study_id: str):
"""Get the current spec hash for conflict detection."""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(status_code=404, detail=f"No spec found for study '{study_id}'")
return {"hash": manager.get_hash()}
@router.put("", response_model=SpecUpdateResponse)
async def replace_spec(
study_id: str,
spec: Dict[str, Any],
modified_by: str = Query(default="api"),
expected_hash: Optional[str] = Query(default=None)
):
"""
Replace the entire spec.
Validates the new spec before saving. Optionally check for conflicts
using expected_hash parameter.
"""
manager = get_manager(study_id)
try:
new_hash = manager.save(spec, modified_by=modified_by, expected_hash=expected_hash)
reloaded = manager.load()
return SpecUpdateResponse(
success=True,
hash=new_hash,
modified=reloaded.meta.modified or "",
modified_by=modified_by
)
except SpecConflictError as e:
raise HTTPException(
status_code=409,
detail={
"message": str(e),
"current_hash": e.current_hash
}
)
except SpecValidationError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.patch("", response_model=SpecUpdateResponse)
async def patch_spec(study_id: str, request: SpecPatchRequest):
"""
Partial update to spec using JSONPath.
Example paths:
- "objectives[0].weight" - Update objective weight
- "design_variables[1].bounds.max" - Update DV bound
- "meta.description" - Update description
"""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(status_code=404, detail=f"No spec found for study '{study_id}'")
try:
spec = manager.patch(request.path, request.value, modified_by=request.modified_by)
return SpecUpdateResponse(
success=True,
hash=manager.get_hash(),
modified=spec.meta.modified or "",
modified_by=request.modified_by
)
except SpecValidationError as e:
raise HTTPException(status_code=400, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/validate", response_model=ValidationResponse)
async def validate_spec(study_id: str):
"""
Validate the spec and return detailed report.
Returns errors, warnings, and summary of the spec contents.
"""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(status_code=404, detail=f"No spec found for study '{study_id}'")
try:
report = manager.validate_and_report()
return ValidationResponse(
valid=report.valid,
errors=[e.model_dump() for e in report.errors],
warnings=[w.model_dump() for w in report.warnings],
summary=report.summary.model_dump()
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# Node CRUD Endpoints
# ============================================================================
@router.post("/nodes", response_model=NodeAddResponse)
async def add_node(study_id: str, request: NodeAddRequest):
"""
Add a new node to the spec.
Supported types: designVar, extractor, objective, constraint
"""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(status_code=404, detail=f"No spec found for study '{study_id}'")
valid_types = ["designVar", "extractor", "objective", "constraint"]
if request.type not in valid_types:
raise HTTPException(
status_code=400,
detail=f"Invalid node type '{request.type}'. Valid: {valid_types}"
)
try:
node_id = manager.add_node(request.type, request.data, modified_by=request.modified_by)
return NodeAddResponse(
success=True,
node_id=node_id,
message=f"Added {request.type} node: {node_id}"
)
except SpecValidationError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.patch("/nodes/{node_id}")
async def update_node(study_id: str, node_id: str, request: NodeUpdateRequest):
"""Update an existing node's properties."""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(status_code=404, detail=f"No spec found for study '{study_id}'")
try:
manager.update_node(node_id, request.updates, modified_by=request.modified_by)
return {"success": True, "message": f"Updated node {node_id}"}
except SpecManagerError as e:
raise HTTPException(status_code=404, detail=str(e))
except SpecValidationError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/nodes/{node_id}")
async def delete_node(
study_id: str,
node_id: str,
modified_by: str = Query(default="canvas")
):
"""
Delete a node and all edges referencing it.
Use with caution - this will also remove any objectives or constraints
that reference a deleted extractor.
"""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(status_code=404, detail=f"No spec found for study '{study_id}'")
try:
manager.remove_node(node_id, modified_by=modified_by)
return {"success": True, "message": f"Removed node {node_id}"}
except SpecManagerError as e:
raise HTTPException(status_code=404, detail=str(e))
except SpecValidationError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# Custom Function Endpoint
# ============================================================================
@router.post("/custom-functions", response_model=NodeAddResponse)
async def add_custom_function(study_id: str, request: CustomFunctionRequest):
"""
Add a custom Python function as an extractor.
The function will be available in the optimization workflow.
Claude can use this to add new physics extraction logic.
"""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(status_code=404, detail=f"No spec found for study '{study_id}'")
try:
extractor_id = manager.add_custom_function(
name=request.name,
code=request.code,
outputs=request.outputs,
description=request.description,
modified_by=request.modified_by
)
return NodeAddResponse(
success=True,
node_id=extractor_id,
message=f"Added custom extractor: {request.name}"
)
except SpecValidationError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Separate router for non-study-specific endpoints
validate_router = APIRouter(prefix="/spec", tags=["spec"])
@validate_router.post("/validate-extractor")
async def validate_custom_extractor(request: ExtractorValidationRequest):
"""
Validate custom extractor Python code.
Checks syntax, security patterns, and function signature.
Does not require a study - can be used before adding to spec.
"""
try:
from optimization_engine.extractors.custom_extractor_loader import (
validate_extractor_code,
ExtractorSecurityError,
)
try:
is_valid, errors = validate_extractor_code(request.source, request.function_name)
return {
"valid": is_valid,
"errors": errors
}
except ExtractorSecurityError as e:
return {
"valid": False,
"errors": [str(e)]
}
except ImportError as e:
raise HTTPException(
status_code=500,
detail=f"Custom extractor loader not available: {e}"
)
# ============================================================================
# Edge Endpoints
# ============================================================================
@router.post("/edges")
async def add_edge(
study_id: str,
source: str = Query(..., description="Source node ID"),
target: str = Query(..., description="Target node ID"),
modified_by: str = Query(default="canvas")
):
"""Add a canvas edge between two nodes."""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(status_code=404, detail=f"No spec found for study '{study_id}'")
try:
manager.add_edge(source, target, modified_by=modified_by)
return {"success": True, "message": f"Added edge {source} -> {target}"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/edges")
async def delete_edge(
study_id: str,
source: str = Query(..., description="Source node ID"),
target: str = Query(..., description="Target node ID"),
modified_by: str = Query(default="canvas")
):
"""Remove a canvas edge."""
manager = get_manager(study_id)
if not manager.exists():
raise HTTPException(status_code=404, detail=f"No spec found for study '{study_id}'")
try:
manager.remove_edge(source, target, modified_by=modified_by)
return {"success": True, "message": f"Removed edge {source} -> {target}"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# WebSocket Sync Endpoint
# ============================================================================
class WebSocketSubscriber:
"""WebSocket subscriber adapter."""
def __init__(self, websocket: WebSocket):
self.websocket = websocket
async def send_json(self, data: Dict[str, Any]) -> None:
await self.websocket.send_json(data)
@router.websocket("/sync")
async def websocket_sync(websocket: WebSocket, study_id: str):
"""
WebSocket endpoint for real-time spec sync.
Clients receive notifications when spec changes:
- spec_updated: Spec was modified
- node_added: New node added
- node_removed: Node removed
- validation_error: Validation failed
"""
await websocket.accept()
manager = get_manager(study_id)
subscriber = WebSocketSubscriber(websocket)
# Subscribe to updates
manager.subscribe(subscriber)
try:
# Send initial connection ack
await websocket.send_json({
"type": "connection_ack",
"study_id": study_id,
"hash": manager.get_hash() if manager.exists() else None,
"message": "Connected to spec sync"
})
# Keep connection alive and handle client messages
while True:
try:
data = await asyncio.wait_for(
websocket.receive_json(),
timeout=30.0 # Heartbeat interval
)
# Handle client messages
msg_type = data.get("type")
if msg_type == "ping":
await websocket.send_json({"type": "pong"})
elif msg_type == "patch_node":
# Client requests node update
try:
manager.update_node(
data["node_id"],
data.get("data", {}),
modified_by=data.get("modified_by", "canvas")
)
except Exception as e:
await websocket.send_json({
"type": "error",
"message": str(e)
})
elif msg_type == "update_position":
# Client updates node position
try:
manager.update_node_position(
data["node_id"],
data["position"],
modified_by=data.get("modified_by", "canvas")
)
except Exception as e:
await websocket.send_json({
"type": "error",
"message": str(e)
})
except asyncio.TimeoutError:
# Send heartbeat
await websocket.send_json({"type": "heartbeat"})
except WebSocketDisconnect:
pass
finally:
manager.unsubscribe(subscriber)
# ============================================================================
# Create/Initialize Spec
# ============================================================================
@router.post("/create")
async def create_spec(
study_id: str,
spec: Dict[str, Any],
modified_by: str = Query(default="api")
):
"""
Create a new spec for a study.
Use this when migrating from old config or creating a new study.
Will fail if spec already exists (use PUT to replace).
"""
manager = get_manager(study_id)
if manager.exists():
raise HTTPException(
status_code=409,
detail=f"Spec already exists for '{study_id}'. Use PUT to replace."
)
try:
# Ensure meta fields are set
if "meta" not in spec:
spec["meta"] = {}
spec["meta"]["created_by"] = modified_by
new_hash = manager.save(spec, modified_by=modified_by)
return {
"success": True,
"hash": new_hash,
"message": f"Created spec for {study_id}"
}
except SpecValidationError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))