Files
Atomizer/atomizer-dashboard/backend/api/routes/claude.py
Anto01 ba0b9a1fae feat(dashboard): Enhanced chat, spec management, and Claude integration
Backend:
- spec.py: New AtomizerSpec REST API endpoints
- spec_manager.py: SpecManager service for unified config
- interview_engine.py: Study creation interview logic
- claude.py: Enhanced Claude API with context
- optimization.py: Extended optimization endpoints
- context_builder.py, session_manager.py: Improved services

Frontend:
- Chat components: Enhanced message rendering, tool call cards
- Hooks: useClaudeCode, useSpecWebSocket, improved useChat
- Pages: Updated Dashboard, Analysis, Insights, Setup, Home
- Components: ParallelCoordinatesPlot, ParetoPlot improvements
- App.tsx: Route updates for canvas/studio

Infrastructure:
- vite.config.ts: Build configuration updates
- start/stop-dashboard.bat: Script improvements
2026-01-20 13:10:47 -05:00

619 lines
21 KiB
Python

"""
Claude Chat API Routes
Provides endpoints for AI-powered chat within the Atomizer dashboard.
Two approaches:
1. Session-based: Persistent sessions with MCP tools (new)
2. Legacy: Stateless CLI calls (backwards compatible)
"""
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, List, Dict, Any, Literal
import json
router = APIRouter()
# ========== Request/Response Models ==========
class ChatMessage(BaseModel):
    """A single turn in a chat conversation."""
    role: str  # "user" or "assistant"
    content: str  # plain-text message body
class ChatRequest(BaseModel):
    """Payload for the legacy stateless chat endpoints."""
    message: str  # the user's prompt
    study_id: Optional[str] = None  # study to build Atomizer context from, if any
    conversation_history: Optional[List[Dict[str, Any]]] = None  # prior role/content turns
class ChatResponse(BaseModel):
    """Response body returned by the legacy /chat endpoint."""
    response: str  # Claude's reply text
    tool_calls: Optional[List[Dict[str, Any]]] = None  # tool invocations made during the turn, if any
    study_id: Optional[str] = None  # echoes the request's study context
class CreateSessionRequest(BaseModel):
    """Payload for creating or resuming a Claude session."""
    mode: Literal["user", "power"] = "user"  # "user" = safe operations, "power" = full access
    study_id: Optional[str] = None  # optional study to provide context
    resume_session_id: Optional[str] = None  # resume this session instead of creating a new one
class SwitchModeRequest(BaseModel):
    """Payload for switching an existing session's mode."""
    mode: Literal["user", "power"]  # target mode; switching restarts the session
# Store active conversations (legacy, in production use database)
# NOTE(review): not referenced anywhere in this module — appears to be a
# leftover from the pre-session implementation; confirm before removing.
_conversations: Dict[str, List[Dict[str, Any]]] = {}
# ========== Session Manager Access ==========
_session_manager = None
def get_session_manager():
    """Return the process-wide SessionManager, creating it on first use.

    The service import lives inside the function body to avoid a circular
    dependency between this routes module and the session manager service.
    """
    global _session_manager
    if _session_manager is not None:
        return _session_manager
    from api.services.session_manager import SessionManager
    _session_manager = SessionManager()
    return _session_manager
# ========== NEW: Session-based Endpoints ==========
@router.post("/sessions")
async def create_session(request: CreateSessionRequest):
    """
    Create or resume a Claude session with MCP tools.

    Args:
        request: carries mode ("user" for safe operations, "power" for full
            access), an optional study_id for context, and an optional
            resume_session_id to resume an existing session.

    Returns:
        Dict with session_id, mode, study_id and an is_alive flag.

    Raises:
        HTTPException: 500 whose detail is "<ExceptionType>: <message>".
    """
    try:
        manager = get_session_manager()
        session = await manager.create_session(
            mode=request.mode,
            study_id=request.study_id,
            resume_session_id=request.resume_session_id,
        )
        return {
            "session_id": session.session_id,
            "mode": session.mode,
            "study_id": session.study_id,
            "is_alive": session.is_alive(),
        }
    except Exception as exc:
        import traceback
        traceback.print_exc()  # full stack to server log for debugging
        detail = f"{type(exc).__name__}: {str(exc) or 'No message'}"
        raise HTTPException(status_code=500, detail=detail)
