docs: Comprehensive documentation update for Dashboard V3 and Canvas

## Documentation Updates
- DASHBOARD.md: Updated to V3.0 with Canvas V3 features, file browser, introspection
- DASHBOARD_IMPLEMENTATION_STATUS.md: Marked Canvas V3 features as COMPLETE
- CANVAS.md: New comprehensive guide for Canvas Builder V3 with all features
- CLAUDE.md: Added dashboard quick reference and Canvas V3 features

## Canvas V3 Features Documented
- File Browser: Browse studies directory for model files
- Model Introspection: Auto-discover expressions, solver type, dependencies
- One-Click Add: Add expressions as design variables instantly
- Claude Bug Fixes: WebSocket reconnection, SQL errors resolved
- Health Check: /api/health endpoint for monitoring

## Backend Services
- NX introspection service with expression discovery
- File browser API with type filtering
- Claude session management improvements
- Context builder enhancements

## Frontend Components
- FileBrowser: Modal for file selection with search
- IntrospectionPanel: View discovered model information
- ExpressionSelector: Dropdown for design variable configuration
- Improved chat hooks with reconnection logic

## Plan Documents
- Added RALPH_LOOP_CANVAS_V2/V3 implementation records
- Added ATOMIZER_DASHBOARD_V2_MASTER_PLAN
- Added investigation and sync documentation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-16 20:48:58 -05:00
parent 1c7c7aff05
commit ac5e9b4054
23 changed files with 10860 additions and 773 deletions

View File

