""" Atomizer Claude Agent Service Provides Claude AI integration with Atomizer-specific tools for: - Analyzing optimization results - Querying trial data - Modifying configurations - Creating new studies - Explaining FEA/Zernike concepts """ import os import json import sqlite3 from pathlib import Path from typing import Optional, List, Dict, Any, AsyncGenerator from datetime import datetime import anthropic # Base studies directory STUDIES_DIR = Path(__file__).parent.parent.parent.parent.parent / "studies" ATOMIZER_ROOT = Path(__file__).parent.parent.parent.parent.parent class AtomizerClaudeAgent: """Claude agent with Atomizer-specific tools and context""" def __init__(self, study_id: Optional[str] = None): self.client = anthropic.Anthropic() self.study_id = study_id self.study_dir = STUDIES_DIR / study_id if study_id else None self.canvas_state: Optional[Dict[str, Any]] = None # Current canvas/spec state self.interview = None # Interview engine instance (if active) self.tools = self._define_tools() self.system_prompt = self._build_system_prompt() def set_canvas_state(self, spec: Dict[str, Any]) -> None: """Update the current canvas state for context. This should be called: 1. When a study is loaded 2. When the frontend sends a canvas_edit message 3. After any spec modification """ self.canvas_state = spec # Rebuild system prompt with new canvas state self.system_prompt = self._build_system_prompt() def load_current_spec(self) -> Optional[Dict[str, Any]]: """Load the current atomizer_spec.json and update canvas state""" if not self.study_dir: return None spec_path = self.study_dir / "atomizer_spec.json" if not spec_path.exists(): return None with open(spec_path, 'r', encoding='utf-8') as f: spec = json.load(f) self.canvas_state = spec return spec def _format_canvas_context(self) -> str: """Format current canvas state for Claude's system prompt. This gives Claude real-time awareness of what's on the canvas, enabling bi-directional sync where Claude sees user's edits. """ if not self.canvas_state: return "" spec = self.canvas_state lines = ["\n## Current Canvas State"] lines.append("*The user can see this canvas. When you modify it, they see changes in real-time.*\n") # Model model = spec.get('model', {}) sim_path = model.get('sim', {}).get('path', '') if sim_path: lines.append(f"**Model**: `{sim_path}`") # Design Variables dvs = spec.get('design_variables', []) if dvs: lines.append(f"\n**Design Variables ({len(dvs)}):**") for dv in dvs: bounds = dv.get('bounds', {}) units = f" {dv.get('units', '')}" if dv.get('units') else "" enabled = "" if dv.get('enabled', True) else " (disabled)" lines.append(f" - `{dv.get('id')}`: **{dv.get('name')}** [{bounds.get('min')}, {bounds.get('max')}]{units}{enabled}") # Extractors exts = spec.get('extractors', []) if exts: lines.append(f"\n**Extractors ({len(exts)}):**") for ext in exts: ext_type = ext.get('type', 'unknown') enabled = "" if ext.get('enabled', True) else " (disabled)" lines.append(f" - `{ext.get('id')}`: **{ext.get('name')}** ({ext_type}){enabled}") # Objectives objs = spec.get('objectives', []) if objs: lines.append(f"\n**Objectives ({len(objs)}):**") for obj in objs: direction = obj.get('direction', 'minimize') weight = obj.get('weight', 1.0) enabled = "" if obj.get('enabled', True) else " (disabled)" weight_str = f" [weight: {weight}]" if weight != 1.0 else "" lines.append(f" - `{obj.get('id')}`: **{direction}** {obj.get('name')}{weight_str}{enabled}") # Constraints cons = spec.get('constraints', []) if cons: lines.append(f"\n**Constraints ({len(cons)}):**") for con in cons: op = con.get('operator', '<=') threshold = con.get('threshold', 0) units = f" {con.get('units', '')}" if con.get('units') else "" enabled = "" if con.get('enabled', True) else " (disabled)" lines.append(f" - `{con.get('id')}`: **{con.get('name')}** {op} {threshold}{units}{enabled}") # Optimization settings opt = spec.get('optimization', {}) if opt: algo = opt.get('algorithm', {}).get('type', 'TPE') budget = opt.get('budget', {}).get('max_trials', 100) lines.append(f"\n**Optimization**: {algo}, max {budget} trials") # Empty state message if not dvs and not exts and not objs: lines.append("\n*Canvas is empty. Help the user add design variables, extractors, and objectives.*") return "\n".join(lines) def _build_system_prompt(self) -> str: """Build context-aware system prompt for Atomizer""" base_prompt = """You are the Atomizer Assistant - the intelligent assistant built into Atomizer for FEA optimization. ## Your Identity - You are "Atomizer Assistant" - never mention AI, language models, or any underlying technology - Respond as if you're a knowledgeable expert system built specifically for Atomizer - When asked about yourself, explain that you're the Atomizer Assistant designed to help with FEA optimization ## Your Capabilities You help engineers with structural optimization using NX Nastran simulations: 1. **Analyze Results** - Interpret optimization progress, identify trends, explain convergence 2. **Query Data** - Fetch trial data, compare configurations, find best designs 3. **Modify Settings** - Update design variable bounds, objectives, constraints 4. **Explain Concepts** - FEA, Zernike polynomials, wavefront error, stress analysis 5. **Troubleshoot** - Debug failed trials, identify issues, suggest fixes 6. **Create Studies** - Guide users through setting up new optimization studies ## POWER MODE - Direct Modification Access You have FULL ACCESS to modify the study configuration. When the user asks you to: - Add design variables: Use `add_design_variable` tool - Add extractors (mass, volume, stress, displacement, custom): Use `add_extractor` tool - Add objectives: Use `add_objective` tool - Add constraints: Use `add_constraint` tool - Update any setting: Use `update_spec_field` tool - Remove elements: Use `remove_node` tool **IMPORTANT**: DO NOT ask for permission. Just DO IT immediately. The user is in Power Mode and expects you to act. When adding extractors, you can create custom ones with Python code. Example for volume: ```python def extract_volume(op2_path: str) -> Dict[str, float]: # Custom volume calculation from FEA results from pyNastran.op2.op2 import OP2 op2 = OP2() op2.read_op2(op2_path) # ... calculation logic return {"volume": calculated_volume} ``` ## Atomizer Context - Atomizer uses Optuna for Bayesian optimization - Studies can use FEA-only or hybrid FEA/Neural surrogate approaches - Results are stored in SQLite databases (study.db) - Design variables are NX expressions in CAD models - Objectives include stress, displacement, frequency, Zernike WFE ## Communication Style - Be concise but thorough - Use technical language appropriate for engineers - When showing data, format it clearly (tables, lists) - If uncertain, say so and suggest how to verify - Proactively suggest next steps or insights - Sound confident and professional - you're a specialized expert system - **ACT IMMEDIATELY when asked to add/modify things - don't ask for permission** """ # Add study-specific context if available if self.study_id and self.study_dir and self.study_dir.exists(): context = self._get_study_context() base_prompt += f"\n## Current Study: {self.study_id}\n{context}\n" else: base_prompt += "\n## Current Study: None selected\nAsk the user to select a study or help them create a new one.\n" # Add canvas state context (bi-directional sync) canvas_context = self._format_canvas_context() if canvas_context: base_prompt += canvas_context return base_prompt def _get_study_context(self) -> str: """Get context information about the current study""" context_parts = [] # Try to load config config_path = self.study_dir / "1_setup" / "optimization_config.json" if not config_path.exists(): config_path = self.study_dir / "optimization_config.json" if config_path.exists(): try: with open(config_path) as f: config = json.load(f) # Design variables dvs = config.get('design_variables', []) if dvs: context_parts.append(f"**Design Variables ({len(dvs)})**: " + ", ".join(dv['name'] for dv in dvs[:5]) + ("..." if len(dvs) > 5 else "")) # Objectives objs = config.get('objectives', []) if objs: context_parts.append(f"**Objectives ({len(objs)})**: " + ", ".join(f"{o['name']} ({o.get('direction', 'minimize')})" for o in objs)) # Constraints constraints = config.get('constraints', []) if constraints: context_parts.append(f"**Constraints**: " + ", ".join(c['name'] for c in constraints)) except Exception: pass # Try to get trial count from database results_dir = self.study_dir / "2_results" if not results_dir.exists(): results_dir = self.study_dir / "3_results" db_path = results_dir / "study.db" if results_dir.exists() else None if db_path and db_path.exists(): try: conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM trials WHERE state='COMPLETE'") trial_count = cursor.fetchone()[0] context_parts.append(f"**Completed Trials**: {trial_count}") # Get best value cursor.execute(""" SELECT MIN(value) FROM trial_values WHERE trial_id IN (SELECT trial_id FROM trials WHERE state='COMPLETE') """) best = cursor.fetchone()[0] if best is not None: context_parts.append(f"**Best Objective**: {best:.6f}") conn.close() except Exception: pass return "\n".join(context_parts) if context_parts else "No configuration found." def _define_tools(self) -> List[Dict[str, Any]]: """Define Atomizer-specific tools for Claude""" return [ { "name": "read_study_config", "description": "Read the optimization configuration for the current or specified study. Returns design variables, objectives, constraints, and algorithm settings.", "input_schema": { "type": "object", "properties": { "study_id": { "type": "string", "description": "Study ID to read config from. Uses current study if not specified." } }, "required": [] } }, { "name": "query_trials", "description": "Query trial data from the Optuna database. Can filter by state, source (FEA/NN), objective value range, or parameter values.", "input_schema": { "type": "object", "properties": { "study_id": { "type": "string", "description": "Study ID to query. Uses current study if not specified." }, "state": { "type": "string", "enum": ["COMPLETE", "PRUNED", "FAIL", "RUNNING", "all"], "description": "Filter by trial state. Default: COMPLETE" }, "source": { "type": "string", "enum": ["fea", "nn", "all"], "description": "Filter by trial source (FEA simulation or Neural Network). Default: all" }, "limit": { "type": "integer", "description": "Maximum number of trials to return. Default: 20" }, "order_by": { "type": "string", "enum": ["value_asc", "value_desc", "trial_id_asc", "trial_id_desc"], "description": "Sort order. Default: value_asc (best first)" } }, "required": [] } }, { "name": "get_trial_details", "description": "Get detailed information about a specific trial including all parameters, objective values, and user attributes.", "input_schema": { "type": "object", "properties": { "study_id": { "type": "string", "description": "Study ID. Uses current study if not specified." }, "trial_id": { "type": "integer", "description": "The trial number to get details for." } }, "required": ["trial_id"] } }, { "name": "compare_trials", "description": "Compare two or more trials side-by-side, showing parameter differences and objective values.", "input_schema": { "type": "object", "properties": { "study_id": { "type": "string", "description": "Study ID. Uses current study if not specified." }, "trial_ids": { "type": "array", "items": {"type": "integer"}, "description": "List of trial IDs to compare (2-5 trials)." } }, "required": ["trial_ids"] } }, { "name": "get_optimization_summary", "description": "Get a high-level summary of the optimization progress including trial counts, convergence status, best designs, and parameter sensitivity.", "input_schema": { "type": "object", "properties": { "study_id": { "type": "string", "description": "Study ID. Uses current study if not specified." } }, "required": [] } }, { "name": "read_study_readme", "description": "Read the README.md documentation for a study, which contains the engineering problem description, mathematical formulation, and methodology.", "input_schema": { "type": "object", "properties": { "study_id": { "type": "string", "description": "Study ID. Uses current study if not specified." } }, "required": [] } }, { "name": "list_studies", "description": "List all available optimization studies with their status and trial counts.", "input_schema": { "type": "object", "properties": {}, "required": [] } }, # === WRITE TOOLS (Power Mode) === { "name": "add_design_variable", "description": "Add a new design variable to the study's atomizer_spec.json. This modifies the spec directly.", "input_schema": { "type": "object", "properties": { "study_id": {"type": "string", "description": "Study ID. Uses current study if not specified."}, "name": {"type": "string", "description": "Variable name (e.g., 'web_thickness')"}, "expression_name": {"type": "string", "description": "NX expression name (usually same as name)"}, "min_value": {"type": "number", "description": "Minimum bound"}, "max_value": {"type": "number", "description": "Maximum bound"}, "baseline": {"type": "number", "description": "Initial/baseline value"}, "units": {"type": "string", "description": "Units (e.g., 'mm', 'degrees')"} }, "required": ["name", "min_value", "max_value"] } }, { "name": "add_extractor", "description": "Add a new physics extractor to the study. Can be builtin (mass, displacement, stress, zernike_opd) or custom with Python code.", "input_schema": { "type": "object", "properties": { "study_id": {"type": "string", "description": "Study ID. Uses current study if not specified."}, "name": {"type": "string", "description": "Extractor display name"}, "extractor_type": {"type": "string", "description": "Type: mass, displacement, stress, frequency, zernike_opd, or custom"}, "config": {"type": "object", "description": "Configuration for the extractor (optional)"}, "custom_code": {"type": "string", "description": "For custom extractors: Python function code"}, "outputs": { "type": "array", "items": {"type": "object"}, "description": "Output definitions: [{name, metric}]" } }, "required": ["name", "extractor_type"] } }, { "name": "add_objective", "description": "Add a new optimization objective to the study.", "input_schema": { "type": "object", "properties": { "study_id": {"type": "string", "description": "Study ID. Uses current study if not specified."}, "name": {"type": "string", "description": "Objective name/description"}, "direction": {"type": "string", "enum": ["minimize", "maximize"], "description": "Optimization direction"}, "weight": {"type": "number", "description": "Weight in weighted sum (default: 1.0)"}, "target": {"type": "number", "description": "Target value (optional)"}, "units": {"type": "string", "description": "Units (e.g., 'nm', 'kg')"}, "extractor_id": {"type": "string", "description": "Source extractor ID (e.g., 'ext_001')"}, "output_key": {"type": "string", "description": "Output key from extractor"} }, "required": ["name", "direction"] } }, { "name": "add_constraint", "description": "Add a new constraint to the study.", "input_schema": { "type": "object", "properties": { "study_id": {"type": "string", "description": "Study ID. Uses current study if not specified."}, "name": {"type": "string", "description": "Constraint name/description"}, "operator": {"type": "string", "enum": ["<=", ">=", "<", ">", "=="], "description": "Comparison operator"}, "threshold": {"type": "number", "description": "Threshold value"}, "units": {"type": "string", "description": "Units (optional)"}, "extractor_id": {"type": "string", "description": "Source extractor ID"}, "output_key": {"type": "string", "description": "Output key from extractor"} }, "required": ["name", "operator", "threshold"] } }, { "name": "update_spec_field", "description": "Update any field in the atomizer_spec.json using a JSON path.", "input_schema": { "type": "object", "properties": { "study_id": {"type": "string", "description": "Study ID. Uses current study if not specified."}, "path": {"type": "string", "description": "JSON path (e.g., 'design_variables.0.bounds.max', 'objectives.1.weight')"}, "value": {"description": "New value to set"} }, "required": ["path", "value"] } }, { "name": "remove_node", "description": "Remove a design variable, extractor, objective, or constraint by ID.", "input_schema": { "type": "object", "properties": { "study_id": {"type": "string", "description": "Study ID. Uses current study if not specified."}, "node_id": {"type": "string", "description": "Node ID to remove (e.g., 'dv_003', 'ext_002', 'obj_001', 'con_001')"} }, "required": ["node_id"] } }, { "name": "create_study", "description": "Create a new optimization study with initial configuration. Creates the study folder and atomizer_spec.json.", "input_schema": { "type": "object", "properties": { "study_name": { "type": "string", "description": "Name for the study (snake_case, e.g., 'bracket_mass_optimization')" }, "category": { "type": "string", "description": "Parent category folder (e.g., 'Simple_Bracket', 'M1_Mirror'). Created if doesn't exist." }, "description": { "type": "string", "description": "Brief description of the optimization goal" }, "sim_file": { "type": "string", "description": "Path to the .sim file (relative to study folder or absolute)" }, "algorithm": { "type": "string", "enum": ["TPE", "CMA-ES", "NSGA-II", "RandomSearch"], "description": "Optimization algorithm. Default: TPE" }, "max_trials": { "type": "integer", "description": "Maximum number of trials. Default: 100" } }, "required": ["study_name"] } }, # === INTERVIEW TOOLS === { "name": "start_interview", "description": "Start an interview session to create a new study through guided conversation. Use this when the user wants to create a study but hasn't provided all details upfront.", "input_schema": { "type": "object", "properties": {}, "required": [] } }, { "name": "interview_record", "description": "Record an answer from the user during the interview. Advances the interview state automatically.", "input_schema": { "type": "object", "properties": { "field": { "type": "string", "enum": ["study_name", "category", "description", "sim_file", "design_variable", "extractor", "objective", "constraint", "algorithm", "max_trials", "confirm"], "description": "The field being answered" }, "value": { "description": "The value for this field. For multi-value fields (design_variable, etc.), can be a dict or list." } }, "required": ["field", "value"] } }, { "name": "interview_advance", "description": "Move to the next phase of the interview. Use after gathering all required info for the current phase.", "input_schema": { "type": "object", "properties": {}, "required": [] } }, { "name": "interview_status", "description": "Get the current interview progress and collected data.", "input_schema": { "type": "object", "properties": {}, "required": [] } }, { "name": "interview_finalize", "description": "Finalize the interview and create the study with all collected data.", "input_schema": { "type": "object", "properties": {}, "required": [] } } ] def _execute_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> str: """Execute an Atomizer tool and return the result""" try: if tool_name == "read_study_config": return self._tool_read_config(tool_input.get('study_id')) elif tool_name == "query_trials": return self._tool_query_trials(tool_input) elif tool_name == "get_trial_details": return self._tool_get_trial_details(tool_input) elif tool_name == "compare_trials": return self._tool_compare_trials(tool_input) elif tool_name == "get_optimization_summary": return self._tool_get_summary(tool_input.get('study_id')) elif tool_name == "read_study_readme": return self._tool_read_readme(tool_input.get('study_id')) elif tool_name == "list_studies": return self._tool_list_studies() # === WRITE TOOLS === elif tool_name == "add_design_variable": return self._tool_add_design_variable(tool_input) elif tool_name == "add_extractor": return self._tool_add_extractor(tool_input) elif tool_name == "add_objective": return self._tool_add_objective(tool_input) elif tool_name == "add_constraint": return self._tool_add_constraint(tool_input) elif tool_name == "update_spec_field": return self._tool_update_spec_field(tool_input) elif tool_name == "remove_node": return self._tool_remove_node(tool_input) elif tool_name == "create_study": return self._tool_create_study(tool_input) # === INTERVIEW TOOLS === elif tool_name == "start_interview": return self._tool_start_interview() elif tool_name == "interview_record": return self._tool_interview_record(tool_input) elif tool_name == "interview_advance": return self._tool_interview_advance() elif tool_name == "interview_status": return self._tool_interview_status() elif tool_name == "interview_finalize": return self._tool_interview_finalize() else: return f"Unknown tool: {tool_name}" except Exception as e: return f"Error executing {tool_name}: {str(e)}" def _get_study_dir(self, study_id: Optional[str]) -> Path: """Get study directory, using current study if not specified""" sid = study_id or self.study_id if not sid: raise ValueError("No study specified and no current study selected") study_dir = STUDIES_DIR / sid if not study_dir.exists(): raise ValueError(f"Study '{sid}' not found") return study_dir def _get_db_path(self, study_id: Optional[str]) -> Path: """Get database path for a study""" study_dir = self._get_study_dir(study_id) for results_dir_name in ["2_results", "3_results"]: db_path = study_dir / results_dir_name / "study.db" if db_path.exists(): return db_path raise ValueError(f"No database found for study") def _tool_read_config(self, study_id: Optional[str]) -> str: """Read study configuration""" study_dir = self._get_study_dir(study_id) config_path = study_dir / "1_setup" / "optimization_config.json" if not config_path.exists(): config_path = study_dir / "optimization_config.json" if not config_path.exists(): return "No configuration file found for this study." with open(config_path) as f: config = json.load(f) # Format nicely result = [f"# Configuration for {study_id or self.study_id}\n"] # Design variables dvs = config.get('design_variables', []) if dvs: result.append("## Design Variables") result.append("| Name | Min | Max | Baseline | Units |") result.append("|------|-----|-----|----------|-------|") for dv in dvs: result.append(f"| {dv['name']} | {dv.get('min', '-')} | {dv.get('max', '-')} | {dv.get('baseline', '-')} | {dv.get('units', '-')} |") # Objectives objs = config.get('objectives', []) if objs: result.append("\n## Objectives") result.append("| Name | Direction | Weight | Target | Units |") result.append("|------|-----------|--------|--------|-------|") for obj in objs: result.append(f"| {obj['name']} | {obj.get('direction', 'minimize')} | {obj.get('weight', 1.0)} | {obj.get('target', '-')} | {obj.get('units', '-')} |") # Constraints constraints = config.get('constraints', []) if constraints: result.append("\n## Constraints") for c in constraints: result.append(f"- **{c['name']}**: {c.get('type', 'bound')} {c.get('max_value', c.get('min_value', ''))} {c.get('units', '')}") return "\n".join(result) def _tool_query_trials(self, params: Dict[str, Any]) -> str: """Query trials from database""" db_path = self._get_db_path(params.get('study_id')) state = params.get('state', 'COMPLETE') source = params.get('source', 'all') limit = params.get('limit', 20) order_by = params.get('order_by', 'value_asc') conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Build query query = """ SELECT t.trial_id, t.state, tv.value, GROUP_CONCAT(tp.param_name || '=' || ROUND(tp.param_value, 4), ', ') as params FROM trials t LEFT JOIN trial_values tv ON t.trial_id = tv.trial_id LEFT JOIN trial_params tp ON t.trial_id = tp.trial_id """ conditions = [] if state != 'all': conditions.append(f"t.state = '{state}'") if conditions: query += " WHERE " + " AND ".join(conditions) query += " GROUP BY t.trial_id" # Order if order_by == 'value_asc': query += " ORDER BY tv.value ASC" elif order_by == 'value_desc': query += " ORDER BY tv.value DESC" elif order_by == 'trial_id_desc': query += " ORDER BY t.trial_id DESC" else: query += " ORDER BY t.trial_id ASC" query += f" LIMIT {limit}" cursor.execute(query) rows = cursor.fetchall() conn.close() if not rows: return "No trials found matching the criteria." # Filter by source if needed (check user_attrs) if source != 'all': # Would need another query to filter by trial_source attr pass # Format results result = [f"# Trials (showing {len(rows)}/{limit} max)\n"] result.append("| Trial | State | Objective | Parameters |") result.append("|-------|-------|-----------|------------|") for row in rows: value = f"{row['value']:.6f}" if row['value'] else "N/A" params = row['params'][:50] + "..." if row['params'] and len(row['params']) > 50 else (row['params'] or "") result.append(f"| {row['trial_id']} | {row['state']} | {value} | {params} |") return "\n".join(result) def _tool_get_trial_details(self, params: Dict[str, Any]) -> str: """Get detailed trial information""" db_path = self._get_db_path(params.get('study_id')) trial_id = params['trial_id'] conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Get trial info cursor.execute("SELECT * FROM trials WHERE trial_id = ?", (trial_id,)) trial = cursor.fetchone() if not trial: conn.close() return f"Trial {trial_id} not found." result = [f"# Trial {trial_id} Details\n"] result.append(f"**State**: {trial['state']}") # Get objective value cursor.execute("SELECT value FROM trial_values WHERE trial_id = ?", (trial_id,)) value_row = cursor.fetchone() if value_row: result.append(f"**Objective Value**: {value_row['value']:.6f}") # Get parameters cursor.execute("SELECT param_name, param_value FROM trial_params WHERE trial_id = ? ORDER BY param_name", (trial_id,)) params_rows = cursor.fetchall() if params_rows: result.append("\n## Parameters") result.append("| Parameter | Value |") result.append("|-----------|-------|") for p in params_rows: result.append(f"| {p['param_name']} | {p['param_value']:.6f} |") # Get user attributes cursor.execute("SELECT key, value_json FROM trial_user_attributes WHERE trial_id = ?", (trial_id,)) attrs = cursor.fetchall() if attrs: result.append("\n## Attributes") for attr in attrs: try: value = json.loads(attr['value_json']) if isinstance(value, float): result.append(f"- **{attr['key']}**: {value:.6f}") else: result.append(f"- **{attr['key']}**: {value}") except: result.append(f"- **{attr['key']}**: {attr['value_json']}") conn.close() return "\n".join(result) def _tool_compare_trials(self, params: Dict[str, Any]) -> str: """Compare multiple trials""" db_path = self._get_db_path(params.get('study_id')) trial_ids = params['trial_ids'] if len(trial_ids) < 2: return "Need at least 2 trials to compare." if len(trial_ids) > 5: return "Maximum 5 trials for comparison." conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row cursor = conn.cursor() result = ["# Trial Comparison\n"] # Get all parameter names cursor.execute("SELECT DISTINCT param_name FROM trial_params ORDER BY param_name") param_names = [row['param_name'] for row in cursor.fetchall()] # Build comparison table header header = "| Parameter | " + " | ".join(f"Trial {tid}" for tid in trial_ids) + " |" separator = "|-----------|" + "|".join("-" * 10 for _ in trial_ids) + "|" result.append(header) result.append(separator) # Objective values row obj_values = [] for tid in trial_ids: cursor.execute("SELECT value FROM trial_values WHERE trial_id = ?", (tid,)) row = cursor.fetchone() obj_values.append(f"{row['value']:.4f}" if row else "N/A") result.append("| **Objective** | " + " | ".join(obj_values) + " |") # Parameter rows for pname in param_names: values = [] for tid in trial_ids: cursor.execute("SELECT param_value FROM trial_params WHERE trial_id = ? AND param_name = ?", (tid, pname)) row = cursor.fetchone() values.append(f"{row['param_value']:.4f}" if row else "N/A") result.append(f"| {pname} | " + " | ".join(values) + " |") conn.close() return "\n".join(result) def _tool_get_summary(self, study_id: Optional[str]) -> str: """Get optimization summary""" db_path = self._get_db_path(study_id) conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row cursor = conn.cursor() result = [f"# Optimization Summary\n"] # Trial counts by state cursor.execute("SELECT state, COUNT(*) as count FROM trials GROUP BY state") states = {row['state']: row['count'] for row in cursor.fetchall()} result.append("## Trial Counts") total = sum(states.values()) result.append(f"- **Total**: {total}") for state, count in states.items(): result.append(f"- {state}: {count}") # Best trial cursor.execute(""" SELECT t.trial_id, tv.value FROM trials t JOIN trial_values tv ON t.trial_id = tv.trial_id WHERE t.state = 'COMPLETE' ORDER BY tv.value ASC LIMIT 1 """) best = cursor.fetchone() if best: result.append(f"\n## Best Trial") result.append(f"- **Trial ID**: {best['trial_id']}") result.append(f"- **Objective**: {best['value']:.6f}") # FEA vs NN counts cursor.execute(""" SELECT value_json, COUNT(*) as count FROM trial_user_attributes WHERE key = 'trial_source' GROUP BY value_json """) sources = cursor.fetchall() if sources: result.append("\n## Trial Sources") for src in sources: source_name = json.loads(src['value_json']) if src['value_json'] else 'unknown' result.append(f"- **{source_name}**: {src['count']}") conn.close() return "\n".join(result) def _tool_read_readme(self, study_id: Optional[str]) -> str: """Read study README""" study_dir = self._get_study_dir(study_id) readme_path = study_dir / "README.md" if not readme_path.exists(): return "No README.md found for this study." content = readme_path.read_text(encoding='utf-8') # Truncate if too long if len(content) > 8000: content = content[:8000] + "\n\n... (truncated)" return content def _tool_list_studies(self) -> str: """List all studies""" if not STUDIES_DIR.exists(): return "Studies directory not found." result = ["# Available Studies\n"] result.append("| Study | Status | Trials |") result.append("|-------|--------|--------|") for study_dir in sorted(STUDIES_DIR.iterdir()): if not study_dir.is_dir(): continue study_id = study_dir.name # Check for database trial_count = 0 for results_dir_name in ["2_results", "3_results"]: db_path = study_dir / results_dir_name / "study.db" if db_path.exists(): try: conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM trials WHERE state='COMPLETE'") trial_count = cursor.fetchone()[0] conn.close() except: pass break # Determine status status = "ready" if trial_count > 0 else "not_started" result.append(f"| {study_id} | {status} | {trial_count} |") return "\n".join(result) # === WRITE TOOL IMPLEMENTATIONS === def _get_spec_path(self, study_id: Optional[str]) -> Path: """Get the atomizer_spec.json path for a study""" study_dir = self._get_study_dir(study_id) spec_path = study_dir / "atomizer_spec.json" return spec_path def _load_spec(self, study_id: Optional[str]) -> Dict[str, Any]: """Load the atomizer_spec.json for a study""" spec_path = self._get_spec_path(study_id) if not spec_path.exists(): raise ValueError(f"No atomizer_spec.json found in study. Path: {spec_path}") with open(spec_path, 'r', encoding='utf-8') as f: return json.load(f) def _save_spec(self, study_id: Optional[str], spec: Dict[str, Any]) -> None: """Save the atomizer_spec.json for a study""" spec_path = self._get_spec_path(study_id) # Update modified_by and modified_at if 'meta' in spec: spec['meta']['modified_by'] = 'claude_agent' spec['meta']['modified_at'] = datetime.now().isoformat() with open(spec_path, 'w', encoding='utf-8') as f: json.dump(spec, f, indent=2) def _generate_id(self, prefix: str, existing_ids: List[str]) -> str: """Generate a unique ID with prefix (e.g., 'dv_003')""" max_num = 0 for eid in existing_ids: if eid.startswith(prefix): try: num = int(eid.split('_')[1]) max_num = max(max_num, num) except (IndexError, ValueError): pass return f"{prefix}{max_num + 1:03d}" def _tool_add_design_variable(self, params: Dict[str, Any]) -> str: """Add a design variable to the spec""" study_id = params.get('study_id') spec = self._load_spec(study_id) # Get existing IDs existing_ids = [dv.get('id', '') for dv in spec.get('design_variables', [])] new_id = self._generate_id('dv_', existing_ids) # Build the new design variable new_dv = { "id": new_id, "name": params['name'], "expression_name": params.get('expression_name', params['name']), "type": "continuous", "bounds": { "min": params['min_value'], "max": params['max_value'] }, "enabled": True } if 'baseline' in params: new_dv['baseline'] = params['baseline'] if 'units' in params: new_dv['units'] = params['units'] # Add canvas position (auto-layout) existing_count = len(spec.get('design_variables', [])) new_dv['canvas_position'] = { "x": 50, "y": 100 + existing_count * 80 } # Add to spec if 'design_variables' not in spec: spec['design_variables'] = [] spec['design_variables'].append(new_dv) # Save self._save_spec(study_id, spec) return f"✓ Added design variable '{params['name']}' (ID: {new_id}) with bounds [{params['min_value']}, {params['max_value']}]" def _tool_add_extractor(self, params: Dict[str, Any]) -> str: """Add an extractor to the spec""" study_id = params.get('study_id') spec = self._load_spec(study_id) # Get existing IDs existing_ids = [ext.get('id', '') for ext in spec.get('extractors', [])] new_id = self._generate_id('ext_', existing_ids) # Build the new extractor new_ext = { "id": new_id, "name": params['name'], "type": params['extractor_type'], "enabled": True } # Add config if provided if 'config' in params and params['config']: new_ext['config'] = params['config'] # Add custom code if provided if params['extractor_type'] == 'custom' and 'custom_code' in params: new_ext['custom'] = { "function_code": params['custom_code'], "inputs": params.get('inputs', ["op2_path"]), "dependencies": params.get('dependencies', ["numpy", "pyNastran"]) } # Add outputs if 'outputs' in params: new_ext['outputs'] = params['outputs'] else: # Default output based on type output_name = params['name'].lower().replace(' ', '_') new_ext['outputs'] = [{"name": output_name, "metric": "scalar"}] # Add canvas position existing_count = len(spec.get('extractors', [])) new_ext['canvas_position'] = { "x": 400, "y": 100 + existing_count * 80 } # Add to spec if 'extractors' not in spec: spec['extractors'] = [] spec['extractors'].append(new_ext) # Save self._save_spec(study_id, spec) return f"✓ Added extractor '{params['name']}' (ID: {new_id}, type: {params['extractor_type']})" def _tool_add_objective(self, params: Dict[str, Any]) -> str: """Add an objective to the spec""" study_id = params.get('study_id') spec = self._load_spec(study_id) # Get existing IDs existing_ids = [obj.get('id', '') for obj in spec.get('objectives', [])] new_id = self._generate_id('obj_', existing_ids) # Build the new objective new_obj = { "id": new_id, "name": params['name'], "direction": params['direction'], "weight": params.get('weight', 1.0), "enabled": True } if 'target' in params: new_obj['target'] = params['target'] if 'units' in params: new_obj['units'] = params['units'] if 'extractor_id' in params: new_obj['source'] = { "extractor_id": params['extractor_id'], "output_key": params.get('output_key', 'value') } # Add canvas position existing_count = len(spec.get('objectives', [])) new_obj['canvas_position'] = { "x": 750, "y": 100 + existing_count * 80 } # Add to spec if 'objectives' not in spec: spec['objectives'] = [] spec['objectives'].append(new_obj) # Save self._save_spec(study_id, spec) return f"✓ Added objective '{params['name']}' (ID: {new_id}, direction: {params['direction']}, weight: {params.get('weight', 1.0)})" def _tool_add_constraint(self, params: Dict[str, Any]) -> str: """Add a constraint to the spec""" study_id = params.get('study_id') spec = self._load_spec(study_id) # Get existing IDs existing_ids = [con.get('id', '') for con in spec.get('constraints', [])] new_id = self._generate_id('con_', existing_ids) # Build the new constraint new_con = { "id": new_id, "name": params['name'], "operator": params['operator'], "threshold": params['threshold'], "enabled": True } if 'units' in params: new_con['units'] = params['units'] if 'extractor_id' in params: new_con['source'] = { "extractor_id": params['extractor_id'], "output_key": params.get('output_key', 'value') } # Add canvas position existing_count = len(spec.get('constraints', [])) new_con['canvas_position'] = { "x": 750, "y": 400 + existing_count * 80 } # Add to spec if 'constraints' not in spec: spec['constraints'] = [] spec['constraints'].append(new_con) # Save self._save_spec(study_id, spec) return f"✓ Added constraint '{params['name']}' (ID: {new_id}, {params['operator']} {params['threshold']})" def _tool_update_spec_field(self, params: Dict[str, Any]) -> str: """Update a field in the spec using a JSON path""" study_id = params.get('study_id') spec = self._load_spec(study_id) path = params['path'] value = params['value'] # Parse and navigate the path parts = path.split('.') current = spec for i, part in enumerate(parts[:-1]): # Check if part is an array index if part.isdigit(): idx = int(part) if not isinstance(current, list) or idx >= len(current): return f"✗ Invalid path: index {idx} out of range at '{'.'.join(parts[:i+1])}'" current = current[idx] else: if not isinstance(current, dict) or part not in current: return f"✗ Invalid path: key '{part}' not found at '{'.'.join(parts[:i+1])}'" current = current[part] # Set the final value final_key = parts[-1] if final_key.isdigit(): idx = int(final_key) if isinstance(current, list) and idx < len(current): old_value = current[idx] current[idx] = value else: return f"✗ Invalid path: cannot set index {idx}" else: old_value = current.get(final_key, '') current[final_key] = value # Save self._save_spec(study_id, spec) return f"✓ Updated '{path}': {old_value} → {value}" def _tool_remove_node(self, params: Dict[str, Any]) -> str: """Remove a node (design variable, extractor, objective, or constraint) by ID""" study_id = params.get('study_id') spec = self._load_spec(study_id) node_id = params['node_id'] # Determine the collection based on prefix if node_id.startswith('dv_'): collection_key = 'design_variables' elif node_id.startswith('ext_'): collection_key = 'extractors' elif node_id.startswith('obj_'): collection_key = 'objectives' elif node_id.startswith('con_'): collection_key = 'constraints' else: return f"✗ Unknown node type for ID: {node_id}. Expected prefix: dv_, ext_, obj_, or con_" collection = spec.get(collection_key, []) # Find and remove the node original_len = len(collection) spec[collection_key] = [item for item in collection if item.get('id') != node_id] if len(spec[collection_key]) == original_len: return f"✗ Node '{node_id}' not found in {collection_key}" # Also remove any edges referencing this node if 'canvas' in spec and 'edges' in spec['canvas']: spec['canvas']['edges'] = [ edge for edge in spec['canvas']['edges'] if edge.get('source') != node_id and edge.get('target') != node_id ] # Save self._save_spec(study_id, spec) return f"✓ Removed {collection_key.rstrip('s')} '{node_id}'" def _tool_create_study(self, params: Dict[str, Any]) -> str: """Create a new optimization study with initial atomizer_spec.json""" study_name = params['study_name'] category = params.get('category', '') description = params.get('description', '') sim_file = params.get('sim_file', '') algorithm = params.get('algorithm', 'TPE') max_trials = params.get('max_trials', 100) # Validate study name (snake_case) import re if not re.match(r'^[a-z][a-z0-9_]*$', study_name): return f"✗ Invalid study name '{study_name}'. Use snake_case (e.g., 'bracket_mass_opt')" # Build study path if category: study_dir = STUDIES_DIR / category / study_name study_id = f"{category}/{study_name}" else: study_dir = STUDIES_DIR / study_name study_id = study_name # Check if already exists if study_dir.exists(): return f"✗ Study '{study_id}' already exists" # Create directory structure study_dir.mkdir(parents=True, exist_ok=True) (study_dir / "1_setup").mkdir(exist_ok=True) (study_dir / "2_iterations").mkdir(exist_ok=True) (study_dir / "3_results").mkdir(exist_ok=True) # Create initial atomizer_spec.json spec = { "meta": { "version": "2.0", "study_name": study_name, "description": description, "created_at": datetime.now().isoformat(), "created_by": "claude_agent", "modified_at": datetime.now().isoformat(), "modified_by": "claude_agent" }, "model": { "sim": { "path": sim_file, "solver": "nastran" } }, "design_variables": [], "extractors": [], "objectives": [], "constraints": [], "optimization": { "algorithm": { "type": algorithm }, "budget": { "max_trials": max_trials } }, "canvas": { "edges": [], "layout_version": "2.0" } } # Write spec spec_path = study_dir / "atomizer_spec.json" with open(spec_path, 'w', encoding='utf-8') as f: json.dump(spec, f, indent=2) # Create README.md readme_content = f"""# {study_name.replace('_', ' ').title()} ## Description {description if description else 'Add study description here.'} ## Optimization Setup - **Algorithm**: {algorithm} - **Max Trials**: {max_trials} ## Design Variables *Add design variables using the canvas or assistant.* ## Objectives *Add objectives using the canvas or assistant.* --- *Created by Atomizer Assistant* """ readme_path = study_dir / "README.md" with open(readme_path, 'w', encoding='utf-8') as f: f.write(readme_content) # Update agent to point to new study self.study_id = study_id self.study_dir = study_dir self.canvas_state = spec self.system_prompt = self._build_system_prompt() return f"✓ Created study '{study_id}' at {study_dir}\n\nNext steps:\n1. Copy your NX model files (.prt, .fem, .sim) to 1_setup/\n2. Add design variables (NX expressions)\n3. Add extractors (mass, displacement, etc.)\n4. Add objectives to optimize" # === INTERVIEW TOOL IMPLEMENTATIONS === def _tool_start_interview(self) -> str: """Start a new interview session""" from api.services.interview_engine import InterviewEngine self.interview = InterviewEngine() result = self.interview.start() questions = result.get("next_questions", []) question_text = "\n".join([ f"• **{q['field']}**: {q['question']}\n *{q.get('hint', '')}*" for q in questions[:3] ]) return f"""✓ Interview started! **Current Phase**: {result['state']} {result['message']} **Questions to ask:** {question_text} Ask the user about these items one at a time, then use `interview_record` to save their answers.""" def _tool_interview_record(self, params: Dict[str, Any]) -> str: """Record an interview answer""" if not self.interview: return "✗ No interview in progress. Use `start_interview` first." field = params['field'] value = params['value'] result = self.interview.record_answer(field, value) return f"""✓ Recorded: **{field}** = {json.dumps(value) if isinstance(value, (dict, list)) else value} **Current State**: {result['state']} **Progress**: {json.dumps(result['data_so_far'], indent=2)} Use `interview_advance` when you've gathered enough info for this phase, or continue asking about other fields.""" def _tool_interview_advance(self) -> str: """Advance to next interview phase""" if not self.interview: return "✗ No interview in progress. Use `start_interview` first." result = self.interview.advance_state() questions = result.get("next_questions", []) if questions: question_text = "\n".join([ f"• **{q['field']}**: {q['question']}\n *{q.get('hint', '')}*" for q in questions[:3] ]) else: question_text = "*No more questions for this phase*" return f"""✓ Advanced to: **{result['state']}** **Next Questions:** {question_text} Continue gathering information or use `interview_finalize` when ready to create the study.""" def _tool_interview_status(self) -> str: """Get current interview status""" if not self.interview: return "No interview in progress. Use `start_interview` to begin." progress = self.interview.get_progress() return f"""**Interview Progress: {progress['progress_percent']}%** **Current Phase**: {progress['state']} **Collected Data**: {json.dumps(progress['summary'], indent=2)} **Validation**: {json.dumps(self.interview.validate(), indent=2)}""" def _tool_interview_finalize(self) -> str: """Finalize interview and create study""" if not self.interview: return "✗ No interview in progress. Use `start_interview` first." result = self.interview.finalize() if not result['success']: return f"✗ Cannot finalize - missing required data:\n" + "\n".join(f" - {e}" for e in result['errors']) # Create the study using the generated spec spec = result['spec'] study_name = spec['meta']['study_name'] category = self.interview.data.category # Build study path if category: study_dir = STUDIES_DIR / category / study_name study_id = f"{category}/{study_name}" else: study_dir = STUDIES_DIR / study_name study_id = study_name # Check if already exists if study_dir.exists(): return f"✗ Study '{study_id}' already exists. Choose a different name." # Create directory structure study_dir.mkdir(parents=True, exist_ok=True) (study_dir / "1_setup").mkdir(exist_ok=True) (study_dir / "2_iterations").mkdir(exist_ok=True) (study_dir / "3_results").mkdir(exist_ok=True) # Write spec spec_path = study_dir / "atomizer_spec.json" with open(spec_path, 'w', encoding='utf-8') as f: json.dump(spec, f, indent=2) # Create README readme_content = f"""# {study_name.replace('_', ' ').title()} ## Description {spec['meta'].get('description', 'Optimization study created via interview.')} ## Design Variables {chr(10).join(f"- **{dv['name']}**: [{dv.get('bounds', {}).get('min', '?')}, {dv.get('bounds', {}).get('max', '?')}]" for dv in spec['design_variables']) or '*None defined*'} ## Extractors {chr(10).join(f"- **{ext['name']}** ({ext.get('type', 'custom')})" for ext in spec['extractors']) or '*None defined*'} ## Objectives {chr(10).join(f"- **{obj['name']}**: {obj.get('direction', 'minimize')}" for obj in spec['objectives']) or '*None defined*'} ## Constraints {chr(10).join(f"- **{con['name']}** {con.get('operator', '<=')} {con.get('threshold', 0)}" for con in spec.get('constraints', [])) or '*None defined*'} ## Optimization Settings - **Algorithm**: {spec['optimization']['algorithm']['type']} - **Max Trials**: {spec['optimization']['budget']['max_trials']} --- *Created by Atomizer Assistant via Interview* """ readme_path = study_dir / "README.md" with open(readme_path, 'w', encoding='utf-8') as f: f.write(readme_content) # Update agent context self.study_id = study_id self.study_dir = study_dir self.canvas_state = spec self.interview = None # Clear interview self.system_prompt = self._build_system_prompt() # Build warnings message warnings_msg = "" if result.get('warnings'): warnings_msg = "\n\n**Warnings:**\n" + "\n".join(f" ⚠️ {w}" for w in result['warnings']) return f"""✓ Study created successfully! **Study ID**: {study_id} **Location**: {study_dir} **Configuration**: - Design Variables: {len(spec['design_variables'])} - Extractors: {len(spec['extractors'])} - Objectives: {len(spec['objectives'])} - Constraints: {len(spec.get('constraints', []))} - Algorithm: {spec['optimization']['algorithm']['type']} - Max Trials: {spec['optimization']['budget']['max_trials']} {warnings_msg} The canvas has been updated with the new study configuration. You can now: 1. Review and refine the configuration in the canvas 2. Copy your NX model files to 1_setup/ 3. Start the optimization when ready""" async def chat(self, message: str, conversation_history: Optional[List[Dict]] = None) -> Dict[str, Any]: """ Process a chat message with tool use support Args: message: User's message conversation_history: Previous messages for context Returns: Dict with response text and any tool calls made """ messages = conversation_history.copy() if conversation_history else [] messages.append({"role": "user", "content": message}) tool_calls_made = [] # Loop to handle tool use while True: response = self.client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4096, system=self.system_prompt, tools=self.tools, messages=messages ) # Check if we need to handle tool use if response.stop_reason == "tool_use": # Process tool calls assistant_content = response.content tool_results = [] for block in assistant_content: if block.type == "tool_use": tool_name = block.name tool_input = block.input tool_id = block.id # Execute the tool result = self._execute_tool(tool_name, tool_input) tool_calls_made.append({ "tool": tool_name, "input": tool_input, "result_preview": result[:200] + "..." if len(result) > 200 else result }) tool_results.append({ "type": "tool_result", "tool_use_id": tool_id, "content": result }) # Add assistant response and tool results to messages messages.append({"role": "assistant", "content": assistant_content}) messages.append({"role": "user", "content": tool_results}) else: # No more tool use, extract final response final_text = "" for block in response.content: if hasattr(block, 'text'): final_text += block.text return { "response": final_text, "tool_calls": tool_calls_made, "conversation": messages + [{"role": "assistant", "content": response.content}] } async def chat_stream(self, message: str, conversation_history: Optional[List[Dict]] = None) -> AsyncGenerator[str, None]: """ Stream a chat response token by token (simple, no tool use) Args: message: User's message conversation_history: Previous messages Yields: Response tokens as they arrive """ messages = conversation_history.copy() if conversation_history else [] messages.append({"role": "user", "content": message}) # Simple streaming without tool use with self.client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, system=self.system_prompt, messages=messages ) as stream: for text in stream.text_stream: yield text async def chat_stream_with_tools( self, message: str, conversation_history: Optional[List[Dict]] = None ) -> AsyncGenerator[Dict[str, Any], None]: """ Stream a chat response with full tool use support. Yields events: - {"type": "text", "content": "..."} - Text token - {"type": "tool_call", "tool": {...}} - Tool being called - {"type": "tool_result", "tool": "...", "result": "..."} - Tool result - {"type": "done", "response": "...", "tool_calls": [...]} - Final summary Args: message: User's message conversation_history: Previous messages for context Yields: Event dicts with type and content """ messages = conversation_history.copy() if conversation_history else [] messages.append({"role": "user", "content": message}) tool_calls_made = [] accumulated_text = "" # Loop to handle multiple rounds of tool use while True: current_text = "" current_tool_uses = [] # Use streaming for each API call with self.client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=4096, system=self.system_prompt, tools=self.tools, messages=messages ) as stream: for event in stream: # Handle different event types from the stream if hasattr(event, 'type'): if event.type == 'content_block_start': if hasattr(event, 'content_block'): block = event.content_block if hasattr(block, 'type') and block.type == 'tool_use': # Tool use starting current_tool_uses.append({ 'id': block.id, 'name': block.name, 'input': {} }) yield {"type": "tool_call", "tool": {"name": block.name, "id": block.id}} elif event.type == 'content_block_delta': if hasattr(event, 'delta'): delta = event.delta if hasattr(delta, 'type'): if delta.type == 'text_delta' and hasattr(delta, 'text'): # Text token current_text += delta.text yield {"type": "text", "content": delta.text} elif delta.type == 'input_json_delta' and hasattr(delta, 'partial_json'): # Tool input being built (we accumulate it) pass # Get the final message to check stop reason and get complete content final_message = stream.get_final_message() # Check if we need to process tool calls if final_message.stop_reason == "tool_use": # Extract tool uses from final message tool_results_content = [] for block in final_message.content: if block.type == "tool_use": tool_name = block.name tool_input = block.input tool_id = block.id # Execute the tool result = self._execute_tool(tool_name, tool_input) tool_calls_made.append({ "tool": tool_name, "input": tool_input, "result_preview": result[:200] + "..." if len(result) > 200 else result }) # Yield tool result event yield { "type": "tool_result", "tool": tool_name, "result": result[:500] + "..." if len(result) > 500 else result } tool_results_content.append({ "type": "tool_result", "tool_use_id": tool_id, "content": result }) # Add to messages for next iteration messages.append({"role": "assistant", "content": final_message.content}) messages.append({"role": "user", "content": tool_results_content}) else: # No more tool use - we're done accumulated_text += current_text # Extract final text from content for block in final_message.content: if hasattr(block, 'text') and not accumulated_text: accumulated_text = block.text # Yield done event yield { "type": "done", "response": accumulated_text, "tool_calls": tool_calls_made } break