Files
Atomizer/atomizer-dashboard/backend/api/services/claude_code_session.py
Anto01 b05412f807 feat(canvas): Claude Code integration with streaming, snippets, and live preview
Backend:
- Add POST /generate-extractor for AI code generation via Claude CLI
- Add POST /generate-extractor/stream for SSE streaming generation
- Add POST /validate-extractor with enhanced syntax checking
- Add POST /check-dependencies for import analysis
- Add POST /test-extractor for live OP2 file testing
- Add ClaudeCodeSession service for managing CLI sessions

Frontend:
- Add lib/api/claude.ts with typed API functions
- Enhance CodeEditorPanel with:
  - Streaming generation with live preview
  - Code snippets library (6 templates: displacement, stress, frequency, mass, energy, reaction)
  - Test button for live OP2 validation
  - Cancel button for stopping generation
  - Dependency warnings display
- Integrate streaming and testing into NodeConfigPanelV2

Uses Claude CLI (--print mode) to leverage Pro/Max subscription without API costs.
2026-01-20 13:08:12 -05:00

452 lines
18 KiB
Python

"""
Claude Code CLI Session Manager
Spawns actual Claude Code CLI processes with full Atomizer access.
This gives dashboard users the same power as terminal users.
Unlike the MCP-based approach:
- Claude can actually edit files (not just return instructions)
- Claude can run Python scripts
- Claude can execute git commands
- Full Opus 4.5 capabilities
"""
import asyncio
import codecs
import json
import os
import uuid
from pathlib import Path
from typing import AsyncGenerator, Dict, Optional, Any
# Atomizer paths
# ATOMIZER_ROOT is the directory five levels above this file; with the file
# living at <root>/atomizer-dashboard/backend/api/services/ that is the
# repository root (NOTE(review): confirm if the file is ever relocated).
ATOMIZER_ROOT = Path(__file__).parent.parent.parent.parent.parent
# Directory under the root in which study folders are looked up.
STUDIES_DIR = ATOMIZER_ROOT / "studies"
class ClaudeCodeSession:
    """
    Manages a Claude Code CLI session with full capabilities.

    Unlike MCP tools, this spawns the actual claude CLI which has:
    - Full file system access
    - Full command execution
    - Opus 4.5 model
    - All Claude Code capabilities
    """

    # Substrings (matched case-insensitively) whose presence in the CLI output
    # is treated as a hint that study files were modified.
    _FILE_CHANGE_INDICATORS = (
        "✓ Edited",
        "✓ Wrote",
        "Successfully wrote",
        "Successfully edited",
        "Modified:",
        "Updated:",
        "Added to file",
        "optimization_config.json",  # Common target
        "run_optimization.py",  # Common target
    )

    def __init__(self, session_id: str, study_id: Optional[str] = None):
        """
        Create a session.

        Args:
            session_id: Identifier used to name the per-session MCP config file.
            study_id: Optional study name, possibly nested such as
                "M1_Mirror/m1_mirror_flatback_lateral". When it cannot be
                located, the working directory stays at ATOMIZER_ROOT.
        """
        self.session_id = session_id
        self.study_id = study_id
        self.canvas_state: Optional[Dict] = None
        self.conversation_history: list = []

        # Determine working directory
        self.working_dir = ATOMIZER_ROOT
        if study_id:
            resolved = self._resolve_study_dir(study_id)
            if resolved is not None:
                self.working_dir = resolved

    @staticmethod
    def _resolve_study_dir(study_id: str) -> Optional[Path]:
        """Return the directory of ``study_id`` under STUDIES_DIR, or None."""
        # Handle nested study paths like "M1_Mirror/m1_mirror_flatback_lateral"
        direct = STUDIES_DIR / study_id
        if direct.exists():
            return direct
        # A missing studies directory must not make session creation fail.
        if not STUDIES_DIR.is_dir():
            return None
        # Try finding it one level down, inside each group directory
        for parent in STUDIES_DIR.iterdir():
            if parent.is_dir():
                nested = parent / study_id
                if nested.exists():
                    return nested
        return None

    @staticmethod
    def _display_path(path: Path) -> str:
        """Return ``path`` relative to ATOMIZER_ROOT, or unchanged if outside it."""
        try:
            return str(path.relative_to(ATOMIZER_ROOT))
        except ValueError:
            return str(path)

    @staticmethod
    async def _iter_decoded_chunks(stream, chunk_size: int = 512) -> AsyncGenerator[str, None]:
        """
        Yield UTF-8 text read from ``stream`` until EOF.

        ``stream`` only needs an awaitable ``read(n)`` returning bytes. An
        incremental decoder is used so that a multi-byte character split
        across two reads is decoded correctly instead of being replaced by
        U+FFFD on both sides of the split.
        """
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        while True:
            chunk = await stream.read(chunk_size)
            if not chunk:
                break
            text = decoder.decode(chunk)
            if text:  # empty while a character is still incomplete
                yield text
        # Flush whatever is left (a truncated trailing sequence becomes U+FFFD)
        tail = decoder.decode(b"", final=True)
        if tail:
            yield tail

    def set_canvas_state(self, canvas_state: Dict):
        """Update canvas state from frontend"""
        self.canvas_state = canvas_state

    async def send_message(self, message: str) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Send message to Claude Code CLI and stream response.

        Uses claude CLI with:
        - --print for output
        - --dangerously-skip-permissions for full access (controlled environment)
        - Runs from Atomizer root to get CLAUDE.md context automatically
        - Study-specific context injected into prompt

        Yields:
            Dict messages: {"type": "text", "content": "..."} for streamed
            output, {"type": "error", "content": "..."} when the CLI exits
            non-zero with stderr output, {"type": "done"} on completion, and
            finally {"type": "refresh_canvas", ...} when the output suggests
            files were modified.
        """
        # Build comprehensive prompt with all context
        full_prompt = self._build_full_prompt(message)

        # Create MCP config file for the session
        mcp_server_dir = ATOMIZER_ROOT / "atomizer-dashboard" / "mcp-server"
        mcp_config_file = ATOMIZER_ROOT / f".claude-mcp-{self.session_id}.json"
        mcp_config = {
            "mcpServers": {
                "atomizer-tools": {
                    "command": "npx",
                    "args": ["-y", "ts-node", str(mcp_server_dir / "src" / "index.ts")],
                    "cwd": str(mcp_server_dir),
                    "env": {
                        "ATOMIZER_ROOT": str(ATOMIZER_ROOT),
                        "STUDIES_DIR": str(STUDIES_DIR),
                    },
                }
            }
        }
        mcp_config_file.write_text(json.dumps(mcp_config, indent=2), encoding='utf-8')

        process = None
        stderr_task = None
        try:
            # Spawn claude CLI from ATOMIZER_ROOT so it picks up CLAUDE.md
            # This gives it full Atomizer context automatically
            # Note: prompt is passed via stdin for complex multi-line prompts
            process = await asyncio.create_subprocess_exec(
                "claude",
                "--print",
                "--dangerously-skip-permissions",
                "--mcp-config", str(mcp_config_file),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                stdin=asyncio.subprocess.PIPE,
                cwd=str(ATOMIZER_ROOT),
                env={
                    **os.environ,
                    "ATOMIZER_STUDY": self.study_id or "",
                    "ATOMIZER_STUDY_PATH": str(self.working_dir),
                    "ATOMIZER_ROOT": str(ATOMIZER_ROOT),
                },
            )

            # Drain stderr concurrently: if it were only read after stdout hit
            # EOF, a child that fills the stderr pipe would block forever.
            stderr_task = asyncio.ensure_future(process.stderr.read())

            # Write prompt to stdin
            process.stdin.write(full_prompt.encode('utf-8'))
            await process.stdin.drain()
            process.stdin.close()

            # Read and yield output as it comes
            pieces = []
            async for text in self._iter_decoded_chunks(process.stdout):
                pieces.append(text)
                yield {"type": "text", "content": text}
            full_output = "".join(pieces)

            # Wait for process to complete
            await process.wait()

            # Check for errors
            stderr = await stderr_task
            if stderr and process.returncode != 0:
                error_text = stderr.decode('utf-8', errors='replace')
                yield {"type": "error", "content": f"\n[Error]: {error_text}"}

            # Update conversation history
            self.conversation_history.append({"role": "user", "content": message})
            self.conversation_history.append({"role": "assistant", "content": full_output})

            # Signal completion
            yield {"type": "done"}

            # Check if any files were modified and signal canvas refresh
            # (kept after "done" to preserve the existing event order)
            if self._output_indicates_file_changes(full_output):
                yield {
                    "type": "refresh_canvas",
                    "study_id": self.study_id,
                    "reason": "Claude modified study files"
                }
        finally:
            try:
                # If the consumer stopped early or an error occurred, do not
                # leave the CLI process running in the background.
                if process is not None and process.returncode is None:
                    try:
                        process.kill()
                    except ProcessLookupError:
                        pass  # already exited
                    await process.wait()
            finally:
                if stderr_task is not None and not stderr_task.done():
                    stderr_task.cancel()
                # Clean up temp files (best effort; a leftover config is harmless)
                try:
                    mcp_config_file.unlink()
                except OSError:
                    pass

    def _build_full_prompt(self, message: str) -> str:
        """Build comprehensive prompt with all context"""
        parts = []

        # Study context
        study_context = self._build_study_context() if self.study_id else ""
        if study_context:
            parts.append("## Current Study Context")
            parts.append(study_context)

        # Canvas context
        if self.canvas_state:
            canvas_context = self._build_canvas_context()
            if canvas_context:
                parts.append("## Current Canvas State")
                parts.append(canvas_context)

        # Conversation history (last few exchanges)
        if self.conversation_history:
            parts.append("## Recent Conversation")
            for msg in self.conversation_history[-6:]:
                role = "User" if msg["role"] == "user" else "Assistant"
                # Truncate long messages
                content = msg["content"][:500] + "..." if len(msg["content"]) > 500 else msg["content"]
                parts.append(f"**{role}:** {content}")
            parts.append("")

        # User's actual request
        parts.append("## User Request")
        parts.append(message)
        parts.append("")

        # Critical instruction
        parts.append("## Important")
        parts.append("You have FULL power to edit files in this environment. When asked to make changes:")
        parts.append("1. Use the Edit or Write tools to ACTUALLY MODIFY the files")
        parts.append("2. Show a brief summary of what you changed")
        parts.append("3. Do not just describe changes - MAKE THEM")
        parts.append("")
        parts.append("After making changes to optimization_config.json, the dashboard canvas will auto-refresh.")

        return "\n".join(parts)

    def _build_study_context(self) -> str:
        """Build detailed context for the active study"""
        if not self.study_id:
            return ""

        context_parts = [f"**Study ID:** `{self.study_id}`"]
        context_parts.append(f"**Study Path:** `{self.working_dir}`")
        context_parts.append("")

        # Find and read optimization_config.json
        config_path = self.working_dir / "1_setup" / "optimization_config.json"
        if not config_path.exists():
            config_path = self.working_dir / "optimization_config.json"

        if config_path.exists():
            # Best effort: any problem reading or summarising the config is
            # reported inside the context instead of aborting the prompt.
            try:
                config = json.loads(config_path.read_text(encoding='utf-8'))
                context_parts.append(f"**Config File:** `{self._display_path(config_path)}`")
                context_parts.append("")

                # Design variables summary
                dvs = config.get("design_variables", [])
                if dvs:
                    context_parts.append("### Design Variables")
                    context_parts.append("")
                    context_parts.append("| Name | Min | Max | Baseline | Unit |")
                    context_parts.append("|------|-----|-----|----------|------|")
                    for dv in dvs[:15]:
                        name = dv.get("name", dv.get("expression_name", "?"))
                        min_v = dv.get("min", dv.get("lower", "?"))
                        max_v = dv.get("max", dv.get("upper", "?"))
                        baseline = dv.get("baseline", "-")
                        unit = dv.get("units", dv.get("unit", "-"))
                        context_parts.append(f"| {name} | {min_v} | {max_v} | {baseline} | {unit} |")
                    if len(dvs) > 15:
                        context_parts.append(f"\n*... and {len(dvs) - 15} more*")
                    context_parts.append("")

                # Objectives
                objs = config.get("objectives", [])
                if objs:
                    context_parts.append("### Objectives")
                    context_parts.append("")
                    for obj in objs:
                        name = obj.get("name", "?")
                        direction = obj.get("direction", "minimize")
                        weight = obj.get("weight", 1)
                        context_parts.append(f"- **{name}**: {direction} (weight: {weight})")
                    context_parts.append("")

                # Extraction method (for Zernike)
                ext_method = config.get("extraction_method", {})
                if ext_method:
                    context_parts.append("### Extraction Method")
                    context_parts.append("")
                    context_parts.append(f"- Type: `{ext_method.get('type', '?')}`")
                    context_parts.append(f"- Class: `{ext_method.get('class', '?')}`")
                    if ext_method.get("inner_radius"):
                        context_parts.append(f"- Inner Radius: `{ext_method.get('inner_radius')}`")
                    context_parts.append("")

                # Zernike settings
                zernike = config.get("zernike_settings", {})
                if zernike:
                    context_parts.append("### Zernike Settings")
                    context_parts.append("")
                    context_parts.append(f"- Modes: `{zernike.get('n_modes', '?')}`")
                    context_parts.append(f"- Filter Low Orders: `{zernike.get('filter_low_orders', '?')}`")
                    context_parts.append(f"- Subcases: `{zernike.get('subcases', [])}`")
                    context_parts.append("")

                # Algorithm
                method = config.get("method", config.get("optimization", {}).get("sampler", "TPE"))
                max_trials = config.get("max_trials", config.get("optimization", {}).get("n_trials", 100))
                context_parts.append("### Algorithm")
                context_parts.append("")
                context_parts.append(f"- Method: `{method}`")
                context_parts.append(f"- Max Trials: `{max_trials}`")
                context_parts.append("")
            except Exception as e:
                context_parts.append(f"*Error reading config: {e}*")
                context_parts.append("")
        else:
            context_parts.append("*No optimization_config.json found*")
            context_parts.append("")

        # Check for run_optimization.py
        run_opt_path = self.working_dir / "run_optimization.py"
        if run_opt_path.exists():
            context_parts.append(f"**Run Script:** `{self._display_path(run_opt_path)}` (exists)")
        else:
            context_parts.append("**Run Script:** not found")
        context_parts.append("")

        # Check results
        db_path = self.working_dir / "3_results" / "study.db"
        if not db_path.exists():
            db_path = self.working_dir / "2_results" / "study.db"
        if db_path.exists():
            context_parts.append("**Results Database:** exists")
            # Could query trial count here
        else:
            context_parts.append("**Results Database:** not found (no optimization run yet)")

        return "\n".join(context_parts)

    def _build_canvas_context(self) -> str:
        """Build markdown context from canvas state"""
        if not self.canvas_state:
            return ""

        parts = []
        nodes = self.canvas_state.get("nodes", [])
        if not nodes:
            return "*Canvas is empty*"

        # Group nodes by type
        design_vars = [n for n in nodes if n.get("type") == "designVar"]
        objectives = [n for n in nodes if n.get("type") == "objective"]
        extractors = [n for n in nodes if n.get("type") == "extractor"]
        models = [n for n in nodes if n.get("type") == "nxModel"]
        algorithms = [n for n in nodes if n.get("type") == "algorithm"]

        if models:
            parts.append("### NX Model")
            for m in models:
                data = m.get("data", {})
                parts.append(f"- File: `{data.get('filePath', 'Not set')}`")
            parts.append("")

        if design_vars:
            parts.append("### Design Variables (Canvas)")
            parts.append("")
            parts.append("| Name | Min | Max | Baseline |")
            parts.append("|------|-----|-----|----------|")
            for dv in design_vars[:20]:
                data = dv.get("data", {})
                name = data.get("expressionName") or data.get("label", "?")
                min_v = data.get("minValue", "?")
                max_v = data.get("maxValue", "?")
                baseline = data.get("baseline", "-")
                parts.append(f"| {name} | {min_v} | {max_v} | {baseline} |")
            if len(design_vars) > 20:
                parts.append(f"\n*... and {len(design_vars) - 20} more*")
            parts.append("")

        if extractors:
            parts.append("### Extractors (Canvas)")
            parts.append("")
            for ext in extractors:
                data = ext.get("data", {})
                ext_type = data.get("extractorType") or data.get("extractorId", "?")
                label = data.get("label", "?")
                parts.append(f"- **{label}**: `{ext_type}`")
            parts.append("")

        if objectives:
            parts.append("### Objectives (Canvas)")
            parts.append("")
            for obj in objectives:
                data = obj.get("data", {})
                name = data.get("objectiveName") or data.get("label", "?")
                direction = data.get("direction", "minimize")
                weight = data.get("weight", 1)
                parts.append(f"- **{name}**: {direction} (weight: {weight})")
            parts.append("")

        if algorithms:
            parts.append("### Algorithm (Canvas)")
            for alg in algorithms:
                data = alg.get("data", {})
                method = data.get("method", "?")
                trials = data.get("maxTrials", "?")
                parts.append(f"- Method: `{method}`")
                parts.append(f"- Max Trials: `{trials}`")
            parts.append("")

        return "\n".join(parts)

    def _output_indicates_file_changes(self, output: str) -> bool:
        """Check if Claude's output indicates file modifications"""
        output_lower = output.lower()
        return any(
            indicator.lower() in output_lower
            for indicator in self._FILE_CHANGE_INDICATORS
        )