@@ -93,7 +93,10 @@ async def create_session(request: CreateSessionRequest):
"is_alive": session.is_alive(),
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
import traceback
error_msg = f"{type(e).__name__}: {str(e) or 'No message'}"
traceback.print_exc()
raise HTTPException(status_code=500, detail=error_msg)
@router.get("/sessions/{session_id}")
@@ -146,8 +149,9 @@ async def session_websocket(websocket: WebSocket, session_id: str):
WebSocket for real-time chat with a session.
Message formats (client -> server):
{"type": "message", "content": "user message"}
{"type": "message", "content": "user message", "canvas_state": {...}}
{"type": "set_study", "study_id": "study_name"}
{"type": "set_canvas", "canvas_state": {...}}
{"type": "ping"}
Message formats (server -> client):
@@ -158,6 +162,7 @@ async def session_websocket(websocket: WebSocket, session_id: str):
{"type": "error", "message": "..."}
{"type": "pong"}
{"type": "context_updated", "study_id": "..."}
{"type": "canvas_updated", "canvas_state": {...}}
"""
await websocket.accept()
@@ -169,6 +174,9 @@ async def session_websocket(websocket: WebSocket, session_id: str):
await websocket.close()
return
# Track current canvas state for this connection
current_canvas_state: Dict[str, Any] = {}
try:
while True:
data = await websocket.receive_json()
@@ -178,7 +186,14 @@ async def session_websocket(websocket: WebSocket, session_id: str):
if not content:
continue
async for chunk in manager.send_message(session_id, content):
# Get canvas state from message or use stored state
canvas_state = data.get("canvas_state") or current_canvas_state
async for chunk in manager.send_message(
session_id,
content,
canvas_state=canvas_state if canvas_state else None,
):
await websocket.send_json(chunk)
elif data.get("type") == "set_study":
@@ -190,6 +205,14 @@ async def session_websocket(websocket: WebSocket, session_id: str):
"study_id": study_id,
})
elif data.get("type") == "set_canvas":
# Update canvas state for this connection
current_canvas_state = data.get("canvas_state", {})
await websocket.send_json({
"type": "canvas_updated",
"canvas_state": current_canvas_state,
})
elif data.get("type") == "ping":
await websocket.send_json({"type": "pong"})

View File

@@ -1,16 +1,28 @@
"""
Files API Routes
Provides file browsing capabilities for the Canvas Builder.
Provides file browsing and import capabilities for the Canvas Builder.
Supports importing NX model files from anywhere on the file system.
"""
from fastapi import APIRouter, Query
from fastapi import APIRouter, Query, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from pathlib import Path
from typing import List, Optional
import os
import shutil
import re
router = APIRouter()
class ImportRequest(BaseModel):
    """Request to import a file from a Windows path"""
    # Absolute source path on the host file system (e.g. C:\Models\bracket.sim).
    source_path: str
    # Target study folder name under the studies root.
    study_name: str
    # When True, also copy the related NX files (.prt, .fem, _i.prt, .sim)
    # found next to the source file by naming convention.
    copy_related: bool = True
# Path to studies root (go up 5 levels from this file)
_file_path = os.path.abspath(__file__)
ATOMIZER_ROOT = Path(os.path.normpath(os.path.dirname(os.path.dirname(os.path.dirname(
@@ -153,3 +165,240 @@ async def check_file_exists(path: str):
result["name"] = file_path.name
return result
def find_related_nx_files(source_path: Path) -> List[Path]:
    """
    Find all related NX files based on naming conventions.

    Given a .sim file like 'model_sim1.sim', finds:
    - model.prt (geometry part)
    - model_fem1.fem (FEM file)
    - model_fem1_i.prt (idealized part)
    - model_sim1.sim (simulation)

    Args:
        source_path: Path to any NX file

    Returns:
        List of all related file paths that exist
    """
    related: List[Path] = []
    parent = source_path.parent
    stem = source_path.stem

    # Extract the base model name by stripping NX naming suffixes.
    # '_i' must be stripped FIRST: an idealized stem like 'model_fem1_i'
    # still ends in '_i', so stripping '_fem1' before '_i' would never
    # match and the base would wrongly stay 'model_fem1'.
    base_name = stem
    base_name = re.sub(r'_i$', '', base_name)
    base_name = re.sub(r'_sim\d*$', '', base_name)
    base_name = re.sub(r'_fem\d*$', '', base_name)

    # Glob patterns covering the standard NX file family.
    patterns = [
        f"{base_name}.prt",          # Main geometry
        f"{base_name}_i.prt",        # Idealized part
        f"{base_name}_fem*.fem",     # FEM files
        f"{base_name}_fem*_i.prt",   # Idealized FEM parts
        f"{base_name}_sim*.sim",     # Simulation files
        f"{base_name}.afem",         # Assembled FEM
    ]

    # Search for matching files in the source file's directory.
    for pattern in patterns:
        for match in parent.glob(pattern):
            if match.exists() and match not in related:
                related.append(match)

    # Also include the source file itself.
    if source_path.exists() and source_path not in related:
        related.append(source_path)

    return related
@router.get("/validate-path")
async def validate_external_path(path: str):
    """
    Validate an external Windows path and return info about related files.

    Args:
        path: Absolute Windows path (e.g., C:\\Models\\bracket.sim)

    Returns:
        Information about the file and related files
    """
    try:
        candidate = Path(path)

        # Guard clauses: the path must exist and be a regular file.
        if not candidate.exists():
            return {
                "valid": False,
                "error": f"Path does not exist: {path}",
            }
        if not candidate.is_file():
            return {
                "valid": False,
                "error": "Path is not a file",
            }

        # Only recognised NX file types can be imported.
        valid_extensions = ['.prt', '.sim', '.fem', '.afem']
        if candidate.suffix.lower() not in valid_extensions:
            return {
                "valid": False,
                "error": f"Invalid file type. Expected: {', '.join(valid_extensions)}",
            }

        # Describe the whole related-file family for the UI.
        related_info = [
            {
                "name": f.name,
                "path": str(f),
                "size": f.stat().st_size,
                "type": f.suffix.lower(),
            }
            for f in find_related_nx_files(candidate)
        ]

        return {
            "valid": True,
            "path": str(candidate),
            "name": candidate.name,
            "size": candidate.stat().st_size,
            "related_files": related_info,
        }
    except Exception as e:
        # Validation endpoint: report failure as data, never raise.
        return {
            "valid": False,
            "error": str(e),
        }
@router.post("/import-from-path")
async def import_from_path(request: ImportRequest):
    """
    Import NX model files from an external path into a study folder.

    This will:
    1. Create the study folder if it doesn't exist
    2. Copy the specified file
    3. Optionally copy all related files (.prt, .sim, .fem, _i.prt)

    Args:
        request: ImportRequest with source_path, study_name, and copy_related flag

    Returns:
        List of imported files
    """
    try:
        src = Path(request.source_path)
        if not src.exists():
            raise HTTPException(status_code=404, detail=f"Source file not found: {request.source_path}")

        # Ensure <study>/1_model exists before copying anything into it.
        study_dir = STUDIES_ROOT / request.study_name
        model_dir = study_dir / "1_model"
        model_dir.mkdir(parents=True, exist_ok=True)

        # Either the whole related-file family or just the single file.
        candidates = find_related_nx_files(src) if request.copy_related else [src]

        imported = []
        for candidate in candidates:
            destination = model_dir / candidate.name
            if destination.exists():
                # Never overwrite files already present in the study.
                imported.append({
                    "name": candidate.name,
                    "status": "skipped",
                    "reason": "Already exists",
                    "path": str(destination.relative_to(STUDIES_ROOT)).replace("\\", "/"),
                })
                continue
            shutil.copy2(candidate, destination)
            imported.append({
                "name": candidate.name,
                "status": "imported",
                "path": str(destination.relative_to(STUDIES_ROOT)).replace("\\", "/"),
                "size": destination.stat().st_size,
            })

        return {
            "success": True,
            "study_name": request.study_name,
            "imported_files": imported,
            "total_imported": len([f for f in imported if f["status"] == "imported"]),
        }
    except HTTPException:
        # Pass through deliberate HTTP errors (e.g. the 404 above).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/upload")
async def upload_files(
    files: List[UploadFile] = File(...),
    study_name: str = Query(...),
):
    """
    Upload NX model files to a study folder.

    Args:
        files: List of files to upload
        study_name: Target study name

    Returns:
        List of uploaded files
    """
    try:
        # Create study folder structure
        study_dir = STUDIES_ROOT / study_name
        model_dir = study_dir / "1_model"
        model_dir.mkdir(parents=True, exist_ok=True)

        uploaded = []
        for file in files:
            # SECURITY: use only the base name. Client-supplied filenames may
            # contain directory components ("../../evil.prt"), which would
            # otherwise let an upload escape the study folder (path traversal).
            safe_name = Path(file.filename or "").name
            if not safe_name:
                uploaded.append({
                    "name": file.filename,
                    "status": "rejected",
                    "reason": "Missing file name",
                })
                continue

            # Validate file type
            suffix = Path(safe_name).suffix.lower()
            if suffix not in ['.prt', '.sim', '.fem', '.afem']:
                uploaded.append({
                    "name": file.filename,
                    "status": "rejected",
                    "reason": f"Invalid file type: {suffix}",
                })
                continue

            dest_file = model_dir / safe_name

            # Save file (read fully into memory; NX model files are assumed
            # small enough for this — revisit if very large uploads appear).
            content = await file.read()
            with open(dest_file, 'wb') as f:
                f.write(content)

            uploaded.append({
                "name": safe_name,
                "status": "uploaded",
                "path": str(dest_file.relative_to(STUDIES_ROOT)).replace("\\", "/"),
                "size": len(content),
            })

        return {
            "success": True,
            "study_name": study_name,
            "uploaded_files": uploaded,
            "total_uploaded": len([f for f in uploaded if f["status"] == "uploaded"]),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

View File

@@ -25,6 +25,7 @@ class ContextBuilder:
mode: Literal["user", "power"],
study_id: Optional[str] = None,
conversation_history: Optional[List[Dict[str, Any]]] = None,
canvas_state: Optional[Dict[str, Any]] = None,
) -> str:
"""
Build full system prompt with context.
@@ -33,12 +34,17 @@ class ContextBuilder:
mode: "user" for safe operations, "power" for full access
study_id: Optional study name to provide context for
conversation_history: Optional recent messages for continuity
canvas_state: Optional canvas state (nodes, edges) from the UI
Returns:
Complete system prompt string
"""
parts = [self._base_context(mode)]
# Canvas context takes priority - if user is working on a canvas, include it
if canvas_state:
parts.append(self._canvas_context(canvas_state))
if study_id:
parts.append(self._study_context(study_id))
else:
@@ -200,6 +206,166 @@ Important guidelines:
return context
    def _canvas_context(self, canvas_state: Dict[str, Any]) -> str:
        """
        Build context from canvas state (nodes and edges).

        This is CRITICAL for Claude to understand the current workflow
        being built in the Canvas UI.

        Args:
            canvas_state: Canvas snapshot from the UI. Keys read here:
                "nodes" (list of node dicts with "type" and "data"),
                "edges" (list), "studyName", and "studyPath".

        Returns:
            Markdown-formatted context string describing the canvas,
            appended to the system prompt by build().
        """
        context = "# Current Canvas State\n\n"
        context += "**You are assisting the user with a Canvas Builder workflow.**\n"
        context += "The canvas represents an optimization pipeline being configured visually.\n\n"

        nodes = canvas_state.get("nodes", [])
        edges = canvas_state.get("edges", [])
        study_name = canvas_state.get("studyName", "Untitled")
        study_path = canvas_state.get("studyPath", None)

        context += f"**Study Name**: {study_name}\n"
        if study_path:
            context += f"**Study Path**: {study_path}\n"
        context += "\n"

        # Bucket nodes by their "type" field so each section below can look
        # up the nodes it documents.
        node_types = {}
        for node in nodes:
            node_type = node.get("type", "unknown")
            if node_type not in node_types:
                node_types[node_type] = []
            node_types[node_type].append(node)

        # Model node — only the first one is reported.
        if "model" in node_types:
            model = node_types["model"][0]
            data = model.get("data", {})
            context += "## Model\n"
            context += f"- **Label**: {data.get('label', 'Model')}\n"
            context += f"- **File Path**: {data.get('filePath', 'Not set')}\n"
            context += f"- **File Type**: {data.get('fileType', 'Not set')}\n\n"

        # Solver node — only the first one is reported.
        if "solver" in node_types:
            solver = node_types["solver"][0]
            data = solver.get("data", {})
            context += "## Solver\n"
            context += f"- **Type**: {data.get('solverType', 'Not set')}\n\n"

        # Design variables — rendered as a markdown table, one row per node.
        if "designVar" in node_types:
            context += "## Design Variables\n\n"
            context += "| Name | Expression | Min | Max | Baseline | Unit | Enabled |\n"
            context += "|------|------------|-----|-----|----------|------|---------|\n"
            for dv in node_types["designVar"]:
                data = dv.get("data", {})
                name = data.get("label", "?")
                expr = data.get("expressionName", data.get("label", "?"))
                min_val = data.get("minValue", "?")
                max_val = data.get("maxValue", "?")
                baseline = data.get("baseline", "-")
                unit = data.get("unit", "-")
                # NOTE(review): both branches are the empty string, so the
                # Enabled column is always blank — the marker glyphs (likely
                # a check/cross) appear to have been lost; confirm and
                # restore the intended markers.
                enabled = "" if data.get("enabled", True) else ""
                context += f"| {name} | {expr} | {min_val} | {max_val} | {baseline} | {unit} | {enabled} |\n"
            context += "\n"

        # Extractors — one subsection per node; optional fields are only
        # emitted when present (truthy) in the node data.
        if "extractor" in node_types:
            context += "## Extractors\n\n"
            for ext in node_types["extractor"]:
                data = ext.get("data", {})
                context += f"### {data.get('extractorName', data.get('label', 'Extractor'))}\n"
                context += f"- **ID**: {data.get('extractorId', 'Not set')}\n"
                context += f"- **Type**: {data.get('extractorType', 'Not set')}\n"
                if data.get("extractMethod"):
                    context += f"- **Method**: {data.get('extractMethod')}\n"
                if data.get("innerRadius"):
                    context += f"- **Inner Radius**: {data.get('innerRadius')}\n"
                if data.get("nModes"):
                    context += f"- **Zernike Modes**: {data.get('nModes')}\n"
                if data.get("subcases"):
                    context += f"- **Subcases**: {data.get('subcases')}\n"
                if data.get("config"):
                    config = data.get("config", {})
                    if config.get("subcaseLabels"):
                        context += f"- **Subcase Labels**: {config.get('subcaseLabels')}\n"
                    if config.get("referenceSubcase"):
                        context += f"- **Reference Subcase**: {config.get('referenceSubcase')}\n"
                context += "\n"

        # Objectives — markdown table.
        if "objective" in node_types:
            context += "## Objectives\n\n"
            context += "| Name | Direction | Weight | Penalty |\n"
            context += "|------|-----------|--------|---------|\n"
            for obj in node_types["objective"]:
                data = obj.get("data", {})
                name = data.get("name", data.get("label", "?"))
                direction = data.get("direction", "minimize")
                weight = data.get("weight", 1)
                penalty = data.get("penaltyWeight", "-")
                context += f"| {name} | {direction} | {weight} | {penalty} |\n"
            context += "\n"

        # Constraints — markdown table.
        if "constraint" in node_types:
            context += "## Constraints\n\n"
            context += "| Name | Operator | Value |\n"
            context += "|------|----------|-------|\n"
            for con in node_types["constraint"]:
                data = con.get("data", {})
                name = data.get("name", data.get("label", "?"))
                operator = data.get("operator", "?")
                value = data.get("value", "?")
                context += f"| {name} | {operator} | {value} |\n"
            context += "\n"

        # Algorithm node — only the first one is reported.
        if "algorithm" in node_types:
            algo = node_types["algorithm"][0]
            data = algo.get("data", {})
            context += "## Algorithm\n"
            context += f"- **Method**: {data.get('method', 'Not set')}\n"
            context += f"- **Max Trials**: {data.get('maxTrials', 'Not set')}\n"
            if data.get("sigma0"):
                context += f"- **CMA-ES Sigma0**: {data.get('sigma0')}\n"
            if data.get("restartStrategy"):
                context += f"- **Restart Strategy**: {data.get('restartStrategy')}\n"
            context += "\n"

        # Surrogate node — only the first one is reported.
        if "surrogate" in node_types:
            sur = node_types["surrogate"][0]
            data = sur.get("data", {})
            context += "## Surrogate\n"
            context += f"- **Enabled**: {data.get('enabled', False)}\n"
            context += f"- **Type**: {data.get('modelType', 'Not set')}\n"
            context += f"- **Min Trials**: {data.get('minTrials', 'Not set')}\n\n"

        # Edge connections summary — only a count plus the canonical flow;
        # individual edges are not enumerated.
        context += "## Connections\n\n"
        context += f"Total edges: {len(edges)}\n"
        context += "Flow: Design Variables → Model → Solver → Extractors → Objectives/Constraints → Algorithm\n\n"

        # Canvas modification instructions — tells the assistant which MCP
        # tools to call when the user asks for canvas changes.
        context += """## Canvas Modification Tools
