diff --git a/atomizer-dashboard/frontend/src/components/canvas/ConnectionStatusIndicator.tsx b/atomizer-dashboard/frontend/src/components/canvas/ConnectionStatusIndicator.tsx
new file mode 100644
index 00000000..0d64b376
--- /dev/null
+++ b/atomizer-dashboard/frontend/src/components/canvas/ConnectionStatusIndicator.tsx
@@ -0,0 +1,49 @@
+/**
+ * ConnectionStatusIndicator - Visual indicator for WebSocket connection status.
+ */
+
+import { ConnectionStatus } from '../../hooks/useSpecWebSocket';
+
interface ConnectionStatusIndicatorProps {
  /** Current WebSocket connection state (from useSpecWebSocket). */
  status: ConnectionStatus;
  /** Extra CSS classes appended to the root element. */
  className?: string;
}
+
+/**
+ * Visual indicator for WebSocket connection status.
+ * Can be used in the canvas UI to show sync state.
+ */
+export function ConnectionStatusIndicator({
+ status,
+ className = '',
+}: ConnectionStatusIndicatorProps) {
+ const statusConfig = {
+ disconnected: {
+ color: 'bg-gray-500',
+ label: 'Disconnected',
+ },
+ connecting: {
+ color: 'bg-yellow-500 animate-pulse',
+ label: 'Connecting...',
+ },
+ connected: {
+ color: 'bg-green-500',
+ label: 'Connected',
+ },
+ reconnecting: {
+ color: 'bg-yellow-500 animate-pulse',
+ label: 'Reconnecting...',
+ },
+ };
+
+ const config = statusConfig[status];
+
+ return (
+
+ );
+}
+
+export default ConnectionStatusIndicator;
diff --git a/atomizer-dashboard/frontend/src/components/canvas/nodes/CustomExtractorNode.tsx b/atomizer-dashboard/frontend/src/components/canvas/nodes/CustomExtractorNode.tsx
new file mode 100644
index 00000000..f21e794c
--- /dev/null
+++ b/atomizer-dashboard/frontend/src/components/canvas/nodes/CustomExtractorNode.tsx
@@ -0,0 +1,58 @@
+/**
+ * CustomExtractorNode - Canvas node for custom Python extractors
+ *
+ * Displays custom extractors defined with inline Python code.
+ * Visually distinct from builtin extractors with a code icon.
+ *
+ * P3.11: Custom extractor UI component
+ */
+
+import { memo } from 'react';
+import { NodeProps } from 'reactflow';
+import { Code2 } from 'lucide-react';
+import { BaseNode } from './BaseNode';
+
/** Payload stored on a custom-extractor canvas node. */
export interface CustomExtractorNodeData {
  /** Node discriminator used by the canvas node registry. */
  type: 'customExtractor';
  /** Display label shown on the node. */
  label: string;
  /** Whether the extractor has been fully configured. */
  configured: boolean;
  /** Identifier of the backing extractor entity, once persisted. */
  extractorId?: string;
  /** Human-readable extractor name (preferred display name). */
  extractorName?: string;
  /** Name of the Python entry-point function. */
  functionName?: string;
  /** Inline Python source of the extractor function. */
  functionSource?: string;
  /** Outputs the extractor produces (name plus optional units). */
  outputs?: Array<{ name: string; units?: string }>;
  /** Python package dependencies declared for the extractor. */
  dependencies?: string[];
}
+
+function CustomExtractorNodeComponent(props: NodeProps) {
+ const { data } = props;
+
+ // Show validation status
+ const hasCode = !!data.functionSource?.trim();
+ const hasOutputs = (data.outputs?.length ?? 0) > 0;
+ const isConfigured = hasCode && hasOutputs;
+
+ return (
+ }
+ iconColor={isConfigured ? 'text-violet-400' : 'text-dark-500'}
+ >
+
+
+ {data.extractorName || data.functionName || 'Custom Extractor'}
+
+ {!isConfigured && (
+ Needs configuration
+ )}
+ {isConfigured && data.outputs && (
+
+ {data.outputs.length} output{data.outputs.length !== 1 ? 's' : ''}
+
+ )}
+
+
+ );
+}
+
+export const CustomExtractorNode = memo(CustomExtractorNodeComponent);
diff --git a/atomizer-dashboard/frontend/src/components/canvas/panels/CustomExtractorPanel.tsx b/atomizer-dashboard/frontend/src/components/canvas/panels/CustomExtractorPanel.tsx
new file mode 100644
index 00000000..fe8318a3
--- /dev/null
+++ b/atomizer-dashboard/frontend/src/components/canvas/panels/CustomExtractorPanel.tsx
@@ -0,0 +1,360 @@
+/**
+ * CustomExtractorPanel - Panel for editing custom Python extractors
+ *
+ * Provides a code editor for writing custom extraction functions,
+ * output definitions, and validation.
+ *
+ * P3.12: Custom extractor UI component
+ */
+
+import { useState, useCallback } from 'react';
+import { X, Play, AlertCircle, CheckCircle, Plus, Trash2, HelpCircle } from 'lucide-react';
+
/** A single output definition row edited in the panel. */
interface CustomExtractorOutput {
  /** Output name (used by objectives/constraints downstream). */
  name: string;
  /** Physical units, free text. */
  units?: string;
  /** Optional human-readable description. */
  description?: string;
}

/** Props for the custom-extractor editing panel (modal-style). */
interface CustomExtractorPanelProps {
  /** When false the panel renders nothing. */
  isOpen: boolean;
  /** Close the panel without saving. */
  onClose: () => void;
  /** Initial values used to seed the editable draft state on mount. */
  initialName?: string;
  initialFunctionName?: string;
  initialSource?: string;
  initialOutputs?: CustomExtractorOutput[];
  initialDependencies?: string[];
  /** Called with the cleaned-up draft when the user saves. */
  onSave: (data: {
    name: string;
    functionName: string;
    source: string;
    outputs: CustomExtractorOutput[];
    dependencies: string[];
  }) => void;
}
+
// Common styling classes shared by the panel's form controls.
const inputClass =
  'w-full px-3 py-2 bg-dark-800 border border-dark-600 text-white placeholder-dark-400 rounded-lg focus:border-primary-500 focus:outline-none transition-colors';
const labelClass = 'block text-sm font-medium text-dark-300 mb-1';

// Default extractor template shown when creating a new custom extractor.
// This is Python source executed by the backend — its content must not be
// reformatted or translated.
const DEFAULT_SOURCE = `def extract(op2_path, bdf_path=None, params=None, working_dir=None):
    """
    Custom extractor function.

    Args:
        op2_path: Path to the OP2 results file
        bdf_path: Optional path to the BDF model file
        params: Dictionary of current design parameters
        working_dir: Path to the current trial directory

    Returns:
        Dictionary of output_name -> value
        OR a single float value
        OR a list/tuple of values (mapped to outputs in order)
    """
    import numpy as np
    from pyNastran.op2.op2 import OP2

    # Load OP2 results
    op2 = OP2(op2_path, debug=False)

    # Example: compute custom metric
    # ... your extraction logic here ...

    result = 0.0

    return {"custom_output": result}
`;
+
/**
 * Panel for creating/editing a custom Python extractor: name, entry-point
 * function name, source code, output definitions, server-side validation.
 */
export function CustomExtractorPanel({
  isOpen,
  onClose,
  initialName = '',
  initialFunctionName = 'extract',
  initialSource = DEFAULT_SOURCE,
  initialOutputs = [{ name: 'custom_output', units: '' }],
  initialDependencies = [],
  onSave,
}: CustomExtractorPanelProps) {
  // Editable draft state, seeded once from the initial* props on mount.
  // If a parent re-opens the panel with different initial values it should
  // remount the component (e.g. via a `key`) — TODO confirm caller behavior.
  const [name, setName] = useState(initialName);
  const [functionName, setFunctionName] = useState(initialFunctionName);
  const [source, setSource] = useState(initialSource);
  const [outputs, setOutputs] = useState(initialOutputs);
  // No setter is exposed: dependencies are read-only in this panel.
  const [dependencies] = useState(initialDependencies);
  // Result of the last validation run; null = not yet validated.
  const [validation, setValidation] = useState<{
    valid: boolean;
    errors: string[];
  } | null>(null);
  const [isValidating, setIsValidating] = useState(false);
  const [showHelp, setShowHelp] = useState(false);

  // Append a blank output row for the user to fill in.
  const addOutput = useCallback(() => {
    setOutputs((prev) => [...prev, { name: '', units: '' }]);
  }, []);

  // Remove the output row at `index`.
  const removeOutput = useCallback((index: number) => {
    setOutputs((prev) => prev.filter((_, i) => i !== index));
  }, []);

  // Update one field of the output row at `index` (immutable update).
  const updateOutput = useCallback(
    (index: number, field: keyof CustomExtractorOutput, value: string) => {
      setOutputs((prev) =>
        prev.map((out, i) => (i === index ? { ...out, [field]: value } : out))
      );
    },
    []
  );

  // Ask the backend to validate the current function name + source.
  // NOTE(review): response.ok is not checked before .json(); a 4xx/5xx with a
  // non-JSON body surfaces only as the generic "Failed to validate" error.
  const validateCode = useCallback(async () => {
    setIsValidating(true);
    setValidation(null);

    try {
      const response = await fetch('/api/spec/validate-extractor', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          function_name: functionName,
          source: source,
        }),
      });

      const result = await response.json();
      setValidation({
        valid: result.valid,
        errors: result.errors || [],
      });
    } catch (error) {
      setValidation({
        valid: false,
        errors: ['Failed to validate: ' + (error instanceof Error ? error.message : 'Unknown error')],
      });
    } finally {
      setIsValidating(false);
    }
  }, [functionName, source]);

  // Validate required fields locally, then hand the cleaned draft to the
  // caller and close. Validation failures are surfaced through the same
  // `validation` banner used by server-side checks.
  const handleSave = useCallback(() => {
    // Filter out empty outputs
    const validOutputs = outputs.filter((o) => o.name.trim());

    if (!name.trim()) {
      setValidation({ valid: false, errors: ['Name is required'] });
      return;
    }

    if (validOutputs.length === 0) {
      setValidation({ valid: false, errors: ['At least one output is required'] });
      return;
    }

    onSave({
      name: name.trim(),
      functionName: functionName.trim() || 'extract',
      source,
      outputs: validOutputs,
      dependencies: dependencies.filter((d) => d.trim()),
    });
    onClose();
  }, [name, functionName, source, outputs, dependencies, onSave, onClose]);

  if (!isOpen) return null;

  // NOTE(review): the JSX below was garbled in this copy of the file (markup
  // tags were stripped). It is preserved verbatim pending recovery of the
  // original; do not treat it as compilable.
  return (

    
      {/* Header */}
      
        Custom Extractor
      
      
      
      

      {/* Content */}
      
        {/* Help Section */}
        {showHelp && (
          
            How Custom Extractors Work
            
              - • Your function receives the path to OP2 results and optional BDF/params
              - • Use pyNastran, numpy, scipy for data extraction and analysis
              - • Return a dictionary mapping output names to numeric values
              - • Outputs can be used as objectives or constraints in optimization
              - • Code runs in a sandboxed environment (no file I/O beyond OP2/BDF)
            
          
        )}

        
        {/* Left Column - Basic Info & Outputs */}
        
          {/* Name */}
          
          
            setName(e.target.value)}
            placeholder="My Custom Extractor"
            className={inputClass}
          />
          

          {/* Function Name */}
          
          
          
            setFunctionName(e.target.value)}
            placeholder="extract"
            className={`${inputClass} font-mono`}
          />
          
          Name of the Python function in your code
          
          

          {/* Outputs */}
          

          {/* Validation Status */}
          {validation && (
            
            
              {validation.valid ? (
                
              ) : (
                
              )}
              
              {validation.valid ? 'Code is valid' : 'Validation failed'}
              
            
            {validation.errors.length > 0 && (
              
              {validation.errors.map((err, i) => (
                - • {err}
              ))}
              
            )}
            
          )}
        

        {/* Right Column - Code Editor */}
        
        
        
        
        
        
        
        

        {/* Footer */}
        
        
        
        
      
    
  );
}
diff --git a/atomizer-dashboard/frontend/src/types/atomizer-spec.ts b/atomizer-dashboard/frontend/src/types/atomizer-spec.ts
new file mode 100644
index 00000000..868e1096
--- /dev/null
+++ b/atomizer-dashboard/frontend/src/types/atomizer-spec.ts
@@ -0,0 +1,572 @@
+/**
+ * AtomizerSpec v2.0 TypeScript Types
+ *
+ * These types match the JSON Schema at optimization_engine/schemas/atomizer_spec_v2.json
+ * This is the single source of truth for optimization configuration.
+ */
+
+// ============================================================================
+// Position Types
+// ============================================================================
+
/** A 2-D position on the canvas. */
export interface CanvasPosition {
  /** Canvas x coordinate. */
  x: number;
  /** Canvas y coordinate. */
  y: number;
}