@router.get("/sessions/{session_id}")
async def get_session(session_id: str):
    """Return info for an existing session; 404 when it is unknown."""
    info = get_session_manager().get_session_info(session_id)
    if not info:
        raise HTTPException(status_code=404, detail="Session not found")
    return info
@router.post("/sessions/{session_id}/mode")
async def switch_session_mode(session_id: str, request: SwitchModeRequest):
    """
    Switch a session's mode (the underlying session is restarted).

    Args:
        session_id: Session to update.
        request: Carries the new mode ("user" or "power").

    Raises:
        HTTPException: 404 when the manager reports an unknown session
            (ValueError), 500 for any other failure.
    """
    try:
        manager = get_session_manager()
        updated = await manager.switch_mode(session_id, request.mode)
        return {
            "session_id": updated.session_id,
            "mode": updated.mode,
            "message": f"Mode switched to {request.mode}",
        }
    except ValueError as err:
        raise HTTPException(status_code=404, detail=str(err))
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.post("/sessions/{session_id}/study")
async def set_session_study(session_id: str, study_id: str):
    """Point an existing session at a different study context."""
    try:
        manager = get_session_manager()
        await manager.set_study_context(session_id, study_id)
        return {"message": f"Study context updated to {study_id}"}
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.websocket("/sessions/{session_id}/ws")
async def session_websocket(websocket: WebSocket, session_id: str):
    """
    WebSocket for real-time chat with a session.
    Message formats (client -> server):
    {"type": "message", "content": "user message", "canvas_state": {...}}
    {"type": "set_study", "study_id": "study_name"}
    {"type": "set_canvas", "canvas_state": {...}}
    {"type": "ping"}
    Message formats (server -> client):
    {"type": "text", "content": "..."}
    {"type": "tool_call", "tool": {...}}
    {"type": "tool_result", "result": {...}}
    {"type": "done", "tool_calls": [...]}
    {"type": "error", "message": "..."}
    {"type": "pong"}
    {"type": "context_updated", "study_id": "..."}
    {"type": "canvas_updated", "canvas_state": {...}}
    """
    await websocket.accept()
    manager = get_session_manager()
    session = manager.get_session(session_id)
    if not session:
        await websocket.send_json({"type": "error", "message": "Session not found"})
        await websocket.close()
        return
    # Track current canvas state for this connection
    current_canvas_state: Dict[str, Any] = {}
    try:
        while True:
            data = await websocket.receive_json()
            if data.get("type") == "message":
                content = data.get("content", "")
                if not content:
                    continue
                # Canvas state attached to the message wins over the
                # connection-level state set via "set_canvas".
                msg_canvas = data.get("canvas_state")
                canvas_state = msg_canvas if msg_canvas is not None else current_canvas_state
                # Debug logging
                if canvas_state:
                    node_count = len(canvas_state.get("nodes", []))
                    print(f"[Claude WS] Sending message with canvas state: {node_count} nodes")
                else:
                    print("[Claude WS] Sending message WITHOUT canvas state")
                # Relay every streamed chunk from the session manager to the client.
                async for chunk in manager.send_message(
                    session_id,
                    content,
                    canvas_state=canvas_state if canvas_state else None,
                ):
                    await websocket.send_json(chunk)
            elif data.get("type") == "set_study":
                study_id = data.get("study_id")
                if study_id:
                    await manager.set_study_context(session_id, study_id)
                    await websocket.send_json({
                        "type": "context_updated",
                        "study_id": study_id,
                    })
            elif data.get("type") == "set_canvas":
                # Update canvas state for this connection
                current_canvas_state = data.get("canvas_state", {})
                await websocket.send_json({
                    "type": "canvas_updated",
                    "canvas_state": current_canvas_state,
                })
            elif data.get("type") == "ping":
                await websocket.send_json({"type": "pong"})
    except WebSocketDisconnect:
        # Client hung up; nothing to clean up at this level.
        pass
    except Exception as e:
        # Best-effort error report. Previously a bare `except:` here, which
        # would also swallow CancelledError/KeyboardInterrupt raised during
        # the final send.
        try:
            await websocket.send_json({"type": "error", "message": str(e)})
        except Exception:
            pass