When the user asks to modify the canvas (add/remove nodes, change values), use these MCP tools:
- `canvas_add_node` - Add a new node (designVar, extractor, objective, constraint)
- `canvas_update_node` - Update node properties (bounds, weights, names)
- `canvas_remove_node` - Remove a node from the canvas
- `canvas_connect_nodes` - Create an edge between nodes
**Example user requests you can handle:**
- "Add a design variable called hole_diameter with range 5-15 mm" → Use canvas_add_node
- "Change the weight of wfe_40_20 to 8" → Use canvas_update_node
- "Remove the constraint node" → Use canvas_remove_node
- "Connect the new extractor to the objective" → Use canvas_connect_nodes
Always respond with confirmation of changes made to the canvas.
"""
        return context
def _mode_instructions(self, mode: str) -> str:
"""Mode-specific instructions"""
if mode == "power":

View File

@@ -1,15 +1,18 @@
"""
NX Model Introspection Service
NX Model Introspection Service - Real Implementation
Discovers expressions, solver types, dependent files, and actual result data
from NX model files. Uses PyNastran for OP2 result parsing.
Discovers expressions, solver types, and dependent files from NX model files.
Used by the Canvas Builder to help users configure optimization workflows.
"""
import json
import os
import re
import struct
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
@@ -21,6 +24,14 @@ ATOMIZER_ROOT = Path(os.path.normpath(os.path.dirname(os.path.dirname(os.path.di
)))))
STUDIES_ROOT = ATOMIZER_ROOT / "studies"
# Try to import PyNastran for OP2 parsing
try:
from pyNastran.op2.op2 import OP2
HAS_PYNASTRAN = True
except ImportError:
HAS_PYNASTRAN = False
logger.warning("PyNastran not available - OP2 parsing disabled")
class NXIntrospector:
"""Introspect NX model files to discover expressions, dependencies, and solver info."""
@@ -36,13 +47,31 @@ class NXIntrospector:
self.file_path = STUDIES_ROOT / self.relative_path
self.file_type = self.file_path.suffix.lower()
self.parent_dir = self.file_path.parent
self.study_dir = self._find_study_dir()
def _find_study_dir(self) -> Path:
"""Find the study root directory."""
# Walk up to find study markers (optimization_config.json, study.db, etc.)
current = self.parent_dir
for _ in range(5): # Max 5 levels up
if (current / "optimization_config.json").exists():
return current
if (current / "3_results").exists():
return current
if (current / "1_model").exists():
return current
if current == STUDIES_ROOT:
break
current = current.parent
return self.parent_dir
def introspect(self) -> Dict[str, Any]:
"""
Full introspection of the model file.
Returns:
Dict with expressions, solver_type, dependent_files, extractors_available, warnings
Dict with expressions, solver_type, dependent_files, extractors_available,
mesh_info, result_files, warnings
"""
result = {
"file_path": self.relative_path,
@@ -50,8 +79,11 @@ class NXIntrospector:
"expressions": [],
"solver_type": None,
"dependent_files": [],
"result_files": [],
"mesh_info": None,
"extractors_available": [],
"warnings": [],
"study_dir": str(self.study_dir.relative_to(STUDIES_ROOT)).replace("\\", "/") if self.study_dir != self.parent_dir else None,
}
if not self.file_path.exists():
@@ -59,170 +91,372 @@ class NXIntrospector:
return result
try:
if self.file_type == '.sim':
result.update(self._introspect_sim())
elif self.file_type == '.prt':
result.update(self._introspect_prt())
elif self.file_type in ['.fem', '.afem']:
result.update(self._introspect_fem())
# Step 1: Discover related files
result["dependent_files"] = self._discover_related_files()
# Try to load expressions from optimization_config.json if present
# Step 2: Detect solver type from files
result["solver_type"] = self._detect_solver_type()
# Step 3: Find and analyze OP2 result files
op2_files = self._find_op2_files()
if op2_files:
result["result_files"] = op2_files
# Analyze the first OP2 file for available result types
op2_analysis = self._analyze_op2(op2_files[0]["path"]) if HAS_PYNASTRAN else None
if op2_analysis:
result["op2_analysis"] = op2_analysis
# Step 4: Try to get mesh info from FEM files
fem_file = self._find_fem_file()
if fem_file:
mesh_info = self._analyze_fem(fem_file)
if mesh_info:
result["mesh_info"] = mesh_info
# Step 5: Parse BDF for actual parameter values
bdf_file = self._find_bdf_file()
if bdf_file:
bdf_analysis = self._analyze_bdf(bdf_file)
if bdf_analysis:
result["bdf_analysis"] = bdf_analysis
if bdf_analysis.get("mass"):
result["mass_from_bdf"] = bdf_analysis["mass"]
# Step 6: Try to load expressions from config or discover them
config_expressions = self._load_expressions_from_config()
if config_expressions:
result["expressions"] = config_expressions
# If still no expressions, try from study history
if not result["expressions"]:
result["expressions"] = self._discover_common_expressions()
else:
# Try to discover from study history
historical = self._discover_from_study_history()
if historical:
result["expressions"] = historical
else:
# Fall back to common patterns
result["expressions"] = self._discover_common_expressions()
except Exception as e:
logger.error(f"Introspection error: {e}")
logger.error(f"Introspection error: {e}", exc_info=True)
result["warnings"].append(str(e))
# Suggest extractors based on solver type
result["extractors_available"] = self._suggest_extractors(result.get("solver_type"))
# Suggest extractors based on solver type and available data
result["extractors_available"] = self._suggest_extractors(
result.get("solver_type"),
result.get("result_files", []),
result.get("op2_analysis")
)
return result
def _introspect_sim(self) -> Dict[str, Any]:
"""Introspect .sim file."""
result = {
"solver_type": None,
"dependent_files": [],
}
def _discover_related_files(self) -> List[Dict[str, Any]]:
"""Find all related NX files by naming convention."""
related = []
# Get base name without _sim1, _fem1, _i suffixes
base_name = self.file_path.stem
base_name = re.sub(r'_sim\d*$', '', base_name)
base_name = re.sub(r'_fem\d*$', '', base_name)
base_name = re.sub(r'_i$', '', base_name)
# Find related files in the same directory and parent
# Search directories
search_dirs = [self.parent_dir]
if self.parent_dir.name in ['1_config', '1_setup', 'config', 'setup']:
search_dirs.append(self.parent_dir.parent)
if self.study_dir != self.parent_dir:
search_dirs.append(self.study_dir)
# Also check 1_model subfolder
model_dir = self.study_dir / "1_model"
if model_dir.exists():
search_dirs.append(model_dir)
seen_paths = set()
for search_dir in search_dirs:
if not search_dir.exists():
continue
for ext in ['.prt', '.fem', '.afem']:
# Look for variations of the file name
patterns = [
f"{base_name}{ext}",
f"{base_name.replace('_sim1', '')}{ext}",
f"{base_name.replace('_sim1', '_fem1')}{ext}",
]
# Define patterns to search for
patterns = [
(f"{base_name}.prt", "geometry"),
(f"{base_name}_i.prt", "idealized"),
(f"{base_name}_fem*.fem", "fem"),
(f"{base_name}_fem*_i.prt", "idealized_fem"),
(f"{base_name}_sim*.sim", "simulation"),
(f"{base_name}.afem", "assembled_fem"),
]
for pattern in patterns:
file_candidate = search_dir / pattern
if file_candidate.exists():
result["dependent_files"].append({
"path": str(file_candidate.relative_to(STUDIES_ROOT)).replace("\\", "/"),
"type": ext[1:],
"name": file_candidate.name,
for pattern, file_category in patterns:
for match in search_dir.glob(pattern):
if match.exists() and str(match) not in seen_paths:
seen_paths.add(str(match))
try:
rel_path = str(match.relative_to(STUDIES_ROOT)).replace("\\", "/")
except ValueError:
rel_path = str(match)
related.append({
"name": match.name,
"path": rel_path,
"type": match.suffix[1:].lower(),
"category": file_category,
"size": match.stat().st_size,
})
# Find idealized part (*_i.prt) - critical for mesh updates
for f in search_dir.glob("*_i.prt"):
result["dependent_files"].append({
"path": str(f.relative_to(STUDIES_ROOT)).replace("\\", "/"),
"type": "idealized_prt",
"name": f.name,
return related
def _find_op2_files(self) -> List[Dict[str, Any]]:
"""Find OP2 result files in the study."""
op2_files = []
# Search in iterations/results folders
search_dirs = [
self.study_dir / "2_iterations",
self.study_dir / "3_results",
self.parent_dir,
]
for search_dir in search_dirs:
if not search_dir.exists():
continue
# Search recursively for OP2 files (limit depth to avoid going too deep)
for op2_path in search_dir.rglob("*.op2"):
try:
rel_path = str(op2_path.relative_to(STUDIES_ROOT)).replace("\\", "/")
except ValueError:
rel_path = str(op2_path)
op2_files.append({
"name": op2_path.name,
"path": rel_path,
"full_path": str(op2_path),
"size": op2_path.stat().st_size,
"trial_folder": op2_path.parent.name if "trial_" in op2_path.parent.name else None,
})
# Try to determine solver type
result["solver_type"] = self._detect_solver_type()
# Limit to 10 OP2 files for performance
if len(op2_files) >= 10:
break
return result
return op2_files
def _introspect_prt(self) -> Dict[str, Any]:
"""Introspect .prt file."""
result = {
"dependent_files": [],
}
    def _analyze_op2(self, op2_path: str) -> Optional[Dict[str, Any]]:
        """Analyze an OP2 file to discover available result types.

        Args:
            op2_path: Path relative to STUDIES_ROOT (as produced by
                _find_op2_files).

        Returns:
            Dict describing the available result sets; a dict with an
            "error" key when parsing fails; or None when PyNastran is not
            installed or the file does not exist.
        """
        if not HAS_PYNASTRAN:
            return None

        full_path = STUDIES_ROOT / op2_path
        if not full_path.exists():
            return None

        try:
            op2 = OP2()
            # Restrict parsing to the result sets reported below, keeping
            # reads of large OP2 files cheap.
            # NOTE(review): confirm this PyNastran version accepts a dict
            # here — some versions expose set_results(list) instead.
            op2.set_results_to_include({
                'displacements': True,
                'eigenvectors': True,
                'solid_stress': True,
                'plate_stress': True,
            })
            op2.read_op2(str(full_path), build_dataframe=False)

            analysis = {
                "subcases": list(op2.displacements.keys()) if op2.displacements else [],
                "has_displacements": bool(op2.displacements),
                "has_eigenvectors": bool(op2.eigenvectors),
                # getattr guards: these attributes may be absent depending on
                # the solution type stored in the OP2.
                "has_solid_stress": bool(getattr(op2, 'solid_stress', None)),
                "has_plate_stress": bool(getattr(op2, 'plate_stress', None)),
            }

            # Get node count from displacement results
            if op2.displacements:
                first_subcase = list(op2.displacements.values())[0]
                analysis["node_count"] = len(first_subcase.node_gridtype)

            # Get eigenvalue info if modal analysis
            if op2.eigenvectors:
                first_subcase = list(op2.eigenvectors.values())[0]
                if hasattr(first_subcase, 'eigrs'):
                    # Convert eigenvalues to frequencies: f = sqrt(|lambda|) / (2*pi)
                    import numpy as np
                    eigenvalues = first_subcase.eigrs
                    frequencies = np.sqrt(np.abs(eigenvalues)) / (2 * np.pi)
                    analysis["frequencies_hz"] = frequencies[:10].tolist()  # First 10 modes
                    analysis["num_modes"] = len(eigenvalues)

            return analysis
        except Exception as e:
            # Degrade gracefully: a corrupt/partial OP2 must not break
            # introspection of the rest of the model.
            logger.warning(f"OP2 analysis failed: {e}")
            return {"error": str(e)}
def _find_fem_file(self) -> Optional[Path]:
"""Find the FEM file for this model."""
base_name = self.file_path.stem
base_name = re.sub(r'_sim\d*$', '', base_name)
base_name = re.sub(r'_i$', '', base_name)
# Look for associated .sim and .fem files
search_dirs = [self.parent_dir]
if self.parent_dir.name in ['1_config', '1_setup', 'config', 'setup']:
search_dirs.append(self.parent_dir.parent)
patterns = [
f"{base_name}.fem",
f"{base_name}_fem1.fem",
f"{base_name}_fem.fem",
]
for search_dir in search_dirs:
for search_dir in [self.parent_dir, self.study_dir / "1_model"]:
if not search_dir.exists():
continue
for ext in ['.sim', '.fem', '.afem']:
patterns = [
f"{base_name}{ext}",
f"{base_name}_sim1{ext}",
f"{base_name}_fem1{ext}",
]
for pattern in patterns:
file_candidate = search_dir / pattern
if file_candidate.exists():
result["dependent_files"].append({
"path": str(file_candidate.relative_to(STUDIES_ROOT)).replace("\\", "/"),
"type": ext[1:],
"name": file_candidate.name,
})
return result
def _introspect_fem(self) -> Dict[str, Any]:
"""Introspect .fem or .afem file."""
result = {
"dependent_files": [],
}
base_name = self.file_path.stem
# Look for associated files
for ext in ['.prt', '.sim']:
patterns = [
f"{base_name}{ext}",
f"{base_name.replace('_fem1', '')}{ext}",
f"{base_name.replace('_fem1', '_sim1')}{ext}",
]
for pattern in patterns:
file_candidate = self.parent_dir / pattern
if file_candidate.exists():
result["dependent_files"].append({
"path": str(file_candidate.relative_to(STUDIES_ROOT)).replace("\\", "/"),
"type": ext[1:],
"name": file_candidate.name,
})
fem_path = search_dir / pattern
if fem_path.exists():
return fem_path
return result
return None
def _analyze_fem(self, fem_path: Path) -> Optional[Dict[str, Any]]:
"""Analyze FEM file for mesh statistics."""
try:
# FEM files are binary - we can get basic stats from file size
# For actual mesh data, we'd need NX Open API
stats = {
"path": str(fem_path.relative_to(STUDIES_ROOT)).replace("\\", "/"),
"size_mb": round(fem_path.stat().st_size / 1024 / 1024, 2),
}
# Try to find corresponding .dat file for actual mesh info
dat_path = fem_path.with_suffix('.dat')
if dat_path.exists():
dat_analysis = self._analyze_dat_file(dat_path)
if dat_analysis:
stats.update(dat_analysis)
return stats
except Exception as e:
logger.warning(f"FEM analysis failed: {e}")
return None
def _find_bdf_file(self) -> Optional[Path]:
"""Find BDF/DAT file in the study."""
# Check iterations folder first (most recent analysis)
iterations_dir = self.study_dir / "2_iterations"
if iterations_dir.exists():
# Look in the most recent trial folder
trial_folders = sorted(
[d for d in iterations_dir.iterdir() if d.is_dir() and d.name.startswith("trial_")],
key=lambda x: x.name,
reverse=True
)
if trial_folders:
for trial in trial_folders[:3]: # Check last 3 trials
for ext in ['.dat', '.bdf']:
for bdf_path in trial.glob(f"*{ext}"):
return bdf_path
# Check model directory
for search_dir in [self.parent_dir, self.study_dir / "1_model"]:
if search_dir.exists():
for ext in ['.dat', '.bdf']:
for bdf_path in search_dir.glob(f"*{ext}"):
return bdf_path
return None
def _analyze_bdf(self, bdf_path: Path) -> Optional[Dict[str, Any]]:
"""Analyze BDF/DAT file for mass and other properties."""
try:
analysis = {
"path": str(bdf_path.relative_to(STUDIES_ROOT)).replace("\\", "/"),
}
with open(bdf_path, 'r', errors='ignore') as f:
content = f.read()
# Extract mass from GRID+element cards or PARAM,WTMASS
# Look for mass in comments or parameters
mass_match = re.search(r'(?:MASS|mass)\s*[=:]\s*([\d.eE+-]+)', content)
if mass_match:
analysis["mass"] = float(mass_match.group(1))
# Count grid points
grid_count = len(re.findall(r'^GRID[\s,]', content, re.MULTILINE))
if grid_count > 0:
analysis["grid_count"] = grid_count
# Count elements by type
element_counts = {}
for elem_type in ['CTETRA', 'CHEXA', 'CPENTA', 'CTRIA3', 'CQUAD4', 'CBAR', 'CBEAM']:
count = len(re.findall(rf'^{elem_type}[\s,]', content, re.MULTILINE))
if count > 0:
element_counts[elem_type.lower()] = count
if element_counts:
analysis["elements"] = element_counts
analysis["total_elements"] = sum(element_counts.values())
# Detect solver type from executive control
if 'SOL 101' in content or 'SOL101' in content:
analysis["solver"] = "SOL101"
elif 'SOL 103' in content or 'SOL103' in content:
analysis["solver"] = "SOL103"
elif 'SOL 111' in content or 'SOL111' in content:
analysis["solver"] = "SOL111"
return analysis
except Exception as e:
logger.warning(f"BDF analysis failed: {e}")
return None
def _analyze_dat_file(self, dat_path: Path) -> Optional[Dict[str, Any]]:
"""Analyze .dat file for mesh/model info."""
try:
analysis = {}
with open(dat_path, 'r', errors='ignore') as f:
# Read first 10000 chars for efficiency
content = f.read(10000)
# Count grid points
grid_count = len(re.findall(r'^GRID[\s,]', content, re.MULTILINE))
if grid_count > 0:
analysis["node_count"] = grid_count
return analysis if analysis else None
except Exception as e:
return None
def _detect_solver_type(self) -> Optional[str]:
"""Detect solver type from file name or contents."""
name_lower = self.file_path.name.lower()
parent_lower = str(self.parent_dir).lower()
"""Detect solver type from files and naming."""
# First check BDF file
bdf_file = self._find_bdf_file()
if bdf_file:
analysis = self._analyze_bdf(bdf_file)
if analysis and analysis.get("solver"):
return analysis["solver"]
# Infer from naming conventions
if 'modal' in name_lower or 'freq' in name_lower or 'modal' in parent_lower:
return 'SOL103' # Modal analysis
elif 'static' in name_lower or 'stress' in name_lower:
return 'SOL101' # Static analysis
elif 'thermal' in name_lower or 'heat' in name_lower:
return 'SOL153' # Thermal
elif 'dynamic' in name_lower:
return 'SOL111' # Frequency response
elif 'mirror' in parent_lower or 'wfe' in parent_lower:
return 'SOL101' # Mirrors usually use static analysis
name_lower = self.file_path.name.lower()
parent_lower = str(self.study_dir).lower()
# Default to static
return 'SOL101'
if 'modal' in name_lower or 'freq' in name_lower or 'modal' in parent_lower:
return 'SOL103'
elif 'static' in name_lower or 'stress' in name_lower:
return 'SOL101'
elif 'thermal' in name_lower or 'heat' in name_lower:
return 'SOL153'
elif 'dynamic' in name_lower:
return 'SOL111'
elif 'mirror' in parent_lower or 'wfe' in parent_lower:
return 'SOL101'
return 'SOL101' # Default
def _load_expressions_from_config(self) -> List[Dict[str, Any]]:
"""Load expressions from optimization_config.json if it exists.