// ============================================================================
// Meta Types
// ============================================================================

/** Which tool or actor produced / last touched the spec. */
export type SpecCreatedBy = 'canvas' | 'claude' | 'api' | 'migration' | 'manual';

/** Identity and provenance metadata for a spec. */
export interface SpecMeta {
  /** Schema version (e.g., "2.0") */
  version: string;
  /** When the spec was created (ISO 8601) */
  created?: string;
  /** When the spec was last modified (ISO 8601) */
  modified?: string;
  /** Who/what created the spec */
  created_by?: SpecCreatedBy;
  /** Who/what last modified the spec */
  modified_by?: string;
  /** Unique study identifier (snake_case) */
  study_name: string;
  /** Human-readable description */
  description?: string;
  /** Tags for categorization */
  tags?: string[];
  /** Real-world engineering context */
  engineering_context?: string;
}
+
+// ============================================================================
+// Model Types
+// ============================================================================
+
/** NX CAD part file reference. */
export interface NxPartConfig {
  /** Path to .prt file */
  path?: string;
  /** File hash for change detection */
  hash?: string;
  /** Idealized part filename (_i.prt) */
  idealized_part?: string;
}

/** FEM mesh file reference with optional size statistics. */
export interface FemConfig {
  /** Path to .fem file */
  path?: string;
  /** Number of elements */
  element_count?: number;
  /** Number of nodes */
  node_count?: number;
}

/** Supported solver backends. */
export type SolverType = 'nastran' | 'NX_Nastran' | 'abaqus';
/** Analysis type of a subcase. */
export type SubcaseType = 'static' | 'modal' | 'thermal' | 'buckling';

/** A single solver subcase. */
export interface Subcase {
  /** Solver subcase id. */
  id: number;
  /** Optional display name. */
  name?: string;
  /** Analysis type of this subcase. */
  type?: SubcaseType;
}

/** Simulation file configuration (required part of the model). */
export interface SimConfig {
  /** Path to .sim file */
  path: string;
  /** Solver type */
  solver: SolverType;
  /** Solution type (e.g., SOL101) */
  solution_type?: string;
  /** Defined subcases */
  subcases?: Subcase[];
}

/** NX runtime settings used when driving simulations. */
export interface NxSettings {
  /** Installation directory of NX. */
  nx_install_path?: string;
  /** Per-simulation timeout, in seconds. */
  simulation_timeout_s?: number;
  /** Whether to launch NX automatically if not running. */
  auto_start_nx?: boolean;
}

/** NX model files and solver configuration for a study. */
export interface ModelConfig {
  nx_part?: NxPartConfig;
  fem?: FemConfig;
  /** Simulation configuration (the only required section). */
  sim: SimConfig;
  nx_settings?: NxSettings;
}
+
+// ============================================================================
+// Design Variable Types
+// ============================================================================
+
/** Kind of a design variable's value domain. */
export type DesignVariableType = 'continuous' | 'integer' | 'categorical';

/** Inclusive numeric bounds for a design variable. */
export interface DesignVariableBounds {
  min: number;
  max: number;
}

/** A single optimizable parameter backed by an NX expression. */
export interface DesignVariable {
  /** Unique identifier (pattern: dv_XXX) */
  id: string;
  /** Human-readable name */
  name: string;
  /** NX expression name (must match model) */
  expression_name: string;
  /** Variable type */
  type: DesignVariableType;
  /** Value bounds */
  bounds: DesignVariableBounds;
  /** Current/initial value */
  baseline?: number;
  /** Physical units (mm, deg, etc.) */
  units?: string;
  /** Step size for integer/discrete */
  step?: number;
  /** Whether to include in optimization */
  enabled?: boolean;
  /** Description */
  description?: string;
  /** Canvas position */
  canvas_position?: CanvasPosition;
}
+
+// ============================================================================
+// Extractor Types
+// ============================================================================
+
/** Supported extractor kinds ('custom_function' carries inline Python). */
export type ExtractorType =
  | 'displacement'
  | 'frequency'
  | 'stress'
  | 'mass'
  | 'mass_expression'
  | 'zernike_opd'
  | 'zernike_csv'
  | 'temperature'
  | 'custom_function';

/**
 * Type-specific extractor settings. Only the fields relevant to the
 * extractor's `type` are meaningful; the index signature allows extras.
 */
export interface ExtractorConfig {
  /** Inner radius for Zernike (mm) */
  inner_radius_mm?: number;
  /** Outer radius for Zernike (mm) */
  outer_radius_mm?: number;
  /** Number of Zernike modes */
  n_modes?: number;
  /** Low-order modes to filter */
  filter_low_orders?: number;
  /** Displacement unit */
  displacement_unit?: string;
  /** Reference subcase ID */
  reference_subcase?: number;
  /** NX expression name (for mass_expression) */
  expression_name?: string;
  /** Mode number (for frequency) */
  mode_number?: number;
  /** Element type (for stress) */
  element_type?: string;
  /** Result type */
  result_type?: string;
  /** Metric type */
  metric?: string;
  /** Additional config properties */
  [key: string]: unknown;
}

/** Inline Python function definition for custom_function extractors. */
export interface CustomFunction {
  /** Function name */
  name?: string;
  /** Python module path */
  module?: string;
  /** Function signature */
  signature?: string;
  /** Python source code */
  source_code?: string;
}

/** One named value an extractor produces. */
export interface ExtractorOutput {
  /** Output name (used by objectives/constraints) */
  name: string;
  /** Specific metric (max, total, rms, etc.) */
  metric?: string;
  /** Subcase ID for this output */
  subcase?: number;
  /** Units */
  units?: string;
}

/** A physics extractor that computes outputs from FEA results. */
export interface Extractor {
  /** Unique identifier (pattern: ext_XXX) */
  id: string;
  /** Human-readable name */
  name: string;
  /** Extractor type */
  type: ExtractorType;
  /** Whether this is a built-in extractor */
  builtin?: boolean;
  /** Type-specific configuration */
  config?: ExtractorConfig;
  /** Custom function definition (for custom_function type) */
  function?: CustomFunction;
  /** Output values this extractor produces */
  outputs: ExtractorOutput[];
  /** Canvas position */
  canvas_position?: CanvasPosition;
}
+
+// ============================================================================
+// Objective Types
+// ============================================================================
+
/** Whether an objective is minimized or maximized. */
export type OptimizationDirection = 'minimize' | 'maximize';

/** Links an objective to one named output of one extractor. */
export interface ObjectiveSource {
  /** Reference to extractor */
  extractor_id: string;
  /** Which output from the extractor */
  output_name: string;
}

/** One optimization objective. */
export interface Objective {
  /** Unique identifier (pattern: obj_XXX) */
  id: string;
  /** Human-readable name */
  name: string;
  /** Optimization direction */
  direction: OptimizationDirection;
  /** Weight for weighted sum (multi-objective) */
  weight?: number;
  /** Where the value comes from */
  source: ObjectiveSource;
  /** Target value (for goal programming) */
  target?: number;
  /** Units */
  units?: string;
  /** Description */
  description?: string;
  /** Canvas position */
  canvas_position?: CanvasPosition;
}
+
+// ============================================================================
+// Constraint Types
+// ============================================================================
+
/** Hard constraints are strictly enforced; soft ones are penalized. */
export type ConstraintType = 'hard' | 'soft';
/** Comparison operator applied against the threshold. */
export type ConstraintOperator = '<=' | '>=' | '<' | '>' | '==';
/** Shape of the penalty applied when a soft constraint is violated. */
export type PenaltyMethod = 'linear' | 'quadratic' | 'exponential';

/** Links a constraint to one named output of one extractor. */
export interface ConstraintSource {
  extractor_id: string;
  output_name: string;
}

/** Penalty shaping for soft constraints. */
export interface PenaltyConfig {
  /** Penalty method */
  method?: PenaltyMethod;
  /** Penalty weight */
  weight?: number;
  /** Soft margin before penalty kicks in */
  margin?: number;
}

/** One optimization constraint. */
export interface Constraint {
  /** Unique identifier (pattern: con_XXX) */
  id: string;
  /** Human-readable name */
  name: string;
  /** Constraint type */
  type: ConstraintType;
  /** Comparison operator */
  operator: ConstraintOperator;
  /** Constraint threshold value */
  threshold: number;
  /** Where the value comes from */
  source: ConstraintSource;
  /** Penalty method configuration */
  penalty_config?: PenaltyConfig;
  /** Description */
  description?: string;
  /** Canvas position */
  canvas_position?: CanvasPosition;
}
+
+// ============================================================================
+// Optimization Types
+// ============================================================================
+
/** Supported optimization algorithms. */
export type AlgorithmType = 'TPE' | 'CMA-ES' | 'NSGA-II' | 'RandomSearch' | 'SAT_v3' | 'GP-BO';
/** Supported surrogate model families. */
export type SurrogateType = 'MLP' | 'GNN' | 'ensemble';

/**
 * Algorithm hyper-parameters. Only the fields relevant to the selected
 * algorithm are meaningful; the index signature allows extras.
 */
export interface AlgorithmConfig {
  /** Population size (evolutionary algorithms) */
  population_size?: number;
  /** Number of generations */
  n_generations?: number;
  /** Mutation probability */
  mutation_prob?: number | null;
  /** Crossover probability */
  crossover_prob?: number;
  /** Random seed */
  seed?: number;
  /** Number of startup trials (TPE) */
  n_startup_trials?: number;
  /** Initial sigma (CMA-ES) */
  sigma0?: number;
  /** Additional config properties */
  [key: string]: unknown;
}

/** Selected algorithm plus its hyper-parameters. */
export interface Algorithm {
  type: AlgorithmType;
  config?: AlgorithmConfig;
}

/** Stopping criteria for the optimization run. */
export interface OptimizationBudget {
  /** Maximum number of trials */
  max_trials?: number;
  /** Maximum time in hours */
  max_time_hours?: number;
  /** Stop if no improvement for N trials */
  convergence_patience?: number;
}

/** Surrogate model hyper-parameters. */
export interface SurrogateConfig {
  /** Number of models in ensemble */
  n_models?: number;
  /** Network architecture layers */
  architecture?: number[];
  /** Retrain every N trials */
  train_every_n_trials?: number;
  /** Minimum training samples */
  min_training_samples?: number;
  /** Acquisition function candidates */
  acquisition_candidates?: number;
  /** FEA validations per round */
  fea_validations_per_round?: number;
}

/** Surrogate-assisted optimization toggle and settings. */
export interface Surrogate {
  enabled?: boolean;
  type?: SurrogateType;
  config?: SurrogateConfig;
}

/** Top-level optimization section of a spec. */
export interface OptimizationConfig {
  algorithm: Algorithm;
  budget: OptimizationBudget;
  surrogate?: Surrogate;
  canvas_position?: CanvasPosition;
}
+
+// ============================================================================
+// Workflow Types
+// ============================================================================
+
/** One stage in a multi-stage optimization workflow. */
export interface WorkflowStage {
  /** Stage identifier (referenced by transitions). */
  id: string;
  /** Human-readable stage name. */
  name: string;
  /** Algorithm to run in this stage. */
  algorithm?: string;
  /** Trial budget for this stage. */
  trials?: number;
  /** Free-text purpose of the stage. */
  purpose?: string;
}

/** Directed transition between workflow stages. */
export interface WorkflowTransition {
  /** Source stage id. */
  from: string;
  /** Destination stage id. */
  to: string;
  /** Transition condition — presumably an expression evaluated by the
   * engine; format not visible here, TODO confirm. */
  condition?: string;
}

/** Multi-stage optimization workflow definition. */
export interface Workflow {
  stages?: WorkflowStage[];
  transitions?: WorkflowTransition[];
}
+
+// ============================================================================
+// Reporting Types
+// ============================================================================
+
/** Settings for a single insight; index signature allows extras. */
export interface InsightConfig {
  include_html?: boolean;
  show_pareto_evolution?: boolean;
  [key: string]: unknown;
}

/** A report insight to generate for some subset of trials. */
export interface Insight {
  /** Insight kind identifier. */
  type?: string;
  /** Trial selector — format not visible here, TODO confirm. */
  for_trials?: string;
  config?: InsightConfig;
}

/** Reporting section of a spec. */
export interface ReportingConfig {
  /** Whether to generate reports automatically. */
  auto_report?: boolean;
  /** Events that trigger report generation. */
  report_triggers?: string[];
  /** Insights to include in generated reports. */
  insights?: Insight[];
}
+
+// ============================================================================
+// Canvas Types
+// ============================================================================
+
/** Saved canvas viewport (pan offset plus zoom factor). */
export interface CanvasViewport {
  x: number;
  y: number;
  zoom: number;
}

/** A persisted edge between two canvas nodes. */
export interface CanvasEdge {
  /** Source node id. */
  source: string;
  /** Target node id. */
  target: string;
  /** Optional source handle id. */
  sourceHandle?: string;
  /** Optional target handle id. */
  targetHandle?: string;
}