# ========== LEGACY: Stateless Endpoints (backwards compatible) ==========
@router.get("/status")
async def get_claude_status():
    """
    Report whether the `claude` CLI executable is reachable on PATH.

    Returns:
        JSON with an availability flag, a human-readable message, and the
        integration mode ("cli").
    """
    import shutil
    found = shutil.which("claude") is not None
    if found:
        message = "Claude CLI is available"
    else:
        message = "Claude CLI not found in PATH"
    return {
        "available": found,
        "message": message,
        "mode": "cli"  # Indicate we're using CLI mode
    }
@router.post("/chat", response_model=ChatResponse)
async def chat_with_claude(request: ChatRequest):
    """
    Send a message to Claude via the CLI agent with Atomizer study context.

    Args:
        request: ChatRequest with message, optional study_id, and optional
            conversation history.

    Returns:
        ChatResponse with Claude's reply and any tool calls.

    Raises:
        HTTPException: 500 wrapping the underlying error message.
    """
    try:
        from api.services.claude_cli_agent import AtomizerCLIAgent
        # Create agent with study context
        agent = AtomizerCLIAgent(study_id=request.study_id)
        # Only turns whose content is a plain string are forwarded; any
        # other payload shape is silently dropped.
        history = [
            turn
            for turn in (request.conversation_history or [])
            if isinstance(turn.get('content'), str)
        ]
        result = await agent.chat(request.message, history)
        return ChatResponse(
            response=result["response"],
            tool_calls=result.get("tool_calls"),
            study_id=request.study_id
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Chat error: {str(e)}"
        )
@router.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """
    Stream a Claude CLI response token by token as server-sent events.

    Args:
        request: ChatRequest with message and optional context.

    Returns:
        StreamingResponse with media type text/event-stream; each event is a
        JSON object carrying a token, a done flag, or an error message.
    """
    async def generate():
        try:
            from api.services.claude_cli_agent import AtomizerCLIAgent
            agent = AtomizerCLIAgent(study_id=request.study_id)
            # Forward only string-content turns, mirroring /chat.
            history = [
                turn
                for turn in (request.conversation_history or [])
                if isinstance(turn.get('content'), str)
            ]
            async for token in agent.chat_stream(request.message, history):
                yield f"data: {json.dumps({'token': token})}\n\n"
            yield f"data: {json.dumps({'done': True})}\n\n"
        except Exception as e:
            # Surface the failure in-stream; the HTTP status is already sent.
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    }
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
@router.websocket("/chat/ws")
async def websocket_chat(websocket: WebSocket):
    """
    WebSocket endpoint for real-time chat (legacy, one stateless agent per message).
    Message format (client -> server):
    {"type": "message", "content": "user message", "study_id": "optional"}
    {"type": "clear"}
    Message format (server -> client):
    {"type": "response", "content": "...", "tool_calls": [...]}
    {"type": "cleared"}
    {"type": "error", "message": "..."}
    """
    # NOTE: the docstring previously advertised "token"/"done" messages, but
    # this endpoint uses non-streaming chat and sends a single "response".
    await websocket.accept()
    conversation_history = []
    try:
        from api.services.claude_cli_agent import AtomizerCLIAgent
        while True:
            # Receive message from client
            data = await websocket.receive_json()
            if data.get("type") == "message":
                content = data.get("content", "")
                study_id = data.get("study_id")
                if not content:
                    continue
                # A fresh agent per message; history carries the context.
                agent = AtomizerCLIAgent(study_id=study_id)
                try:
                    # Use non-streaming chat
                    result = await agent.chat(content, conversation_history)
                    # Send response
                    await websocket.send_json({
                        "type": "response",
                        "content": result["response"],
                        "tool_calls": result.get("tool_calls", [])
                    })
                    # Update history only after a successful turn.
                    conversation_history.append({"role": "user", "content": content})
                    conversation_history.append({"role": "assistant", "content": result["response"]})
                except Exception as e:
                    await websocket.send_json({
                        "type": "error",
                        "message": str(e)
                    })
            elif data.get("type") == "clear":
                # Clear conversation history
                conversation_history = []
                await websocket.send_json({"type": "cleared"})
    except WebSocketDisconnect:
        pass
    except Exception as e:
        # Best-effort error report. Previously a bare `except:` here, which
        # would also swallow CancelledError/KeyboardInterrupt during the send.
        try:
            await websocket.send_json({
                "type": "error",
                "message": str(e)
            })
        except Exception:
            pass