Checks several candidate locations (current and legacy layouts) and
converts each configured design variable into an expression dict; the
midpoint of [min, max] is used as the representative value.
"""
expressions = []
# Look for config file in study directory
config_paths = [
self.study_dir / "optimization_config.json",
self.study_dir / "1_config" / "optimization_config.json",
self.parent_dir / "optimization_config.json",
self.parent_dir / "1_config" / "optimization_config.json",
self.parent_dir / "1_setup" / "optimization_config.json",
self.parent_dir.parent / "optimization_config.json",
self.parent_dir.parent / "1_config" / "optimization_config.json",
]
for config_path in config_paths:
# NOTE(review): diff hunk header below -- the lines between the loop and
# open() (presumably an existence check and a try/except guard) are
# elided from this view; do not assume open() is unguarded.
@@ -231,14 +465,17 @@ class NXIntrospector:
with open(config_path, 'r') as f:
config = json.load(f)
# Extract design variables
design_vars = config.get("design_variables", [])
for dv in design_vars:
expr_name = dv.get("name", dv.get("expression", "unknown"))
expr_min = dv.get("min", 0)
expr_max = dv.get("max", 100)
expressions.append({
# NOTE(review): the next four key lines are the pre-diff version; the
# expr_name/expr_min/expr_max lines after them are the post-diff
# replacement. Both appear because this is a rendered diff.
"name": dv.get("name", dv.get("expression", "unknown")),
"value": (dv.get("min", 0) + dv.get("max", 100)) / 2,
"min": dv.get("min"),
"max": dv.get("max"),
"name": expr_name,
"value": (expr_min + expr_max) / 2,
"min": expr_min,
"max": expr_max,
"unit": dv.get("unit", "mm"),
"type": "design_variable",
"source": "config",
# NOTE(review): diff hunk header below -- the close of this append and any
# break/except handling are elided from this view.
@@ -250,68 +487,139 @@ class NXIntrospector:
return expressions
def _discover_from_study_history(self) -> List[Dict[str, Any]]:
"""Try to discover expressions from study database or previous trials."""
expressions = []
# Check study.db for parameter history
db_path = self.study_dir / "3_results" / "study.db"
if db_path.exists():
try:
import sqlite3
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# Try Optuna schema first
cursor.execute("""
SELECT DISTINCT param_name, param_value
FROM trial_params
ORDER BY trial_id DESC
LIMIT 20
""")
rows = cursor.fetchall()
param_values: Dict[str, List[float]] = {}
for name, value in rows:
if name not in param_values:
param_values[name] = []
try:
param_values[name].append(float(value))
except (ValueError, TypeError):
pass
for name, values in param_values.items():
if values:
expressions.append({
"name": name,
"value": sum(values) / len(values),
"min": min(values),
"max": max(values),
"unit": "mm",
"type": "design_variable",
"source": "database",
})
conn.close()
except Exception as e:
logger.debug(f"Database query failed: {e}")
return expressions
def _discover_common_expressions(self) -> List[Dict[str, Any]]:
"""Discover common expressions based on study type."""
# Check parent directory name to infer study type
parent_lower = str(self.parent_dir).lower()
parent_lower = str(self.study_dir).lower()
if 'mirror' in parent_lower:
return [
{"name": "flatback_thickness", "value": 30.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "rib_height", "value": 40.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "rib_width", "value": 8.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "fillet_radius", "value": 5.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "web_thickness", "value": 4.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "flatback_thickness", "value": 30.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "rib_height", "value": 40.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "rib_width", "value": 8.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "fillet_radius", "value": 5.0, "unit": "mm", "type": "dimension", "source": "template"},
]
elif 'bracket' in parent_lower:
return [
{"name": "thickness", "value": 5.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "width", "value": 50.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "height", "value": 30.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "fillet_radius", "value": 3.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "hole_diameter", "value": 8.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "thickness", "value": 5.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "width", "value": 50.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "height", "value": 30.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "fillet_radius", "value": 3.0, "unit": "mm", "type": "dimension", "source": "template"},
]
elif 'beam' in parent_lower:
return [
{"name": "height", "value": 100.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "width", "value": 50.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "web_thickness", "value": 5.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "flange_thickness", "value": 8.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "height", "value": 100.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "width", "value": 50.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "web_thickness", "value": 5.0, "unit": "mm", "type": "dimension", "source": "template"},
]
# Generic expressions
# Generic
return [
{"name": "thickness", "value": 10.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "length", "value": 100.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "width", "value": 50.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "height", "value": 25.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "fillet_radius", "value": 3.0, "unit": "mm", "type": "dimension", "source": "inferred"},
{"name": "thickness", "value": 10.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "length", "value": 100.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "width", "value": 50.0, "unit": "mm", "type": "dimension", "source": "template"},
]
def _suggest_extractors(self, solver_type: Optional[str]) -> List[Dict[str, Any]]:
"""Suggest extractors based on solver type."""
def _suggest_extractors(
self,
solver_type: Optional[str],
result_files: List[Dict[str, Any]],
op2_analysis: Optional[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Suggest extractors based on solver type and available data."""
extractors = [
{"id": "E4", "name": "Mass (BDF)", "description": "Extract mass from BDF file", "always": True},
{"id": "E5", "name": "Mass (Expression)", "description": "Extract mass from NX expression", "always": True},
{"id": "E4", "name": "Mass (BDF)", "description": "Extract mass from BDF file", "always": True, "available": True},
{"id": "E5", "name": "Mass (Expression)", "description": "Extract mass from NX expression", "always": True, "available": True},
]
if solver_type == 'SOL101':
# Determine availability based on OP2 analysis
has_displacements = op2_analysis.get("has_displacements", False) if op2_analysis else False
has_eigenvectors = op2_analysis.get("has_eigenvectors", False) if op2_analysis else False
has_stress = op2_analysis.get("has_solid_stress", False) or op2_analysis.get("has_plate_stress", False) if op2_analysis else False
has_results = len(result_files) > 0
if solver_type == 'SOL101' or has_displacements:
extractors.extend([
{"id": "E1", "name": "Displacement", "description": "Max displacement from static analysis", "always": False},
{"id": "E3", "name": "Stress", "description": "Von Mises stress from static analysis", "always": False},
])
elif solver_type == 'SOL103':
extractors.extend([
{"id": "E2", "name": "Frequency", "description": "Natural frequencies from modal analysis", "always": False},
{
"id": "E1",
"name": "Displacement",
"description": "Max displacement from static analysis",
"always": False,
"available": has_displacements or has_results
},
{
"id": "E3",
"name": "Stress",
"description": "Von Mises stress from static analysis",
"always": False,
"available": has_stress or has_results
},
])
# Check if study appears to be mirror-related
parent_lower = str(self.parent_dir).lower()
if solver_type == 'SOL103' or has_eigenvectors:
extractors.append({
"id": "E2",
"name": "Frequency",
"description": "Natural frequencies from modal analysis",
"always": False,
"available": has_eigenvectors or has_results
})
# Mirror-specific extractors
parent_lower = str(self.study_dir).lower()
if 'mirror' in parent_lower or 'wfe' in parent_lower:
extractors.extend([
{"id": "E8", "name": "Zernike Coefficients", "description": "Zernike polynomial coefficients", "always": False},
{"id": "E9", "name": "Zernike RMS", "description": "RMS wavefront error", "always": False},
{"id": "E10", "name": "Zernike WFE", "description": "Weighted WFE metric", "always": False},
{"id": "E8", "name": "Zernike Coefficients", "description": "Zernike polynomial coefficients from OP2", "always": False, "available": has_displacements},
{"id": "E9", "name": "Zernike CSV", "description": "Zernike from CSV export", "always": False, "available": True},
{"id": "E10", "name": "Zernike RMS WFE", "description": "RMS wavefront error calculation", "always": False, "available": True},
])
return extractors