/** A named visual grouping of canvas nodes. */
export interface CanvasGroup {
  id: string;
  name: string;
  /** Ids of the nodes contained in this group. */
  node_ids: string[];
}

/** Persisted canvas UI state for reconstructing the diagram. */
export interface CanvasConfig {
  layout_version?: string;
  viewport?: CanvasViewport;
  edges?: CanvasEdge[];
  groups?: CanvasGroup[];
}
+
+// ============================================================================
+// Main AtomizerSpec Type
+// ============================================================================
+
/**
 * AtomizerSpec v2.0 - The unified configuration schema for Atomizer optimization studies.
 *
 * This is the single source of truth used by:
 * - Canvas UI (rendering and editing)
 * - Backend API (validation and storage)
 * - Claude Assistant (reading and modifying)
 * - Optimization Engine (execution)
 *
 * Required sections: meta, model, design_variables, extractors, objectives,
 * optimization. All other sections are optional.
 */
export interface AtomizerSpec {
  /** Metadata about the spec */
  meta: SpecMeta;
  /** NX model files and configuration */
  model: ModelConfig;
  /** Design variables (NX expressions) to optimize */
  design_variables: DesignVariable[];
  /** Physics extractors that compute outputs from FEA results */
  extractors: Extractor[];
  /** Optimization objectives (minimize/maximize) */
  objectives: Objective[];
  /** Hard and soft constraints */
  constraints?: Constraint[];
  /** Optimization algorithm configuration */
  optimization: OptimizationConfig;
  /** Multi-stage optimization workflow */
  workflow?: Workflow;
  /** Reporting configuration */
  reporting?: ReportingConfig;
  /** Canvas UI state (persisted for reconstruction) */
  canvas?: CanvasConfig;
}
+
+// ============================================================================
+// Utility Types for API Responses
+// ============================================================================
+
/** A blocking validation failure. */
export interface SpecValidationError {
  /** Failure category: JSON-schema, semantic rule, or broken reference. */
  type: 'schema' | 'semantic' | 'reference';
  /** Path into the spec where the error occurred. */
  path: string[];
  /** Human-readable error message. */
  message: string;
}

/** A non-blocking validation advisory. */
export interface SpecValidationWarning {
  type: string;
  path: string[];
  message: string;
}

/** Full result of validating a spec. */
export interface SpecValidationReport {
  /** True when there are no errors. */
  valid: boolean;
  errors: SpecValidationError[];
  warnings: SpecValidationWarning[];
  /** Entity counts summarizing the validated spec. */
  summary: {
    design_variables: number;
    extractors: number;
    objectives: number;
    constraints: number;
    custom_functions: number;
  };
}

/** A single path-based modification to apply to a spec. */
export interface SpecModification {
  operation: 'set' | 'add' | 'remove';
  /** Path into the spec being modified. */
  path: string;
  /** New value (omitted for 'remove'). */
  value?: unknown;
}

/** Server response after applying spec modifications. */
export interface SpecUpdateResult {
  success: boolean;
  /** Hash of the updated spec (for change detection). */
  hash: string;
  /** Modification timestamp. */
  modified: string;
  /** Actor that performed the modification. */
  modified_by: string;
}