class ClaudeCodeSessionManager:
    """
    Registry of live Claude Code sessions, keyed by session id.

    Sessions do not share state; each one may target a different study.
    """

    def __init__(self):
        # session id -> session object
        self.sessions: Dict[str, "ClaudeCodeSession"] = {}

    def create_session(self, study_id: Optional[str] = None) -> "ClaudeCodeSession":
        """Build a session with a fresh 8-character id, register and return it."""
        new_id = uuid.uuid4().hex[:8]
        created = ClaudeCodeSession(new_id, study_id)
        self.sessions[new_id] = created
        return created

    def get_session(self, session_id: str) -> Optional["ClaudeCodeSession"]:
        """Return the session registered under ``session_id``, or None."""
        try:
            return self.sessions[session_id]
        except KeyError:
            return None

    def remove_session(self, session_id: str):
        """Forget the session with this id; unknown ids are ignored."""
        if session_id in self.sessions:
            del self.sessions[session_id]

    def set_canvas_state(self, session_id: str, canvas_state: Dict):
        """Forward ``canvas_state`` to the session, if one is registered."""
        target = self.sessions.get(session_id)
        if not target:
            return
        target.set_canvas_state(canvas_state)
# Process-wide manager singleton; stays None until first requested.
_session_manager: Optional["ClaudeCodeSessionManager"] = None


def get_claude_code_manager() -> "ClaudeCodeSessionManager":
    """Return the shared session manager, creating it on first use."""
    global _session_manager
    manager = _session_manager
    if manager is None:
        manager = ClaudeCodeSessionManager()
        _session_manager = manager
    return manager