View File

@@ -2,12 +2,15 @@
Session Manager
Manages persistent Claude Code sessions with MCP integration.
Fixed for Windows compatibility - uses subprocess.Popen with ThreadPoolExecutor.
"""
import asyncio
import json
import os
import subprocess
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
@@ -20,6 +23,9 @@ from .context_builder import ContextBuilder
ATOMIZER_ROOT = Path(__file__).parent.parent.parent.parent.parent
MCP_SERVER_PATH = ATOMIZER_ROOT / "mcp-server" / "atomizer-tools"
# Thread pool for subprocess operations (Windows compatible)
_executor = ThreadPoolExecutor(max_workers=4)
@dataclass
class ClaudeSession:
@@ -28,13 +34,12 @@ class ClaudeSession:
session_id: str
mode: Literal["user", "power"]
study_id: Optional[str]
process: Optional[asyncio.subprocess.Process] = None
created_at: datetime = field(default_factory=datetime.now)
last_active: datetime = field(default_factory=datetime.now)
def is_alive(self) -> bool:
    """Session is always 'alive' - we use stateless CLI calls.

    The rendered diff showed the old subprocess-based check merged with this
    post-change version; only the stateless version remains.
    """
    return True
class SessionManager:
@@ -45,7 +50,7 @@ class SessionManager:
self.store = ConversationStore()
self.context_builder = ContextBuilder()
self._cleanup_task: Optional[asyncio.Task] = None
self._lock: Optional[asyncio.Lock] = None # Created lazily in async context
self._lock: Optional[asyncio.Lock] = None
def _get_lock(self) -> asyncio.Lock:
"""Get or create the async lock (must be called from async context)"""
@@ -55,7 +60,6 @@ class SessionManager:
async def start(self):
    """Start the session manager's background work."""
    # Launch the periodic stale-session cleanup loop.
    cleanup_coro = self._cleanup_loop()
    self._cleanup_task = asyncio.create_task(cleanup_coro)