/** Request body for patching one path in a spec. */
export interface SpecPatchRequest {
  path: string;
  value: unknown;
  modified_by?: string;
}
+
+// ============================================================================
+// Node Types for Canvas
+// ============================================================================
+
+export type SpecNodeType =
+ | 'designVar'
+ | 'extractor'
+ | 'objective'
+ | 'constraint'
+ | 'model'
+ | 'solver'
+ | 'algorithm';
+
+export interface SpecNodeBase {
+ id: string;
+ type: SpecNodeType;
+ position: CanvasPosition;
+ data: Record;
+}
+
+// ============================================================================
+// WebSocket Types
+// ============================================================================
+
+export type SpecSyncMessageType =
+ | 'spec_updated'
+ | 'validation_error'
+ | 'node_added'
+ | 'node_removed'
+ | 'connection_ack';
+
+export interface SpecSyncMessage {
+ type: SpecSyncMessageType;
+ timestamp: string;
+ hash?: string;
+ modified_by?: string;
+ changes?: Array<{
+ path: string;
+ old: unknown;
+ new: unknown;
+ }>;
+ error?: string;
+}
+
+export interface SpecClientMessage {
+ type: 'subscribe' | 'patch_node' | 'add_node' | 'remove_node' | 'update_position';
+ study_id: string;
+ node_id?: string;
+ data?: Record;
+ position?: CanvasPosition;
+}
diff --git a/mcp-server/atomizer-tools/src/index.ts b/mcp-server/atomizer-tools/src/index.ts
index ba7bb68a..91ec1147 100644
--- a/mcp-server/atomizer-tools/src/index.ts
+++ b/mcp-server/atomizer-tools/src/index.ts
@@ -22,6 +22,7 @@ import { analysisTools } from "./tools/analysis.js";
import { reportingTools } from "./tools/reporting.js";
import { physicsTools } from "./tools/physics.js";
import { canvasTools } from "./tools/canvas.js";
+import { specTools } from "./tools/spec.js";
import { adminTools } from "./tools/admin.js";
import { ATOMIZER_MODE } from "./utils/paths.js";
@@ -52,6 +53,7 @@ const userTools: AtomizerTool[] = [
...reportingTools,
...physicsTools,
...canvasTools,
+ ...specTools,
];
const powerTools: AtomizerTool[] = [
diff --git a/mcp-server/atomizer-tools/src/tools/spec.ts b/mcp-server/atomizer-tools/src/tools/spec.ts
new file mode 100644
index 00000000..f4d93da5
--- /dev/null
+++ b/mcp-server/atomizer-tools/src/tools/spec.ts
@@ -0,0 +1,1175 @@
+/**
+ * AtomizerSpec v2.0 MCP Tools
+ *
+ * Tools for Claude to interact with the unified AtomizerSpec configuration system.
+ * These tools enable reading, modifying, and creating optimization studies
+ * through natural language conversation.
+ *
+ * Phase 3: P3.1-P3.6 Claude Integration
+ */
+
+import { execSync } from "child_process";
+import { AtomizerTool } from "../index.js";
+import { PYTHON_PATH, ATOMIZER_ROOT, getStudyDir } from "../utils/paths.js";
+import * as fs from "fs";
+import * as path from "path";
+
+// Dashboard API base URL (for HTTP calls)
+// NOTE(review): API_BASE is not referenced anywhere in this module's visible
+// code — confirm it is used elsewhere, or remove it.
+const API_BASE = process.env.ATOMIZER_API_URL || "http://localhost:8000/api";
+
+/**
+ * Execute a Python script with the project interpreter and return its stdout.
+ *
+ * The script is delivered on stdin (`python -`) instead of being inlined into
+ * the shell command line. The previous `-c "..."` form escaped only double
+ * quotes, so scripts containing backticks, `$`, or backslashes were mangled by
+ * the shell — and the interpolation was a command-injection vector.
+ *
+ * @param script  Python source to run.
+ * @param timeout Kill the interpreter after this many milliseconds.
+ * @returns Captured stdout decoded as UTF-8.
+ * @throws If the interpreter exits non-zero or the timeout elapses.
+ */
+function runPython(script: string, timeout = 30000): string {
+  return execSync(`"${PYTHON_PATH}" -`, {
+    input: script,
+    encoding: "utf-8",
+    timeout,
+    cwd: ATOMIZER_ROOT,
+  });
+}
+
+/**
+ * Load a study's AtomizerSpec via Python (for when the Dashboard API isn't
+ * available). Falls back to migrating a legacy optimization_config.json when
+ * no atomizer_spec.json exists.
+ *
+ * @param studyId Study identifier; may contain path separators.
+ * @returns The parsed spec object, or null when nothing could be loaded.
+ */
+function loadSpecDirect(studyId: string): Record<string, unknown> | null {
+  // Embed the study id as a JSON-escaped literal (also a valid Python string
+  // literal) instead of raw interpolation, so quotes/backslashes in the id
+  // cannot break or inject into the generated script.
+  const studyIdLiteral = JSON.stringify(studyId.replace(/\\/g, "/"));
+  const script = `
+import sys
+import json
+sys.path.insert(0, r"${ATOMIZER_ROOT}")
+from pathlib import Path
+
+study_dir = Path(r"${ATOMIZER_ROOT}") / "studies" / ${studyIdLiteral}
+spec_path = study_dir / "atomizer_spec.json"
+
+if spec_path.exists():
+    with open(spec_path, 'r', encoding='utf-8') as f:
+        spec = json.load(f)
+    print(json.dumps({"success": True, "spec": spec}))
+else:
+    # Try to migrate from legacy config
+    legacy_paths = [
+        study_dir / "1_setup" / "optimization_config.json",
+        study_dir / "optimization_config.json",
+    ]
+    for lp in legacy_paths:
+        if lp.exists():
+            from optimization_engine.config.migrator import SpecMigrator
+            migrator = SpecMigrator(study_dir)
+            with open(lp, 'r', encoding='utf-8') as f:
+                old_config = json.load(f)
+            spec = migrator.migrate(old_config)
+            print(json.dumps({"success": True, "spec": spec, "migrated": True}))
+            sys.exit(0)
+    print(json.dumps({"success": False, "error": "No spec or config found"}))
+`;
+
+  try {
+    const output = runPython(script);
+    const result = JSON.parse(output.trim());
+    if (result.success) {
+      return result.spec;
+    }
+    return null;
+  } catch (error) {
+    console.error("Failed to load spec:", error);
+    return null;
+  }
+}
+
+/**
+ * Validate and save a spec to <study>/atomizer_spec.json via Python.
+ *
+ * @param studyId Study identifier.
+ * @param spec    Spec object to persist (must be JSON-serializable).
+ * @returns true when validation passed and the file was written.
+ */
+function saveSpecDirect(studyId: string, spec: Record<string, unknown>): boolean {
+  // Double-stringify: the inner call serializes the spec, the outer call
+  // produces a quoted, fully escaped literal that is also valid Python.
+  // The previous quote-only escaping corrupted any JSON containing backslash
+  // escapes (e.g. "\n" inside string values).
+  const specLiteral = JSON.stringify(JSON.stringify(spec));
+  const studyIdLiteral = JSON.stringify(studyId.replace(/\\/g, "/"));
+  const script = `
+import sys
+import json
+sys.path.insert(0, r"${ATOMIZER_ROOT}")
+from pathlib import Path
+
+study_dir = Path(r"${ATOMIZER_ROOT}") / "studies" / ${studyIdLiteral}
+spec_path = study_dir / "atomizer_spec.json"
+
+spec = json.loads(${specLiteral})
+
+# Validate before saving
+from optimization_engine.config.spec_validator import SpecValidator
+validator = SpecValidator()
+report = validator.validate(spec)
+
+if not report.valid:
+    errors = [{"path": e.path, "message": e.message} for e in report.errors[:5]]
+    print(json.dumps({"success": False, "errors": errors}))
+    sys.exit(1)
+
+with open(spec_path, 'w', encoding='utf-8') as f:
+    json.dump(spec, f, indent=2, ensure_ascii=False)
+
+print(json.dumps({"success": True}))
+`;
+
+  try {
+    const output = runPython(script);
+    const result = JSON.parse(output.trim());
+    return result.success;
+  } catch (error) {
+    console.error("Failed to save spec:", error);
+    return false;
+  }
+}
+
+/**
+ * Apply a single JSONPath-style patch (e.g. "design_variables[0].bounds.max")
+ * to a study's spec, validating before the write.
+ *
+ * @param studyId  Study identifier.
+ * @param specPath Dotted/bracketed path to the field to set.
+ * @param value    New value; must be JSON-serializable.
+ */
+function patchSpecDirect(
+  studyId: string,
+  specPath: string,
+  value: unknown
+): { success: boolean; error?: string } {
+  if (value === undefined) {
+    // JSON.stringify(undefined) returns undefined; the old code crashed with
+    // an unhelpful TypeError before the script even ran.
+    return { success: false, error: "value must be JSON-serializable" };
+  }
+  // Double-stringify yields quoted literals that are valid Python and are
+  // immune to quotes/backslashes inside the payload (the old quote-only
+  // escaping corrupted both). The path is embedded the same way.
+  const valueLiteral = JSON.stringify(JSON.stringify(value));
+  const pathLiteral = JSON.stringify(specPath);
+  const studyIdLiteral = JSON.stringify(studyId.replace(/\\/g, "/"));
+  const script = `
+import sys
+import json
+sys.path.insert(0, r"${ATOMIZER_ROOT}")
+from pathlib import Path
+
+study_dir = Path(r"${ATOMIZER_ROOT}") / "studies" / ${studyIdLiteral}
+spec_path_file = study_dir / "atomizer_spec.json"
+
+if not spec_path_file.exists():
+    print(json.dumps({"success": False, "error": "Spec not found"}))
+    sys.exit(1)
+
+with open(spec_path_file, 'r', encoding='utf-8') as f:
+    spec = json.load(f)
+
+# Apply patch using JSONPath-style navigation
+path_parts = ${pathLiteral}.replace("][", ".").replace("[", ".").replace("]", "").split(".")
+path_parts = [p for p in path_parts if p]
+
+value = json.loads(${valueLiteral})
+
+# Navigate to parent and set value
+current = spec
+for part in path_parts[:-1]:
+    if part.isdigit():
+        current = current[int(part)]
+    else:
+        current = current[part]
+
+final_key = path_parts[-1]
+if final_key.isdigit():
+    current[int(final_key)] = value
+else:
+    current[final_key] = value
+
+# Validate and save
+from optimization_engine.config.spec_validator import SpecValidator
+validator = SpecValidator()
+report = validator.validate(spec)
+
+if not report.valid:
+    errors = [{"path": e.path, "message": e.message} for e in report.errors[:3]]
+    print(json.dumps({"success": False, "errors": errors}))
+    sys.exit(1)
+
+with open(spec_path_file, 'w', encoding='utf-8') as f:
+    json.dump(spec, f, indent=2, ensure_ascii=False)
+
+print(json.dumps({"success": True}))
+`;
+
+  try {
+    const output = runPython(script);
+    const result = JSON.parse(output.trim());
+    return result;
+  } catch (error) {
+    return {
+      success: false,
+      error: error instanceof Error ? error.message : String(error),
+    };
+  }
+}
+
+// ============================================================================
+// Spec Tools
+// ============================================================================
+
+export const specTools: AtomizerTool[] = [
+  // P3.1 - spec_get: Retrieve full AtomizerSpec
+  {
+    definition: {
+      name: "spec_get",
+      description:
+        "Get the full AtomizerSpec for a study. Returns the complete unified configuration including design variables, extractors, objectives, constraints, and optimization settings.",
+      inputSchema: {
+        type: "object" as const,
+        properties: {
+          study_id: {
+            type: "string",
+            description:
+              "Study identifier (e.g., 'M1_Mirror/m1_mirror_flatback_lateral' or 'bracket_v1')",
+          },
+          include_summary: {
+            type: "boolean",
+            description: "Include a human-readable summary (default: true)",
+          },
+        },
+        required: ["study_id"],
+      },
+    },
+    handler: async (args) => {
+      const studyId = args.study_id as string;
+      // Summary is on by default; only an explicit false disables it.
+      const includeSummary = args.include_summary !== false;
+
+      const spec = loadSpecDirect(studyId);
+
+      if (!spec) {
+        return {
+          content: [
+            {
+              type: "text",
+              text: JSON.stringify({
+                success: false,
+                error: `No AtomizerSpec found for study "${studyId}". The study may not exist or hasn't been migrated to v2.0 format.`,
+              }),
+            },
+          ],
+          isError: true,
+        };
+      }
+
+      // Fix: bare `Record` does not compile — Record needs two type arguments.
+      const result: Record<string, unknown> = {
+        success: true,
+        study_id: studyId,
+        spec,
+      };
+
+      if (includeSummary) {
+        const dvs = (spec.design_variables as unknown[]) || [];
+        const exts = (spec.extractors as unknown[]) || [];
+        const objs = (spec.objectives as unknown[]) || [];
+        const cons = (spec.constraints as unknown[]) || [];
+        const opt = (spec.optimization as Record<string, unknown>) || {};
+
+        result.summary = {
+          study_name: (spec.meta as Record<string, unknown>)?.study_name,
+          design_variables: dvs.length,
+          extractors: exts.length,
+          objectives: objs.length,
+          constraints: cons.length,
+          algorithm: (opt.algorithm as Record<string, unknown>)?.type || "unknown",
+          max_trials: ((opt.budget as Record<string, unknown>)?.max_trials) || 100,
+        };
+      }
+
+      return {
+        content: [
+          {
+            type: "text",
+            text: JSON.stringify(result, null, 2),
+          },
+        ],
+      };
+    },
+  },
+
+  // P3.2 - spec_modify: Apply modifications to spec
+  {
+    definition: {
+      name: "spec_modify",
+      description:
+        "Modify specific fields in an AtomizerSpec. Use JSONPath-style paths to update nested values. Examples: 'design_variables[0].bounds.max', 'optimization.budget.max_trials', 'objectives[1].weight'.",
+      inputSchema: {
+        type: "object" as const,
+        properties: {
+          study_id: {
+            type: "string",
+            description: "Study identifier",
+          },
+          path: {
+            type: "string",
+            description:
+              "JSONPath-style path to the field (e.g., 'design_variables[0].bounds.max')",
+          },
+          value: {
+            description: "New value to set",
+          },
+        },
+        required: ["study_id", "path", "value"],
+      },
+    },
+    handler: async (args) => {
+      // Thin wrapper: delegate to patchSpecDirect and translate its outcome
+      // into an MCP tool response.
+      const targetStudy = args.study_id as string;
+      const fieldPath = args.path as string;
+      const newValue = args.value;
+
+      const patchResult = patchSpecDirect(targetStudy, fieldPath, newValue);
+
+      if (!patchResult.success) {
+        return {
+          content: [
+            {
+              type: "text",
+              text: JSON.stringify({
+                success: false,
+                error: patchResult.error || "Failed to apply modification",
+              }),
+            },
+          ],
+          isError: true,
+        };
+      }
+
+      return {
+        content: [
+          {
+            type: "text",
+            text: JSON.stringify({
+              success: true,
+              message: `Updated ${fieldPath} to ${JSON.stringify(newValue)}`,
+              study_id: targetStudy,
+            }),
+          },
+        ],
+      };
+    },
+  },
+
+  // P3.3 - spec_add_node: Add design vars, extractors, objectives, constraints
+  {
+    definition: {
+      name: "spec_add_node",
+      description:
+        "Add a new node to the AtomizerSpec. Can add design variables, extractors, objectives, or constraints.",
+      inputSchema: {
+        type: "object" as const,
+        properties: {
+          study_id: {
+            type: "string",
+            description: "Study identifier",
+          },
+          node_type: {
+            type: "string",
+            enum: ["design_variable", "extractor", "objective", "constraint"],
+            description: "Type of node to add",
+          },
+          data: {
+            type: "object",
+            description: "Node configuration data",
+          },
+        },
+        required: ["study_id", "node_type", "data"],
+      },
+    },
+    handler: async (args) => {
+      const studyId = args.study_id as string;
+      const nodeType = args.node_type as string;
+      // Fix: bare `Record` does not compile — Record needs two type arguments.
+      const data = args.data as Record<string, unknown>;
+
+      // Double-stringify yields a quoted, fully escaped literal that is also
+      // valid Python; the old quote-only escaping corrupted payloads that
+      // contained backslash escapes. Ids are embedded the same way.
+      const dataLiteral = JSON.stringify(JSON.stringify(data));
+      const nodeTypeLiteral = JSON.stringify(nodeType);
+      const studyIdLiteral = JSON.stringify(studyId.replace(/\\/g, "/"));
+      const script = `
+import sys
+import json
+sys.path.insert(0, r"${ATOMIZER_ROOT}")
+from pathlib import Path
+
+study_dir = Path(r"${ATOMIZER_ROOT}") / "studies" / ${studyIdLiteral}
+spec_path = study_dir / "atomizer_spec.json"
+
+if not spec_path.exists():
+    print(json.dumps({"success": False, "error": "Spec not found"}))
+    sys.exit(1)
+
+with open(spec_path, 'r', encoding='utf-8') as f:
+    spec = json.load(f)
+
+data = json.loads(${dataLiteral})
+node_type = ${nodeTypeLiteral}
+
+# Generate unique ID
+import uuid
+node_id = f"{node_type[:2]}_{str(uuid.uuid4())[:8]}"
+
+# Build the node based on type
+if node_type == "design_variable":
+    node = {
+        "id": node_id,
+        "name": data.get("name", "New Variable"),
+        "expression_name": data.get("expression_name", data.get("name", "expr")),
+        "bounds": {
+            "min": data.get("min", data.get("bounds", {}).get("min", 0)),
+            "max": data.get("max", data.get("bounds", {}).get("max", 100)),
+        },
+        "baseline": data.get("baseline"),
+        "units": data.get("units", data.get("unit")),
+        "enabled": data.get("enabled", True),
+        "type": data.get("type", "continuous"),
+    }
+    spec.setdefault("design_variables", []).append(node)
+
+elif node_type == "extractor":
+    node = {
+        "id": node_id,
+        "name": data.get("name", "New Extractor"),
+        "type": data.get("extractor_type", data.get("type", "custom_function")),
+        "builtin": data.get("builtin", False),
+        "outputs": data.get("outputs", [{"name": "output", "units": None}]),
+        "config": data.get("config", {}),
+    }
+    if data.get("function"):
+        node["function"] = data["function"]
+    spec.setdefault("extractors", []).append(node)
+
+elif node_type == "objective":
+    node = {
+        "id": node_id,
+        "name": data.get("name", "New Objective"),
+        "direction": data.get("direction", "minimize"),
+        "weight": data.get("weight", 1.0),
+        "source": {
+            "extractor_id": data.get("extractor_id", data.get("source", {}).get("extractor_id")),
+            "output_name": data.get("output_name", data.get("source", {}).get("output_name")),
+        },
+    }
+    spec.setdefault("objectives", []).append(node)
+
+elif node_type == "constraint":
+    node = {
+        "id": node_id,
+        "name": data.get("name", "New Constraint"),
+        "type": data.get("constraint_type", "inequality"),
+        "operator": data.get("operator", "<="),
+        "threshold": data.get("threshold", data.get("value", 0)),
+        "source": {
+            "extractor_id": data.get("extractor_id", data.get("source", {}).get("extractor_id")),
+            "output_name": data.get("output_name", data.get("source", {}).get("output_name")),
+        },
+    }
+    spec.setdefault("constraints", []).append(node)
+else:
+    print(json.dumps({"success": False, "error": f"Unknown node type: {node_type}"}))
+    sys.exit(1)
+
+# Validate and save
+from optimization_engine.config.spec_validator import SpecValidator
+validator = SpecValidator()
+report = validator.validate(spec, strict=False)
+
+# Save even with warnings (but not errors)
+with open(spec_path, 'w', encoding='utf-8') as f:
+    json.dump(spec, f, indent=2, ensure_ascii=False)
+
+print(json.dumps({
+    "success": True,
+    "node_id": node_id,
+    "node_type": node_type,
+    "warnings": [w.message for w in report.warnings[:3]] if hasattr(report, 'warnings') else []
+}))
+`;
+
+      try {
+        const output = runPython(script);
+        const result = JSON.parse(output.trim());
+
+        if (!result.success) {
+          return {
+            content: [
+              {
+                type: "text",
+                text: JSON.stringify(result),
+              },
+            ],
+            isError: true,
+          };
+        }
+
+        return {
+          content: [
+            {
+              type: "text",
+              text: JSON.stringify({
+                success: true,
+                message: `Added ${nodeType} with ID "${result.node_id}"`,
+                node_id: result.node_id,
+                study_id: studyId,
+                warnings: result.warnings,
+              }),
+            },
+          ],
+        };
+      } catch (error) {
+        return {
+          content: [
+            {
+              type: "text",
+              text: JSON.stringify({
+                success: false,
+                error: error instanceof Error ? error.message : String(error),
+              }),
+            },
+          ],
+          isError: true,
+        };
+      }
+    },
+  },
+
+  // P3.4 - spec_remove_node: Remove nodes with edge cleanup
+  {
+    definition: {
+      name: "spec_remove_node",
+      description:
+        "Remove a node from the AtomizerSpec. Also removes any edges connected to the node.",
+      inputSchema: {
+        type: "object" as const,
+        properties: {
+          study_id: {
+            type: "string",
+            description: "Study identifier",
+          },
+          node_id: {
+            type: "string",
+            description: "ID of the node to remove",
+          },
+        },
+        required: ["study_id", "node_id"],
+      },
+    },
+    handler: async (args) => {
+      const studyId = args.study_id as string;
+      const nodeId = args.node_id as string;
+
+      // Embed ids as JSON-escaped literals (also valid Python string literals)
+      // instead of raw interpolation, so quotes/backslashes cannot break or
+      // inject into the generated script.
+      const nodeIdLiteral = JSON.stringify(nodeId);
+      const studyIdLiteral = JSON.stringify(studyId.replace(/\\/g, "/"));
+      const script = `
+import sys
+import json
+sys.path.insert(0, r"${ATOMIZER_ROOT}")
+from pathlib import Path
+
+study_dir = Path(r"${ATOMIZER_ROOT}") / "studies" / ${studyIdLiteral}
+spec_path = study_dir / "atomizer_spec.json"
+
+if not spec_path.exists():
+    print(json.dumps({"success": False, "error": "Spec not found"}))
+    sys.exit(1)
+
+with open(spec_path, 'r', encoding='utf-8') as f:
+    spec = json.load(f)
+
+node_id = ${nodeIdLiteral}
+removed = False
+node_type = None
+
+# Try to find and remove from each collection
+for collection_name in ["design_variables", "extractors", "objectives", "constraints"]:
+    collection = spec.get(collection_name, [])
+    original_len = len(collection)
+    spec[collection_name] = [n for n in collection if n.get("id") != node_id]
+    if len(spec[collection_name]) < original_len:
+        removed = True
+        node_type = collection_name
+        break
+
+if not removed:
+    print(json.dumps({"success": False, "error": f"Node '{node_id}' not found"}))
+    sys.exit(1)
+
+# Remove connected edges
+if "canvas" in spec and "edges" in spec["canvas"]:
+    spec["canvas"]["edges"] = [
+        e for e in spec["canvas"]["edges"]
+        if e.get("source") != node_id and e.get("target") != node_id
+    ]
+
+# If removing an extractor, also remove objective/constraint references
+if node_type == "extractors":
+    for obj in spec.get("objectives", []):
+        if obj.get("source", {}).get("extractor_id") == node_id:
+            obj["source"]["extractor_id"] = None
+    for con in spec.get("constraints", []):
+        if con.get("source", {}).get("extractor_id") == node_id:
+            con["source"]["extractor_id"] = None
+
+with open(spec_path, 'w', encoding='utf-8') as f:
+    json.dump(spec, f, indent=2, ensure_ascii=False)
+
+print(json.dumps({
+    "success": True,
+    "removed_id": node_id,
+    "node_type": node_type
+}))
+`;
+
+      try {
+        const output = runPython(script);
+        const result = JSON.parse(output.trim());
+
+        return {
+          content: [
+            {
+              type: "text",
+              text: JSON.stringify(result),
+            },
+          ],
+          isError: !result.success,
+        };
+      } catch (error) {
+        return {
+          content: [
+            {
+              type: "text",
+              text: JSON.stringify({
+                success: false,
+                error: error instanceof Error ? error.message : String(error),
+              }),
+            },
+          ],
+          isError: true,
+        };
+      }
+    },
+  },
+
+  // P3.5 - spec_validate: Validate spec and return detailed report
+  {
+    definition: {
+      name: "spec_validate",
+      description:
+        "Validate an AtomizerSpec and return a detailed validation report with errors, warnings, and suggestions.",
+      inputSchema: {
+        type: "object" as const,
+        properties: {
+          study_id: {
+            type: "string",
+            description: "Study identifier",
+          },
+          strict: {
+            type: "boolean",
+            description: "Enable strict validation (default: false)",
+          },
+        },
+        required: ["study_id"],
+      },
+    },
+    handler: async (args) => {
+      const studyId = args.study_id as string;
+      const strict = (args.strict as boolean) || false;
+
+      // Consistent with the other spec tools: embed the study id as a
+      // JSON-escaped literal (valid Python) instead of raw interpolation.
+      const studyIdLiteral = JSON.stringify(studyId.replace(/\\/g, "/"));
+      const script = `
+import sys
+import json
+sys.path.insert(0, r"${ATOMIZER_ROOT}")
+from pathlib import Path
+
+study_dir = Path(r"${ATOMIZER_ROOT}") / "studies" / ${studyIdLiteral}
+spec_path = study_dir / "atomizer_spec.json"
+
+if not spec_path.exists():
+    print(json.dumps({"success": False, "error": "Spec not found"}))
+    sys.exit(1)
+
+with open(spec_path, 'r', encoding='utf-8') as f:
+    spec = json.load(f)
+
+from optimization_engine.config.spec_validator import SpecValidator
+validator = SpecValidator()
+report = validator.validate(spec, strict=${strict ? "True" : "False"})
+
+result = {
+    "success": True,
+    "valid": report.valid,
+    "errors": [{"path": e.path, "message": e.message, "code": e.code} for e in report.errors],
+    "warnings": [{"path": w.path, "message": w.message, "code": w.code} for w in report.warnings],
+    "summary": {
+        "error_count": len(report.errors),
+        "warning_count": len(report.warnings),
+        "status": "valid" if report.valid else "invalid",
+    }
+}
+
+print(json.dumps(result))
+`;
+
+      try {
+        const output = runPython(script);
+        const result = JSON.parse(output.trim());
+
+        return {
+          content: [
+            {
+              type: "text",
+              text: JSON.stringify(result, null, 2),
+            },
+          ],
+          isError: !result.success,
+        };
+      } catch (error) {
+        return {
+          content: [
+            {
+              type: "text",
+              text: JSON.stringify({
+                success: false,
+                error: error instanceof Error ? error.message : String(error),
+              }),
+            },
+          ],
+          isError: true,
+        };
+      }
+    },
+  },
+
+  // P3.6 - spec_add_custom_extractor: Add custom Python function as extractor
+  {
+    definition: {
+      name: "spec_add_custom_extractor",
+      description:
+        "Add a custom Python function as an extractor. The function will be validated for syntax and stored in the spec. Use this when you need custom physics extraction logic.",
+      inputSchema: {
+        type: "object" as const,
+        properties: {
+          study_id: {
+            type: "string",
+            description: "Study identifier",
+          },
+          name: {
+            type: "string",
+            description: "Name for the custom extractor",
+          },
+          code: {
+            type: "string",
+            description:
+              "Python function code. Must define a function that takes (trial_dir: Path, params: dict) and returns dict of outputs.",
+          },
+          outputs: {
+            type: "array",
+            items: { type: "string" },
+            description: "List of output names this extractor produces",
+          },
+          description: {
+            type: "string",
+            description: "Optional description of what this extractor does",
+          },
+        },
+        required: ["study_id", "name", "code", "outputs"],
+      },
+    },
+    handler: async (args) => {
+      const studyId = args.study_id as string;
+      const name = args.name as string;
+      const code = args.code as string;
+      const outputs = args.outputs as string[];
+      const description = (args.description as string) || "";
+
+      // JSON-escaped literals are also valid Python literals, so the code,
+      // name, and description may contain quotes, backslashes, or newlines
+      // without corrupting the generated script (the old manual escaping and
+      // raw "${'...'}" interpolation broke on several of these).
+      const codeLiteral = JSON.stringify(code.replace(/\r/g, ""));
+      const nameLiteral = JSON.stringify(name);
+      const descriptionLiteral = JSON.stringify(description);
+      const studyIdLiteral = JSON.stringify(studyId.replace(/\\/g, "/"));
+      const outputsJson = JSON.stringify(outputs);
+
+      const script = `
+import sys
+import json
+import ast
+sys.path.insert(0, r"${ATOMIZER_ROOT}")
+from pathlib import Path
+
+study_dir = Path(r"${ATOMIZER_ROOT}") / "studies" / ${studyIdLiteral}
+spec_path = study_dir / "atomizer_spec.json"
+
+if not spec_path.exists():
+    print(json.dumps({"success": False, "error": "Spec not found"}))
+    sys.exit(1)
+
+# Validate Python syntax
+code = ${codeLiteral}
+try:
+    ast.parse(code)
+except SyntaxError as e:
+    print(json.dumps({
+        "success": False,
+        "error": f"Python syntax error: {e.msg} at line {e.lineno}",
+        "line": e.lineno,
+        "offset": e.offset
+    }))
+    sys.exit(1)
+
+# Check for dangerous patterns
+# NOTE(review): blocking "open(" seems to conflict with the documented
+# contract that the function reads results from trial_dir — confirm how
+# extractors are expected to access trial output files.
+dangerous_patterns = ["import os", "import subprocess", "exec(", "eval(", "__import__", "open("]
+for pattern in dangerous_patterns:
+    if pattern in code:
+        print(json.dumps({
+            "success": False,
+            "error": f"Security error: '{pattern}' is not allowed in custom functions"
+        }))
+        sys.exit(1)
+
+with open(spec_path, 'r', encoding='utf-8') as f:
+    spec = json.load(f)
+
+import uuid
+extractor_id = f"custom_{str(uuid.uuid4())[:8]}"
+
+outputs = ${outputsJson}
+extractor = {
+    "id": extractor_id,
+    "name": ${nameLiteral},
+    "type": "custom_function",
+    "builtin": False,
+    "description": ${descriptionLiteral},
+    "outputs": [{"name": o, "units": None} for o in outputs],
+    "function": {
+        "code": code,
+        "inputs": ["trial_dir", "params"],
+        "outputs": outputs
+    }
+}
+
+spec.setdefault("extractors", []).append(extractor)
+
+with open(spec_path, 'w', encoding='utf-8') as f:
+    json.dump(spec, f, indent=2, ensure_ascii=False)
+
+print(json.dumps({
+    "success": True,
+    "extractor_id": extractor_id,
+    "name": ${nameLiteral},
+    "outputs": outputs,
+    "message": "Custom extractor added successfully. Connect it to objectives/constraints to use it."
+}))
+`;
+
+      try {
+        const output = runPython(script, 60000);
+        const result = JSON.parse(output.trim());
+
+        return {
+          content: [
+            {
+              type: "text",
+              text: JSON.stringify(result, null, 2),
+            },
+          ],
+          isError: !result.success,
+        };
+      } catch (error) {
+        return {
+          content: [
+            {
+              type: "text",
+              text: JSON.stringify({
+                success: false,
+                error: error instanceof Error ? error.message : String(error),
+              }),
+            },
+          ],
+          isError: true,
+        };
+      }
+    },
+  },
+
+ // P3.13 - spec_create_from_description: Create new spec from natural language
+ {
+ definition: {
+ name: "spec_create_from_description",
+ description:
+ "Create a new AtomizerSpec study from a natural language description. Provide the optimization goal, design variables, and model path to generate a complete spec.",
+ inputSchema: {
+ type: "object" as const,
+ properties: {
+ study_name: {
+ type: "string",
+ description: "Name for the new study (snake_case, e.g., 'bracket_mass_v1')",
+ },
+ description: {
+ type: "string",
+ description:
+ "Natural language description of the optimization goal (e.g., 'Minimize mass while keeping stress below 100 MPa')",
+ },
+ model_path: {
+ type: "string",
+ description: "Path to the NX model file (.prt, .fem, or .sim)",
+ },
+ design_variables: {
+ type: "array",
+ items: {
+ type: "object",
+ properties: {
+ name: { type: "string" },
+ min: { type: "number" },
+ max: { type: "number" },
+ unit: { type: "string" },
+ },
+ },
+ description: "List of design variables with bounds",
+ },
+ template: {
+ type: "string",
+ enum: ["bracket", "beam", "mirror", "custom"],
+ description: "Base template to use (default: custom)",
+ },
+ },
+ required: ["study_name", "description"],
+ },
+ },
+ handler: async (args) => {
+ const studyName = args.study_name as string;
+ const description = args.description as string;
+ const modelPath = args.model_path as string || "";
+ const designVariables = args.design_variables as Array<{
+ name: string;
+ min: number;
+ max: number;
+ unit?: string;
+ }> || [];
+ const template = args.template as string || "custom";
+
+ const dvJson = JSON.stringify(designVariables).replace(/"/g, '\\"');
+
+ const script = `
+import sys
+import json
+sys.path.insert(0, r"${ATOMIZER_ROOT}")
+from pathlib import Path
+
+study_name = "${studyName}"
+description = """${description.replace(/"/g, '\\"')}"""
+model_path = "${modelPath.replace(/\\/g, "/")}"
+template = "${template}"
+design_variables = json.loads("""${dvJson}""")
+
+from optimization_engine.study.creator import StudyCreator
+
+try:
+ creator = StudyCreator()
+
+ # Analyze description for objectives
+ desc_lower = description.lower()
+
+ objectives = []
+ constraints = []
+
+ # Extract objectives from description
+ if "mass" in desc_lower and ("minimize" in desc_lower or "reduce" in desc_lower or "light" in desc_lower):
+ objectives.append({
+ "name": "mass",
+ "direction": "minimize",
+ "extractor_type": "mass"
+ })
+ if "stress" in desc_lower:
+ if "below" in desc_lower or "under" in desc_lower or "max" in desc_lower:
+ # It's a constraint
+ import re
+ stress_match = re.search(r'stress.*?(\\d+)', desc_lower)
+ limit = float(stress_match.group(1)) if stress_match else 100.0
+ constraints.append({
+ "name": "stress_limit",
+ "type": "stress",
+ "operator": "<=",
+ "threshold": limit
+ })
+ else:
+ objectives.append({
+ "name": "stress",
+ "direction": "minimize",
+ "extractor_type": "stress"
+ })
+
+ if "displacement" in desc_lower or "deflection" in desc_lower:
+ if "below" in desc_lower or "under" in desc_lower:
+ import re
+ disp_match = re.search(r'(displacement|deflection).*?(\\d+\\.?\\d*)', desc_lower)
+ limit = float(disp_match.group(2)) if disp_match else 1.0
+ constraints.append({
+ "name": "displacement_limit",
+ "type": "displacement",
+ "operator": "<=",
+ "threshold": limit
+ })
+ else:
+ objectives.append({
+ "name": "displacement",
+ "direction": "minimize",
+ "extractor_type": "displacement"
+ })
+
+ if "wfe" in desc_lower or "zernike" in desc_lower or "wavefront" in desc_lower:
+ objectives.append({
+ "name": "wfe",
+ "direction": "minimize",
+ "extractor_type": "zernike_opd"
+ })
+
+ # Default to mass if nothing detected
+ if not objectives:
+ objectives.append({
+ "name": "objective",
+ "direction": "minimize",
+ "extractor_type": "mass"
+ })
+
+ # Create study directory
+ study_dir = Path(r"${ATOMIZER_ROOT}") / "studies" / study_name
+ study_dir.mkdir(parents=True, exist_ok=True)
+
+ # Build spec
+ import uuid
+ spec = {
+ "version": "2.0",
+ "meta": {
+ "study_name": study_name,
+ "description": description,
+ "created": __import__('datetime').datetime.now().isoformat(),
+ "created_by": "claude",
+ "template": template
+ },
+ "model": {
+ "sim": {
+ "path": model_path or None,
+ "solver": "nastran",
+ "solution_type": "SOL101"
+ }
+ },
+ "design_variables": [
+ {
+ "id": f"dv_{str(uuid.uuid4())[:8]}",
+ "name": dv.get("name", f"var_{i}"),
+ "expression_name": dv.get("name", f"var_{i}"),
+ "bounds": {"min": dv.get("min", 0), "max": dv.get("max", 100)},
+ "baseline": (dv.get("min", 0) + dv.get("max", 100)) / 2,
+ "units": dv.get("unit"),
+ "enabled": True,
+ "type": "continuous"
+ }
+ for i, dv in enumerate(design_variables)
+ ],
+ "extractors": [],
+ "objectives": [],
+ "constraints": [],
+ "optimization": {
+ "algorithm": {
+ "type": "TPE" if len(objectives) == 1 else "NSGA-II",
+ "config": {}
+ },
+ "budget": {
+ "max_trials": 100,
+ "timeout_hours": 24
+ }
+ }
+ }
+
+ # Add extractors and objectives
+ for obj in objectives:
+ ext_id = f"ext_{str(uuid.uuid4())[:8]}"
+ spec["extractors"].append({
+ "id": ext_id,
+ "name": obj["extractor_type"],
+ "type": obj["extractor_type"],
+ "builtin": True,
+ "outputs": [{"name": obj["name"], "units": None}]
+ })
+ spec["objectives"].append({
+ "id": f"obj_{str(uuid.uuid4())[:8]}",
+ "name": obj["name"],
+ "direction": obj["direction"],
+ "weight": 1.0,
+ "source": {
+ "extractor_id": ext_id,
+ "output_name": obj["name"]
+ }
+ })
+
+ # Add constraints
+ for con in constraints:
+ ext_id = f"ext_{str(uuid.uuid4())[:8]}"
+ # Check if extractor already exists
+ existing = [e for e in spec["extractors"] if e["type"] == con["type"]]
+ if not existing:
+ spec["extractors"].append({
+ "id": ext_id,
+ "name": con["type"],
+ "type": con["type"],
+ "builtin": True,
+ "outputs": [{"name": con["name"], "units": None}]
+ })
+ else:
+ ext_id = existing[0]["id"]
+
+ spec["constraints"].append({
+ "id": f"con_{str(uuid.uuid4())[:8]}",
+ "name": con["name"],
+ "type": "inequality",
+ "operator": con["operator"],
+ "threshold": con["threshold"],
+ "source": {
+ "extractor_id": ext_id,
+ "output_name": con["name"]
+ }
+ })
+
+ # Save spec
+ spec_path = study_dir / "atomizer_spec.json"
+ with open(spec_path, 'w', encoding='utf-8') as f:
+ json.dump(spec, f, indent=2, ensure_ascii=False)
+
+ print(json.dumps({
+ "success": True,
+ "study_name": study_name,
+ "path": str(study_dir),
+ "spec_path": str(spec_path),
+ "summary": {
+ "design_variables": len(spec["design_variables"]),
+ "extractors": len(spec["extractors"]),
+ "objectives": len(spec["objectives"]),
+ "constraints": len(spec["constraints"]),
+ "algorithm": spec["optimization"]["algorithm"]["type"]
+ },
+ "message": f"Study '{study_name}' created successfully. You can now add more details or run the optimization."
+ }))
+
+except Exception as e:
+ import traceback
+ print(json.dumps({
+ "success": False,
+ "error": str(e),
+ "traceback": traceback.format_exc()
+ }))
+ sys.exit(1)
+`;
+
+ try {
+ const output = runPython(script, 60000);
+ const result = JSON.parse(output.trim());
+
+ return {
+ content: [
+ {
+ type: "text",
+ text: JSON.stringify(result, null, 2),
+ },
+ ],
+ isError: !result.success,
+ };
+ } catch (error) {
+ return {
+ content: [
+ {
+ type: "text",
+ text: JSON.stringify({
+ success: false,
+ error: error instanceof Error ? error.message : String(error),
+ }),
+ },
+ ],
+ isError: true,
+ };
+ }
+ },
+ },
+];
diff --git a/optimization_engine/config/__init__.py b/optimization_engine/config/__init__.py
index fdba7fe3..7f527e3c 100644
--- a/optimization_engine/config/__init__.py
+++ b/optimization_engine/config/__init__.py
@@ -9,6 +9,7 @@ Modules:
- builder: OptimizationConfigBuilder for creating configs
- setup_wizard: Interactive configuration setup
- capability_matcher: Match capabilities to requirements
+- spec_models: AtomizerSpec v2.0 Pydantic models (unified configuration)
"""
# Lazy imports to avoid circular dependencies
@@ -31,6 +32,27 @@ def __getattr__(name):
elif name == 'TemplateLoader':
from .template_loader import TemplateLoader
return TemplateLoader
+ elif name == 'AtomizerSpec':
+ from .spec_models import AtomizerSpec
+ return AtomizerSpec
+ elif name == 'SpecValidator':
+ from .spec_validator import SpecValidator
+ return SpecValidator
+ elif name == 'SpecValidationError':
+ from .spec_validator import SpecValidationError
+ return SpecValidationError
+ elif name == 'validate_spec':
+ from .spec_validator import validate_spec
+ return validate_spec
+ elif name == 'SpecMigrator':
+ from .migrator import SpecMigrator
+ return SpecMigrator
+ elif name == 'migrate_config':
+ from .migrator import migrate_config
+ return migrate_config
+ elif name == 'migrate_config_file':
+ from .migrator import migrate_config_file
+ return migrate_config_file
raise AttributeError(f"module 'optimization_engine.config' has no attribute '{name}'")
__all__ = [
@@ -40,4 +62,11 @@ __all__ = [
'SetupWizard',
'CapabilityMatcher',
'TemplateLoader',
+ 'AtomizerSpec',
+ 'SpecValidator',
+ 'SpecValidationError',
+ 'validate_spec',
+ 'SpecMigrator',
+ 'migrate_config',
+ 'migrate_config_file',
]
diff --git a/optimization_engine/config/migrator.py b/optimization_engine/config/migrator.py
new file mode 100644
index 00000000..716bbd06
--- /dev/null
+++ b/optimization_engine/config/migrator.py
@@ -0,0 +1,844 @@
+"""
+AtomizerSpec v2.0 Migrator
+
+Converts legacy optimization_config.json files to AtomizerSpec v2.0 format.
+
+Supports migration from:
+- Mirror/Zernike configs (extraction_method, zernike_settings)
+- Structural/Bracket configs (optimization_settings, simulation_settings)
+- Canvas Intent format (simplified canvas output)
+
+Migration Rules:
+- bounds: [min, max] -> bounds: {min, max}
+- parameter -> expression_name
+- goal/type: "minimize"/"maximize" -> direction: "minimize"/"maximize"
+- Infers extractors from objectives and extraction settings
+- Generates canvas edges automatically
+"""
+
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
+import json
+import re
+
+
class MigrationError(Exception):
    """Signals that a legacy config could not be converted to AtomizerSpec v2.0."""
+
+
class SpecMigrator:
    """
    Migrate old optimization_config.json to AtomizerSpec v2.0.

    Handles multiple legacy formats and infers missing information.
    """

    # Extractor type inference based on objective names.
    # Patterns are tried in insertion order against the lowercased objective
    # name; the FIRST regex that matches wins (see _infer_extractor_type),
    # so more specific optics patterns are listed before structural ones.
    EXTRACTOR_INFERENCE = {
        # Zernike patterns
        r"wfe|zernike|opd": "zernike_opd",
        r"mfg|manufacturing": "zernike_opd",
        r"rms": "zernike_opd",
        # Structural patterns
        r"displacement|deflection|deform": "displacement",
        r"stress|von.?mises": "stress",
        r"frequency|modal|eigen": "frequency",
        r"mass|weight": "mass",
        r"stiffness": "displacement",  # Stiffness computed from displacement
        r"temperature|thermal": "temperature",
    }

    def __init__(self, study_path: Optional[Path] = None):
        """
        Initialize migrator.

        Args:
            study_path: Path to study directory (for inferring sim/fem paths)
        """
        self.study_path = Path(study_path) if study_path else None
        # Monotonic counters used to build sequential node ids
        # (ext_001, obj_001, con_001, dv_001). Reset by migrate().
        self._extractor_counter = 0
        self._objective_counter = 0
        self._constraint_counter = 0
        self._dv_counter = 0
+
def migrate(
    self,
    old_config: Dict[str, Any],
    study_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Convert old config to AtomizerSpec v2.0.

    Orchestrates the per-section helpers. Sections are built in dependency
    order: extractors/objectives first (objectives reference extractor ids),
    then constraints, then canvas edges over the finished node set.

    Args:
        old_config: Legacy config dict
        study_name: Override study name (defaults to config value)

    Returns:
        AtomizerSpec v2.0 dict
    """
    # Reset counters so ids restart at 001 for every migrate() call
    self._extractor_counter = 0
    self._objective_counter = 0
    self._constraint_counter = 0
    self._dv_counter = 0

    # Detect config type ("mirror" / "structural" / "canvas_intent" / "generic")
    config_type = self._detect_config_type(old_config)

    # Build spec skeleton; extractor/objective/constraint lists filled below
    spec = {
        "meta": self._migrate_meta(old_config, study_name),
        "model": self._migrate_model(old_config, config_type),
        "design_variables": self._migrate_design_variables(old_config),
        "extractors": [],
        "objectives": [],
        "constraints": [],
        "optimization": self._migrate_optimization(old_config, config_type),
        "canvas": {"edges": [], "layout_version": "2.0"}
    }

    # Migrate extractors and objectives together (they're linked:
    # each objective's "source" references an extractor id)
    extractors, objectives = self._migrate_extractors_and_objectives(old_config, config_type)
    spec["extractors"] = extractors
    spec["objectives"] = objectives

    # Migrate constraints. NOTE: spec["extractors"] is the same list
    # object; _migrate_constraints may append extra extractors to it
    # (e.g. a mass extractor for mass constraints).
    spec["constraints"] = self._migrate_constraints(old_config, spec["extractors"])

    # Generate canvas edges — must run after ALL nodes exist
    spec["canvas"]["edges"] = self._generate_edges(spec)

    # Add workflow if SAT/turbo settings present
    if self._has_sat_settings(old_config):
        spec["workflow"] = self._migrate_workflow(old_config)

    return spec
+
def migrate_file(
    self,
    config_path: Union[str, Path],
    output_path: Optional[Union[str, Path]] = None
) -> Dict[str, Any]:
    """
    Migrate a config file and optionally save the result.

    Args:
        config_path: Path to old config file
        output_path: Path to save new spec (optional)

    Returns:
        AtomizerSpec v2.0 dict

    Raises:
        MigrationError: If the config file is missing or not valid JSON.
    """
    config_path = Path(config_path)

    if not config_path.exists():
        raise MigrationError(f"Config file not found: {config_path}")

    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            old_config = json.load(f)
    except json.JSONDecodeError as exc:
        # Bug fix: surface malformed JSON as a MigrationError (consistent
        # with the missing-file case) instead of a raw decoder traceback.
        raise MigrationError(f"Invalid JSON in {config_path}: {exc}") from exc

    # Infer study path from config location
    if self.study_path is None:
        # Config is typically in study_dir/1_setup/ or study_dir/
        if config_path.parent.name == "1_setup":
            self.study_path = config_path.parent.parent
        else:
            self.study_path = config_path.parent

    spec = self.migrate(old_config)

    if output_path:
        output_path = Path(output_path)
        # Bug fix: make sure the destination directory exists before writing
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(spec, f, indent=2, ensure_ascii=False)

    return spec
+
+ # =========================================================================
+ # Detection
+ # =========================================================================
+
+ def _detect_config_type(self, config: Dict) -> str:
+ """Detect the type of config format."""
+ if "extraction_method" in config or "zernike_settings" in config:
+ return "mirror"
+ elif "simulation_settings" in config or "extraction_settings" in config:
+ return "structural"
+ elif "optimization_settings" in config:
+ return "structural"
+ elif "extractors" in config:
+ # Already partially in new format (canvas intent)
+ return "canvas_intent"
+ else:
+ # Generic/minimal format
+ return "generic"
+
+ def _has_sat_settings(self, config: Dict) -> bool:
+ """Check if config has SAT/turbo settings."""
+ return (
+ "sat_settings" in config or
+ config.get("optimization", {}).get("algorithm") in ["SAT_v3", "SAT", "turbo"]
+ )
+
+ # =========================================================================
+ # Meta Migration
+ # =========================================================================
+
+ def _migrate_meta(self, config: Dict, study_name: Optional[str]) -> Dict:
+ """Migrate metadata section."""
+ now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
+
+ name = study_name or config.get("study_name", "migrated_study")
+ # Ensure snake_case
+ name = re.sub(r'[^a-z0-9_]', '_', name.lower())
+ name = re.sub(r'_+', '_', name).strip('_')
+
+ meta = {
+ "version": "2.0",
+ "created": now,
+ "modified": now,
+ "created_by": "migration",
+ "modified_by": "migration",
+ "study_name": name,
+ "description": config.get("description", ""),
+ "tags": []
+ }
+
+ # Extract tags from various sources
+ if "study_tag" in config:
+ meta["tags"].append(config["study_tag"])
+
+ if "business_context" in config:
+ meta["engineering_context"] = config["business_context"].get("purpose", "")
+
+ # Infer tags from config type
+ if "zernike_settings" in config:
+ meta["tags"].extend(["mirror", "zernike"])
+ if "extraction_method" in config:
+ if config["extraction_method"].get("type") == "zernike_opd":
+ meta["tags"].append("opd")
+
+ return meta
+
+ # =========================================================================
+ # Model Migration
+ # =========================================================================
+
+ def _migrate_model(self, config: Dict, config_type: str) -> Dict:
+ """Migrate model section (sim/fem/prt paths)."""
+ model = {
+ "sim": {
+ "path": "",
+ "solver": "nastran"
+ }
+ }
+
+ # Extract from nx_settings (mirror format)
+ if "nx_settings" in config:
+ nx = config["nx_settings"]
+ model["sim"]["path"] = nx.get("sim_file", "")
+ if "nx_install_path" in nx:
+ model["nx_settings"] = {
+ "nx_install_path": nx["nx_install_path"],
+ "simulation_timeout_s": nx.get("simulation_timeout_s", 600)
+ }
+
+ # Extract from simulation_settings (structural format)
+ elif "simulation_settings" in config:
+ sim = config["simulation_settings"]
+ model["sim"]["path"] = sim.get("sim_file", "")
+ solver = sim.get("solver", "nastran").lower()
+ # Normalize solver name - valid values: nastran, NX_Nastran, abaqus
+ solver_map = {"nx": "nastran", "nx_nastran": "NX_Nastran", "nxnastran": "NX_Nastran"}
+ model["sim"]["solver"] = solver_map.get(solver, "nastran" if solver not in ["nastran", "NX_Nastran", "abaqus"] else solver)
+ if sim.get("solution_type"):
+ model["sim"]["solution_type"] = sim["solution_type"]
+
+ if sim.get("model_file"):
+ model["nx_part"] = {"path": sim["model_file"]}
+ if sim.get("fem_file"):
+ model["fem"] = {"path": sim["fem_file"]}
+
+ # Try to infer from study path
+ if self.study_path and not model["sim"]["path"]:
+ setup_dir = self.study_path / "1_setup" / "model"
+ if setup_dir.exists():
+ for f in setup_dir.glob("*.sim"):
+ model["sim"]["path"] = str(f.relative_to(self.study_path))
+ break
+
+ return model
+
+ # =========================================================================
+ # Design Variables Migration
+ # =========================================================================
+
+ def _migrate_design_variables(self, config: Dict) -> List[Dict]:
+ """Migrate design variables."""
+ dvs = []
+
+ for dv in config.get("design_variables", []):
+ self._dv_counter += 1
+
+ # Handle different bound formats
+ if "bounds" in dv:
+ if isinstance(dv["bounds"], list):
+ bounds = {"min": dv["bounds"][0], "max": dv["bounds"][1]}
+ else:
+ bounds = dv["bounds"]
+ else:
+ bounds = {"min": dv.get("min", 0), "max": dv.get("max", 1)}
+
+ # Ensure min < max (fix degenerate cases)
+ if bounds["min"] >= bounds["max"]:
+ # Expand bounds slightly around the value
+ val = bounds["min"]
+ if val == 0:
+ bounds = {"min": -0.001, "max": 0.001}
+ else:
+ bounds = {"min": val * 0.99, "max": val * 1.01}
+
+ # Determine type
+ dv_type = dv.get("type", "continuous")
+ if dv_type not in ["continuous", "integer", "categorical"]:
+ dv_type = "continuous"
+
+ new_dv = {
+ "id": f"dv_{self._dv_counter:03d}",
+ "name": dv.get("name", f"param_{self._dv_counter}"),
+ "expression_name": dv.get("expression_name", dv.get("parameter", dv.get("name", ""))),
+ "type": dv_type,
+ "bounds": bounds,
+ "baseline": dv.get("baseline", dv.get("initial")),
+ "units": dv.get("units", dv.get("unit", "")),
+ "enabled": dv.get("enabled", True),
+ "description": dv.get("description", dv.get("notes", "")),
+ "canvas_position": {"x": 50, "y": 100 + (self._dv_counter - 1) * 80}
+ }
+
+ dvs.append(new_dv)
+
+ return dvs
+
+ # =========================================================================
+ # Extractors and Objectives Migration
+ # =========================================================================
+
def _migrate_extractors_and_objectives(
    self,
    config: Dict,
    config_type: str
) -> Tuple[List[Dict], List[Dict]]:
    """
    Migrate extractors and objectives together.

    The two are linked: each objective's "source" must reference the id of
    the extractor producing its value, so both lists are built in one pass.

    Args:
        config: Legacy config dict.
        config_type: Result of _detect_config_type() — "mirror",
            "structural", "canvas_intent" or "generic".

    Returns tuple of (extractors, objectives).
    """
    extractors = []
    objectives = []

    # Mirror/zernike configs: a single shared Zernike extractor feeds
    # every objective.
    if config_type == "mirror" and "zernike_settings" in config:
        extractor = self._create_zernike_extractor(config)
        extractors.append(extractor)

        # Create objectives from config
        for obj in config.get("objectives", []):
            self._objective_counter += 1
            objectives.append(self._create_objective(obj, extractor["id"]))

    # Handle structural configs
    elif config_type == "structural":
        # Create extractors based on extraction_settings
        if "extraction_settings" in config:
            extractor = self._create_structural_extractor(config)
            extractors.append(extractor)
            ext_id = extractor["id"]
        else:
            # Infer extractors from objectives
            ext_id = None

        for obj in config.get("objectives", []):
            self._objective_counter += 1

            # Infer extractor if not yet created. Note: only the first
            # objective triggers inference; subsequent ones reuse ext_id.
            if ext_id is None:
                inferred_type = self._infer_extractor_type(obj.get("name", ""))
                ext_id = self._get_or_create_extractor(extractors, inferred_type, obj.get("name", ""))

            objectives.append(self._create_objective(obj, ext_id))

    # Handle canvas intent or generic
    else:
        # Pass through existing extractors if present, assigning ids where
        # missing so objectives can reference them.
        for ext in config.get("extractors", []):
            self._extractor_counter += 1
            ext_copy = dict(ext)
            if "id" not in ext_copy:
                ext_copy["id"] = f"ext_{self._extractor_counter:03d}"
            extractors.append(ext_copy)

        # Create objectives
        for obj in config.get("objectives", []):
            self._objective_counter += 1

            # Find or create extractor. When any pass-through extractor
            # exists, every objective attaches to the FIRST one.
            ext_id = None
            if extractors:
                ext_id = extractors[0]["id"]
            else:
                inferred_type = self._infer_extractor_type(obj.get("name", ""))
                ext_id = self._get_or_create_extractor(extractors, inferred_type, obj.get("name", ""))

            objectives.append(self._create_objective(obj, ext_id))

    return extractors, objectives
+
+ def _create_zernike_extractor(self, config: Dict) -> Dict:
+ """Create a Zernike OPD extractor from config."""
+ self._extractor_counter += 1
+
+ zs = config.get("zernike_settings", {})
+ em = config.get("extraction_method", {})
+
+ # Collect all output names from objectives
+ outputs = []
+ for obj in config.get("objectives", []):
+ obj_name = obj.get("name", "")
+ outputs.append({
+ "name": obj_name,
+ "metric": "filtered_rms_nm"
+ })
+
+ # Get outer radius with sensible default for telescope mirrors
+ outer_radius = em.get("outer_radius", zs.get("outer_radius"))
+ if outer_radius is None:
+ # Default to typical M1 mirror outer radius
+ outer_radius = 500.0
+
+ extractor = {
+ "id": f"ext_{self._extractor_counter:03d}",
+ "name": "Zernike WFE Extractor",
+ "type": "zernike_opd",
+ "builtin": True,
+ "config": {
+ "inner_radius_mm": em.get("inner_radius", zs.get("inner_radius", 0)),
+ "outer_radius_mm": outer_radius,
+ "n_modes": zs.get("n_modes", 40),
+ "filter_low_orders": zs.get("filter_low_orders", 4),
+ "displacement_unit": zs.get("displacement_unit", "mm"),
+ "reference_subcase": int(zs.get("reference_subcase", 1))
+ },
+ "outputs": outputs,
+ "canvas_position": {"x": 740, "y": 100}
+ }
+
+ return extractor
+
+ def _create_structural_extractor(self, config: Dict) -> Dict:
+ """Create extractor from extraction_settings."""
+ self._extractor_counter += 1
+
+ es = config.get("extraction_settings", {})
+
+ # Infer type from extractor class name
+ extractor_class = es.get("extractor_class", "")
+ if "stiffness" in extractor_class.lower():
+ ext_type = "displacement"
+ elif "stress" in extractor_class.lower():
+ ext_type = "stress"
+ elif "frequency" in extractor_class.lower():
+ ext_type = "frequency"
+ else:
+ ext_type = "displacement"
+
+ # Create outputs from objectives
+ outputs = []
+ for obj in config.get("objectives", []):
+ outputs.append({
+ "name": obj.get("name", "output"),
+ "metric": es.get("displacement_aggregation", "max")
+ })
+
+ extractor = {
+ "id": f"ext_{self._extractor_counter:03d}",
+ "name": f"{extractor_class or 'Results'} Extractor",
+ "type": ext_type,
+ "builtin": True,
+ "config": {
+ "result_type": es.get("displacement_component", "z"),
+ "metric": es.get("displacement_aggregation", "max")
+ },
+ "outputs": outputs,
+ "canvas_position": {"x": 740, "y": 100}
+ }
+
+ return extractor
+
+ def _infer_extractor_type(self, objective_name: str) -> str:
+ """Infer extractor type from objective name."""
+ name_lower = objective_name.lower()
+
+ for pattern, ext_type in self.EXTRACTOR_INFERENCE.items():
+ if re.search(pattern, name_lower):
+ return ext_type
+
+ return "displacement" # Default
+
+ def _get_or_create_extractor(
+ self,
+ extractors: List[Dict],
+ ext_type: str,
+ output_name: str
+ ) -> str:
+ """Get existing extractor of type or create new one."""
+ # Look for existing
+ for ext in extractors:
+ if ext.get("type") == ext_type:
+ # Add output if not present
+ output_names = {o["name"] for o in ext.get("outputs", [])}
+ if output_name not in output_names:
+ ext["outputs"].append({"name": output_name, "metric": "total"})
+ return ext["id"]
+
+ # Create new
+ self._extractor_counter += 1
+ ext_id = f"ext_{self._extractor_counter:03d}"
+
+ extractor = {
+ "id": ext_id,
+ "name": f"{ext_type.title()} Extractor",
+ "type": ext_type,
+ "builtin": True,
+ "outputs": [{"name": output_name, "metric": "total"}],
+ "canvas_position": {"x": 740, "y": 100 + (len(extractors)) * 150}
+ }
+
+ extractors.append(extractor)
+ return ext_id
+
+ def _create_objective(self, obj: Dict, extractor_id: str) -> Dict:
+ """Create objective from old format."""
+ # Normalize direction
+ direction = obj.get("direction", obj.get("type", obj.get("goal", "minimize")))
+ if direction not in ["minimize", "maximize"]:
+ direction = "minimize" if "min" in direction.lower() else "maximize"
+
+ obj_name = obj.get("name", f"objective_{self._objective_counter}")
+
+ return {
+ "id": f"obj_{self._objective_counter:03d}",
+ "name": obj.get("description", obj_name),
+ "direction": direction,
+ "weight": obj.get("weight", 1.0),
+ "source": {
+ "extractor_id": extractor_id,
+ "output_name": obj_name
+ },
+ "target": obj.get("target"),
+ "units": obj.get("units", ""),
+ "canvas_position": {"x": 1020, "y": 100 + (self._objective_counter - 1) * 100}
+ }
+
+ # =========================================================================
+ # Constraints Migration
+ # =========================================================================
+
+ def _migrate_constraints(self, config: Dict, extractors: List[Dict]) -> List[Dict]:
+ """Migrate constraints."""
+ constraints = []
+
+ for con in config.get("constraints", []):
+ self._constraint_counter += 1
+
+ # Determine constraint type
+ con_type = con.get("type", "hard")
+ if con_type not in ["hard", "soft"]:
+ # Infer from type field
+ if con_type in ["less_than", "greater_than", "less_equal", "greater_equal"]:
+ con_type = "hard"
+
+ # Determine operator
+ operator = con.get("operator", "<=")
+ old_type = con.get("type", "")
+ if "less" in old_type:
+ operator = "<=" if "equal" in old_type else "<"
+ elif "greater" in old_type:
+ operator = ">=" if "equal" in old_type else ">"
+
+ # Try to parse expression for threshold
+ threshold = con.get("threshold", con.get("value"))
+ if threshold is None and "expression" in con:
+ # Parse from expression like "mass_kg <= 120.0"
+ match = re.search(r'([<>=!]+)\s*([\d.]+)', con["expression"])
+ if match:
+ operator = match.group(1)
+ threshold = float(match.group(2))
+
+ # Find or create extractor for constraint
+ con_name = con.get("name", "constraint")
+ extractor_id = None
+ output_name = con_name
+
+ # Check if name matches existing objective (share extractor)
+ for ext in extractors:
+ for out in ext.get("outputs", []):
+ if con_name.replace("_max", "").replace("_min", "") in out["name"]:
+ extractor_id = ext["id"]
+ output_name = out["name"]
+ break
+ if extractor_id:
+ break
+
+ # If no match, use first extractor or create mass extractor for mass constraints
+ if extractor_id is None:
+ if "mass" in con_name.lower():
+ # Check if mass extractor exists
+ for ext in extractors:
+ if ext.get("type") == "mass":
+ extractor_id = ext["id"]
+ break
+
+ if extractor_id is None:
+ # Create mass extractor
+ ext_id = f"ext_{len(extractors) + 1:03d}"
+ extractors.append({
+ "id": ext_id,
+ "name": "Mass Extractor",
+ "type": "mass",
+ "builtin": True,
+ "outputs": [{"name": "mass_kg", "metric": "total"}],
+ "canvas_position": {"x": 740, "y": 100 + len(extractors) * 150}
+ })
+ extractor_id = ext_id
+ output_name = "mass_kg"
+ elif extractors:
+ extractor_id = extractors[0]["id"]
+ output_name = extractors[0]["outputs"][0]["name"] if extractors[0].get("outputs") else con_name
+
+ constraint = {
+ "id": f"con_{self._constraint_counter:03d}",
+ "name": con.get("description", con_name),
+ "type": con_type if con_type in ["hard", "soft"] else "hard",
+ "operator": operator,
+ "threshold": threshold or 0,
+ "source": {
+ "extractor_id": extractor_id or "ext_001",
+ "output_name": output_name
+ },
+ "penalty_config": {
+ "method": "quadratic",
+ "weight": con.get("penalty_weight", 1000.0)
+ },
+ "canvas_position": {"x": 1020, "y": 400 + (self._constraint_counter - 1) * 100}
+ }
+
+ constraints.append(constraint)
+
+ return constraints
+
+ # =========================================================================
+ # Optimization Migration
+ # =========================================================================
+
+ def _migrate_optimization(self, config: Dict, config_type: str) -> Dict:
+ """Migrate optimization settings."""
+ # Extract from different locations
+ if "optimization" in config:
+ opt = config["optimization"]
+ elif "optimization_settings" in config:
+ opt = config["optimization_settings"]
+ else:
+ opt = {}
+
+ # Normalize algorithm name
+ algo = opt.get("algorithm", opt.get("sampler", "TPE"))
+ algo_map = {
+ "tpe": "TPE",
+ "tpesampler": "TPE",
+ "cma-es": "CMA-ES",
+ "cmaes": "CMA-ES",
+ "nsga-ii": "NSGA-II",
+ "nsgaii": "NSGA-II",
+ "nsga2": "NSGA-II",
+ "random": "RandomSearch",
+ "randomsampler": "RandomSearch",
+ "randomsearch": "RandomSearch",
+ "sat": "SAT_v3",
+ "sat_v3": "SAT_v3",
+ "turbo": "SAT_v3",
+ "gp": "GP-BO",
+ "gp-bo": "GP-BO",
+ "gpbo": "GP-BO",
+ "bo": "GP-BO",
+ "bayesian": "GP-BO"
+ }
+ # Valid algorithm types for schema
+ valid_algorithms = {"TPE", "CMA-ES", "NSGA-II", "RandomSearch", "SAT_v3", "GP-BO"}
+ algo = algo_map.get(algo.lower(), algo)
+ # Fallback to TPE if still invalid
+ if algo not in valid_algorithms:
+ algo = "TPE"
+
+ optimization = {
+ "algorithm": {
+ "type": algo,
+ "config": {}
+ },
+ "budget": {
+ "max_trials": opt.get("n_trials", 100)
+ },
+ "canvas_position": {"x": 1300, "y": 150}
+ }
+
+ # Algorithm-specific config
+ if algo == "CMA-ES":
+ optimization["algorithm"]["config"]["sigma0"] = opt.get("sigma0", 0.3)
+ elif algo == "NSGA-II":
+ optimization["algorithm"]["config"]["population_size"] = opt.get("population_size", 50)
+ elif algo == "TPE":
+ optimization["algorithm"]["config"]["n_startup_trials"] = opt.get("n_startup_trials", 10)
+
+ # Seed
+ if "seed" in opt:
+ optimization["algorithm"]["config"]["seed"] = opt["seed"]
+
+ # Timeout/patience
+ if opt.get("timeout"):
+ optimization["budget"]["max_time_hours"] = opt["timeout"] / 3600
+
+ # SAT/surrogate settings
+ if "sat_settings" in config:
+ sat = config["sat_settings"]
+ optimization["surrogate"] = {
+ "enabled": True,
+ "type": "ensemble",
+ "config": {
+ "n_models": sat.get("n_ensemble_models", 10),
+ "architecture": sat.get("hidden_dims", [256, 128]),
+ "train_every_n_trials": sat.get("retrain_frequency", 20),
+ "min_training_samples": sat.get("min_samples", 30)
+ }
+ }
+
+ return optimization
+
+ # =========================================================================
+ # Workflow Migration
+ # =========================================================================
+
+ def _migrate_workflow(self, config: Dict) -> Dict:
+ """Migrate SAT/turbo workflow settings."""
+ sat = config.get("sat_settings", {})
+
+ exploration_trials = sat.get("min_samples", 30)
+ total_trials = config.get("optimization", {}).get("n_trials", 100)
+
+ return {
+ "stages": [
+ {
+ "id": "stage_exploration",
+ "name": "Design Space Exploration",
+ "algorithm": "RandomSearch",
+ "trials": exploration_trials,
+ "purpose": "Build initial training data for surrogate"
+ },
+ {
+ "id": "stage_optimization",
+ "name": "Surrogate-Assisted Optimization",
+ "algorithm": "SAT_v3",
+ "trials": total_trials - exploration_trials,
+ "purpose": "Neural-accelerated optimization"
+ }
+ ],
+ "transitions": [
+ {
+ "from": "stage_exploration",
+ "to": "stage_optimization",
+ "condition": f"trial_count >= {exploration_trials}"
+ }
+ ]
+ }
+
+ # =========================================================================
+ # Canvas Edge Generation
+ # =========================================================================
+
+ def _generate_edges(self, spec: Dict) -> List[Dict]:
+ """Generate canvas edges connecting nodes."""
+ edges = []
+
+ # DVs -> model
+ for dv in spec.get("design_variables", []):
+ edges.append({"source": dv["id"], "target": "model"})
+
+ # model -> solver
+ edges.append({"source": "model", "target": "solver"})
+
+ # solver -> extractors
+ for ext in spec.get("extractors", []):
+ edges.append({"source": "solver", "target": ext["id"]})
+
+ # extractors -> objectives
+ for obj in spec.get("objectives", []):
+ ext_id = obj.get("source", {}).get("extractor_id")
+ if ext_id:
+ edges.append({"source": ext_id, "target": obj["id"]})
+
+ # extractors -> constraints
+ for con in spec.get("constraints", []):
+ ext_id = con.get("source", {}).get("extractor_id")
+ if ext_id:
+ edges.append({"source": ext_id, "target": con["id"]})
+
+ # objectives -> optimization
+ for obj in spec.get("objectives", []):
+ edges.append({"source": obj["id"], "target": "optimization"})
+
+ # constraints -> optimization
+ for con in spec.get("constraints", []):
+ edges.append({"source": con["id"], "target": "optimization"})
+
+ return edges
+
+
+# ============================================================================
+# Convenience Functions
+# ============================================================================
+
def migrate_config(
    old_config: Dict[str, Any],
    study_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Convenience wrapper: migrate a legacy config dict to AtomizerSpec v2.0
    using a fresh SpecMigrator (no study path inference).

    Args:
        old_config: Legacy config dict
        study_name: Override study name

    Returns:
        AtomizerSpec v2.0 dict
    """
    return SpecMigrator().migrate(old_config, study_name)
+
+
def migrate_config_file(
    config_path: Union[str, Path],
    output_path: Optional[Union[str, Path]] = None
) -> Dict[str, Any]:
    """
    Convenience wrapper: migrate a config file to AtomizerSpec v2.0 using a
    fresh SpecMigrator (study path inferred from the file's location).

    Args:
        config_path: Path to old config file
        output_path: Path to save new spec (optional)

    Returns:
        AtomizerSpec v2.0 dict
    """
    return SpecMigrator().migrate_file(config_path, output_path)
diff --git a/optimization_engine/extractors/__init__.py b/optimization_engine/extractors/__init__.py
index 24281741..7b0be5df 100644
--- a/optimization_engine/extractors/__init__.py
+++ b/optimization_engine/extractors/__init__.py
@@ -11,6 +11,7 @@ Available extractors:
- SPC Forces: extract_spc_forces, extract_total_reaction_force
- Zernike: extract_zernike_from_op2, ZernikeExtractor (telescope mirrors)
- Part Introspection: introspect_part (comprehensive NX .prt analysis)
+- Custom: CustomExtractorLoader for user-defined Python extractors
Phase 2 Extractors (2025-12-06):
- Principal stress extraction (sigma1, sigma2, sigma3)
@@ -25,6 +26,10 @@ Phase 3 Extractors (2025-12-06):
Phase 4 Extractors (2025-12-19):
- Part Introspection (E12): Comprehensive .prt analysis (expressions, mass, materials, attributes, groups, features)
+
+Phase 5 Extractors (2026-01-17):
+- Custom Extractor Loader: Dynamic loading and execution of user-defined Python extractors
+ from AtomizerSpec v2.0 (sandboxed execution with security validation)
"""
# Zernike extractor for telescope mirror optimization (standard Z-only method)
@@ -119,6 +124,26 @@ from optimization_engine.extractors.introspect_part import (
print_introspection_summary,
)
+# Custom extractor loader (Phase 5) - dynamic Python extractors from AtomizerSpec v2.0
+from optimization_engine.extractors.custom_extractor_loader import (
+ CustomExtractor,
+ CustomExtractorLoader,
+ CustomExtractorContext,
+ ExtractorSecurityError,
+ ExtractorValidationError,
+ load_custom_extractors,
+ execute_custom_extractor,
+ validate_custom_extractor,
+)
+
+# Spec extractor builder - builds extractors from AtomizerSpec
+from optimization_engine.extractors.spec_extractor_builder import (
+ SpecExtractorBuilder,
+ build_extractors_from_spec,
+ get_extractor_outputs,
+ list_available_builtin_extractors,
+)
+
__all__ = [
# Part mass & material (from .prt)
'extract_part_mass_material',
@@ -174,4 +199,18 @@ __all__ = [
'get_expressions_dict',
'get_expression_value',
'print_introspection_summary',
+ # Custom extractor loader (Phase 5)
+ 'CustomExtractor',
+ 'CustomExtractorLoader',
+ 'CustomExtractorContext',
+ 'ExtractorSecurityError',
+ 'ExtractorValidationError',
+ 'load_custom_extractors',
+ 'execute_custom_extractor',
+ 'validate_custom_extractor',
+ # Spec extractor builder
+ 'SpecExtractorBuilder',
+ 'build_extractors_from_spec',
+ 'get_extractor_outputs',
+ 'list_available_builtin_extractors',
]