# ========== POWER MODE: Direct API with Write Tools ==========
@router.websocket("/sessions/{session_id}/ws/power")
async def power_mode_websocket(websocket: WebSocket, session_id: str):
    """
    WebSocket for power mode chat using direct Anthropic API with write tools.
    Unlike the regular /ws endpoint which uses Claude CLI + MCP,
    this uses AtomizerClaudeAgent directly with built-in write tools.
    This allows immediate modifications without permission prompts.
    Message formats (client -> server):
    {"type": "message", "content": "user message"}
    {"type": "canvas_edit", "spec": {...}}
    {"type": "set_study", "study_id": "study_name"}
    {"type": "ping"}
    Message formats (server -> client):
    {"type": "text", "content": "..."}
    {"type": "tool_call", "tool": {...}}
    {"type": "tool_result", "tool": "...", "result": "..."}
    {"type": "done", "tool_calls": [...]}
    {"type": "error", "message": "..."}
    {"type": "spec_updated", "spec": {...}, "tool"/"reason": "..."}
    {"type": "canvas_edit_received", "acknowledged": true}
    {"type": "context_updated", "study_id": "..."}
    {"type": "pong"}
    """
    # NOTE: the docstring previously listed a "spec_modified" message that
    # this handler never emits; the actual message type is "spec_updated".
    await websocket.accept()
    manager = get_session_manager()
    session = manager.get_session(session_id)
    if not session:
        await websocket.send_json({"type": "error", "message": "Session not found"})
        await websocket.close()
        return
    # Import AtomizerClaudeAgent for direct API access
    from api.services.claude_agent import AtomizerClaudeAgent
    # Create agent with study context
    agent = AtomizerClaudeAgent(study_id=session.study_id)
    conversation_history: List[Dict[str, Any]] = []
    # Load initial spec and set canvas state so Claude sees current canvas
    initial_spec = agent.load_current_spec()
    if initial_spec:
        # Send initial spec to frontend
        await websocket.send_json({
            "type": "spec_updated",
            "spec": initial_spec,
            "reason": "initial_load"
        })
    try:
        while True:
            data = await websocket.receive_json()
            if data.get("type") == "message":
                content = data.get("content", "")
                if not content:
                    continue
                try:
                    # Use streaming API with tool support for real-time response
                    last_tool_calls = []
                    async for event in agent.chat_stream_with_tools(content, conversation_history):
                        event_type = event.get("type")
                        if event_type == "text":
                            # Stream text tokens to frontend immediately
                            await websocket.send_json({
                                "type": "text",
                                "content": event.get("content", ""),
                            })
                        elif event_type == "tool_call":
                            # Tool is being called
                            tool_info = event.get("tool", {})
                            await websocket.send_json({
                                "type": "tool_call",
                                "tool": tool_info,
                            })
                        elif event_type == "tool_result":
                            # Tool finished executing
                            tool_name = event.get("tool", "")
                            await websocket.send_json({
                                "type": "tool_result",
                                "tool": tool_name,
                                "result": event.get("result", ""),
                            })
                            # If it was a write tool, send full updated spec
                            if tool_name in ["add_design_variable", "add_extractor",
                                             "add_objective", "add_constraint",
                                             "update_spec_field", "remove_node",
                                             "create_study"]:
                                # Load updated spec and update agent's canvas state
                                updated_spec = agent.load_current_spec()
                                if updated_spec:
                                    await websocket.send_json({
                                        "type": "spec_updated",
                                        "tool": tool_name,
                                        "spec": updated_spec,  # Full spec for direct canvas update
                                    })
                        elif event_type == "done":
                            # Streaming complete
                            last_tool_calls = event.get("tool_calls", [])
                            await websocket.send_json({
                                "type": "done",
                                "tool_calls": last_tool_calls,
                            })
                            # Update conversation history for next message
                            # Note: For proper history tracking, we'd need to store messages properly
                            # For now, we append the user message and response
                            # NOTE(review): if the "done" event carries no
                            # "response" key, the assistant turn is recorded
                            # as "" — verify against chat_stream_with_tools.
                            conversation_history.append({"role": "user", "content": content})
                            conversation_history.append({"role": "assistant", "content": event.get("response", "")})
                except Exception as e:
                    import traceback
                    traceback.print_exc()
                    await websocket.send_json({
                        "type": "error",
                        "message": str(e),
                    })
            elif data.get("type") == "canvas_edit":
                # User made a manual edit to the canvas - update Claude's context
                spec = data.get("spec")
                if spec:
                    agent.set_canvas_state(spec)
                    await websocket.send_json({
                        "type": "canvas_edit_received",
                        "acknowledged": True
                    })
            elif data.get("type") == "set_study":
                study_id = data.get("study_id")
                if study_id:
                    await manager.set_study_context(session_id, study_id)
                    # Recreate agent with new study context
                    agent = AtomizerClaudeAgent(study_id=study_id)
                    conversation_history = []  # Clear history on study change
                    # Load spec for new study
                    new_spec = agent.load_current_spec()
                    await websocket.send_json({
                        "type": "context_updated",
                        "study_id": study_id,
                    })
                    if new_spec:
                        await websocket.send_json({
                            "type": "spec_updated",
                            "spec": new_spec,
                            "reason": "study_change"
                        })
            elif data.get("type") == "ping":
                await websocket.send_json({"type": "pong"})
    except WebSocketDisconnect:
        pass
    except Exception as e:
        # Best-effort error report. Previously a bare `except:` here, which
        # would also swallow CancelledError/KeyboardInterrupt during the send.
        try:
            await websocket.send_json({"type": "error", "message": str(e)})
        except Exception:
            pass
@router.get("/suggestions")
async def get_chat_suggestions(study_id: Optional[str] = None):
    """
    Get contextual chat suggestions based on current study.

    Args:
        study_id: Optional study to get suggestions for.

    Returns:
        Dict with a "suggestions" list of prompt strings; the first three
        generic suggestions are always appended.
    """
    base_suggestions = [
        "What's the status of my optimization?",
        "Show me the best designs found",
        "Compare the top 3 trials",
        "What parameters have the most impact?",
        "Explain the convergence behavior"
    ]
    if study_id:
        # Study-specific prompts lead when a study context is known.
        study_specific = [
            f"Summarize the {study_id} study",
            "What's the current best objective value?",
            "Are there any failed trials? Why?",
            "Show parameter sensitivity analysis",
            "What should I try next to improve results?"
        ]
        return {"suggestions": study_specific + base_suggestions[:3]}
    generic = [
        "List all available studies",
        "Help me create a new study",
        "What can you help me with?"
    ]
    return {"suggestions": generic + base_suggestions[:3]}