async def stop(self):
@@ -67,9 +71,9 @@ class SessionManager:
except asyncio.CancelledError:
pass
# Terminate all sessions
# Clean up temp files
for session in list(self.sessions.values()):
await self._terminate_session(session)
self._cleanup_session_files(session.session_id)
async def create_session(
self,
@@ -80,22 +84,16 @@ class SessionManager:
"""
Create or resume a Claude Code session.
Args:
mode: "user" for safe mode, "power" for full access
study_id: Optional study context
resume_session_id: Optional session ID to resume
Returns:
ClaudeSession object
Note: Sessions are now stateless - we don't spawn persistent processes.
Each message is handled via a one-shot CLI call for Windows compatibility.
"""
async with self._get_lock():
# Resume existing session if requested and alive
# Resume existing session if requested
if resume_session_id and resume_session_id in self.sessions:
session = self.sessions[resume_session_id]
if session.is_alive():
session.last_active = datetime.now()
self.store.touch_session(session.session_id)
return session
session.last_active = datetime.now()
self.store.touch_session(session.session_id)
return session
session_id = resume_session_id or str(uuid.uuid4())[:8]
@@ -112,51 +110,11 @@ class SessionManager:
with open(mcp_config_path, "w") as f:
json.dump(mcp_config, f)
# Build system prompt with context
history = self.store.get_history(session_id) if resume_session_id else []
system_prompt = self.context_builder.build(
mode=mode,
study_id=study_id,
conversation_history=history,
)
# Write system prompt to temp file
prompt_path = ATOMIZER_ROOT / f".claude-prompt-{session_id}.md"
with open(prompt_path, "w") as f:
f.write(system_prompt)
# Build environment
env = os.environ.copy()
env["ATOMIZER_MODE"] = mode
env["ATOMIZER_ROOT"] = str(ATOMIZER_ROOT)
if study_id:
env["ATOMIZER_STUDY"] = study_id
# Start Claude Code subprocess
# Note: claude CLI with appropriate flags for JSON streaming
try:
process = await asyncio.create_subprocess_exec(
"claude",
"--print", # Non-interactive mode
"--output-format", "stream-json",
"--mcp-config", str(mcp_config_path),
"--system-prompt", str(prompt_path),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(ATOMIZER_ROOT),
env=env,
)
except FileNotFoundError:
# Claude CLI not found - create session without process
# Frontend will get error on first message
process = None
# Create session object (no subprocess - stateless)
session = ClaudeSession(
session_id=session_id,
mode=mode,
study_id=study_id,
process=process,
)
self.sessions[session_id] = session
@@ -166,19 +124,17 @@ class SessionManager:
self,
session_id: str,
message: str,
canvas_state: Optional[Dict] = None,
) -> AsyncGenerator[Dict, None]:
"""
Send a message to a session and stream the response.
Uses one-shot Claude CLI calls (claude --print) since the CLI
doesn't support persistent interactive sessions via stdin/stdout.
Uses synchronous subprocess.Popen via ThreadPoolExecutor for Windows compatibility.
Args:
session_id: Session ID
session_id: The session ID
message: User message
Yields:
Response chunks (text, tool_calls, errors, done)
canvas_state: Optional canvas state (nodes, edges) from UI
"""
session = self.sessions.get(session_id)
@@ -191,23 +147,20 @@ class SessionManager:
# Store user message
self.store.add_message(session_id, "user", message)
# Build context with conversation history
# Build context with conversation history AND canvas state
history = self.store.get_history(session_id, limit=10)
full_prompt = self.context_builder.build(
mode=session.mode,
study_id=session.study_id,
conversation_history=history[:-1], # Exclude current message
conversation_history=history[:-1],
canvas_state=canvas_state, # Pass canvas state for context
)
full_prompt += f"\n\nUser: {message}\n\nRespond helpfully and concisely:"
# Run Claude CLI one-shot
full_response = ""
tool_calls: List[Dict] = []
# Build CLI arguments based on mode
# Build CLI arguments
cli_args = ["claude", "--print"]
# Ensure MCP config exists for atomizer tools
# Ensure MCP config exists
mcp_config_path = ATOMIZER_ROOT / f".claude-mcp-{session_id}.json"
if not mcp_config_path.exists():
mcp_config = self._build_mcp_config(session.mode)
@@ -216,56 +169,61 @@ class SessionManager:
cli_args.extend(["--mcp-config", str(mcp_config_path)])
if session.mode == "user":
# User mode: Allow safe operations including report generation
# Allow Write tool for report files (STUDY_REPORT.md, *.md in study dirs)
cli_args.extend([
"--allowedTools",
"Read Write(**/STUDY_REPORT.md) Write(**/3_results/*.md) Bash(python:*) mcp__atomizer-tools__*"
])
else:
# Power mode: Full access
cli_args.append("--dangerously-skip-permissions")
# Pass prompt via stdin (handles long prompts and special characters)
cli_args.append("-") # Read from stdin
full_response = ""
tool_calls: List[Dict] = []
try:
process = await asyncio.create_subprocess_exec(
*cli_args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(ATOMIZER_ROOT),
)
loop = asyncio.get_event_loop()
# Send prompt via stdin
process.stdin.write(full_prompt.encode())
await process.stdin.drain()
process.stdin.close()
await process.stdin.wait_closed()
# Run subprocess in thread pool (Windows compatible)
def run_claude():
try:
process = subprocess.Popen(
cli_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(ATOMIZER_ROOT),
text=True,
encoding='utf-8',
errors='replace',
)
stdout, stderr = process.communicate(input=full_prompt, timeout=300)
return {
"stdout": stdout,
"stderr": stderr,
"returncode": process.returncode,
}
except subprocess.TimeoutExpired:
process.kill()
return {"error": "Response timeout (5 minutes)"}
except FileNotFoundError:
return {"error": "Claude CLI not found in PATH. Install with: npm install -g @anthropic-ai/claude-code"}
except Exception as e:
return {"error": str(e)}
# Stream stdout
buffer = ""
while True:
chunk = await process.stdout.read(100)
if not chunk:
break
result = await loop.run_in_executor(_executor, run_claude)
text = chunk.decode()
full_response += text
yield {"type": "text", "content": text}
if "error" in result:
yield {"type": "error", "message": result["error"]}
else:
full_response = result["stdout"] or ""
await process.wait()
if full_response:
yield {"type": "text", "content": full_response}
if process.returncode != 0:
stderr = await process.stderr.read()
error_msg = stderr.decode() if stderr else "Unknown error"
yield {"type": "error", "message": f"CLI error: {error_msg}"}
if result["returncode"] != 0 and result["stderr"]:
yield {"type": "error", "message": f"CLI error: {result['stderr']}"}
except asyncio.TimeoutError:
yield {"type": "error", "message": "Response timeout"}
except FileNotFoundError:
yield {"type": "error", "message": "Claude CLI not found in PATH"}
except Exception as e:
yield {"type": "error", "message": str(e)}
@@ -285,31 +243,21 @@ class SessionManager:
session_id: str,
new_mode: Literal["user", "power"],
) -> ClaudeSession:
"""
Switch a session's mode (requires restart).
Args:
session_id: Session to switch
new_mode: New mode ("user" or "power")
Returns:
New ClaudeSession with updated mode
"""
"""Switch a session's mode"""
session = self.sessions.get(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
study_id = session.study_id
session.mode = new_mode
self.store.update_session(session_id, mode=new_mode)
# Terminate existing session
await self._terminate_session(session)
# Rebuild MCP config with new mode
mcp_config = self._build_mcp_config(new_mode)
mcp_config_path = ATOMIZER_ROOT / f".claude-mcp-{session_id}.json"
with open(mcp_config_path, "w") as f:
json.dump(mcp_config, f)
# Create new session with same ID but different mode
return await self.create_session(
mode=new_mode,
study_id=study_id,
resume_session_id=session_id,
)
return session
async def set_study_context(
self,
@@ -322,16 +270,6 @@ class SessionManager:
session.study_id = study_id
self.store.update_session(session_id, study_id=study_id)
# If session is alive, send context update
if session.is_alive() and session.process:
context_update = self.context_builder.build_study_context(study_id)
context_msg = f"[CONTEXT UPDATE] Study changed to: {study_id}\n\n{context_update}"
try:
session.process.stdin.write(f"{context_msg}\n".encode())
await session.process.stdin.drain()
except Exception:
pass # Ignore errors for context updates
def get_session(self, session_id: str) -> Optional[ClaudeSession]:
    """Return the active session for the given ID, or None if unknown."""
    session = self.sessions.get(session_id)
    return session
@@ -369,20 +307,11 @@ class SessionManager:
},
}
async def _terminate_session(self, session: ClaudeSession):
"""Terminate a Claude session and clean up"""
if session.process and session.is_alive():
session.process.terminate()
try:
await asyncio.wait_for(session.process.wait(), timeout=5.0)
except asyncio.TimeoutError:
session.process.kill()
await session.process.wait()
# Clean up temp files
def _cleanup_session_files(self, session_id: str):
"""Clean up temp files for a session"""
for pattern in [
f".claude-mcp-{session.session_id}.json",
f".claude-prompt-{session.session_id}.md",
f".claude-mcp-{session_id}.json",
f".claude-prompt-{session_id}.md",
]:
path = ATOMIZER_ROOT / pattern
if path.exists():
@@ -391,9 +320,6 @@ class SessionManager:
except Exception:
pass
# Remove from active sessions
self.sessions.pop(session.session_id, None)
async def _cleanup_loop(self):
"""Periodically clean up stale sessions"""
while True:
@@ -404,24 +330,22 @@ class SessionManager:
stale = [
sid
for sid, session in list(self.sessions.items())
if (now - session.last_active).total_seconds() > 3600 # 1 hour
if (now - session.last_active).total_seconds() > 3600
]
for sid in stale:
session = self.sessions.get(sid)
if session:
await self._terminate_session(session)
self._cleanup_session_files(sid)
self.sessions.pop(sid, None)
# Also clean up database
self.store.cleanup_stale_sessions(max_age_hours=24)
except asyncio.CancelledError:
break
except Exception:
pass # Continue cleanup loop on errors
pass
# Global instance (lazily created by the application).
_session_manager: Optional[SessionManager] = None