""" Study Creation Wizard - Comprehensive study setup for Atomizer =============================================================== A powerful, LLM-friendly wizard that automates the complete study creation workflow: 1. Model Introspection - Discover expressions, solutions, and available results 2. Configuration Generation - Build optimization_config.json from user requirements 3. Script Generation - Generate run_optimization.py with proper extractors 4. Documentation Generation - Create README.md, STUDY_REPORT.md, MODEL_INTROSPECTION.md This module is designed to work seamlessly with Claude Code skills. Usage: from optimization_engine.study_wizard import StudyWizard wizard = StudyWizard( study_name="my_optimization", description="Optimize bracket for stiffness and mass" ) # Step 1: Set model files wizard.set_model_files( prt_file="path/to/model.prt", sim_file="path/to/model_sim1.sim" ) # Step 2: Introspect model introspection = wizard.introspect() # Step 3: Add design variables, objectives, constraints wizard.add_design_variable("thickness", bounds=[5, 20], units="mm") wizard.add_objective("mass", goal="minimize", extractor="extract_mass_from_bdf") wizard.add_constraint("max_stress", type="less_than", threshold=250, units="MPa") # Step 4: Generate study wizard.generate() Author: Atomizer Development Team Version: 1.0.0 Last Updated: 2025-12-06 """ from pathlib import Path from typing import Dict, Any, List, Optional, Tuple, Union from dataclasses import dataclass, field from datetime import datetime import json import logging import shutil logger = logging.getLogger(__name__) # ============================================================================= # Data Classes # ============================================================================= @dataclass class DesignVariable: """Design variable specification.""" parameter: str bounds: Tuple[float, float] description: str = "" units: str = "" def to_dict(self) -> Dict[str, Any]: return { "parameter": self.parameter, "bounds": list(self.bounds), "description": self.description or f"{self.parameter} design variable", "units": self.units } @dataclass class Objective: """Optimization objective specification.""" name: str goal: str # "minimize" or "maximize" extractor: str params: Dict[str, Any] = field(default_factory=dict) weight: float = 1.0 description: str = "" def to_dict(self) -> Dict[str, Any]: return { "name": self.name, "goal": self.goal, "weight": self.weight, "description": self.description or f"{self.goal.capitalize()} {self.name}", "extraction": { "action": self.extractor, "domain": "result_extraction", "params": self.params } } @dataclass class Constraint: """Optimization constraint specification.""" name: str type: str # "less_than" or "greater_than" threshold: float extractor: str params: Dict[str, Any] = field(default_factory=dict) description: str = "" units: str = "" def to_dict(self) -> Dict[str, Any]: return { "name": self.name, "type": self.type, "threshold": self.threshold, "description": self.description or f"{self.name} constraint", "extraction": { "action": self.extractor, "domain": "result_extraction", "params": self.params } } @dataclass class IntrospectionResult: """Results from model introspection.""" success: bool expressions: List[Dict[str, Any]] = field(default_factory=list) solutions: List[Dict[str, Any]] = field(default_factory=list) bodies: List[Dict[str, Any]] = field(default_factory=list) mass_properties: Dict[str, Any] = field(default_factory=dict) mesh_info: Dict[str, Any] = field(default_factory=dict) available_results: Dict[str, bool] = field(default_factory=dict) boundary_conditions: List[Dict[str, Any]] = field(default_factory=list) loads: List[Dict[str, Any]] = field(default_factory=list) materials: List[Dict[str, Any]] = field(default_factory=list) subcases: List[int] = field(default_factory=list) error: str = "" def get_expression_names(self) -> List[str]: """Get list of expression names.""" return [e.get('name', '') for e in self.expressions] def get_solution_names(self) -> List[str]: """Get list of solution names.""" return [s.get('name', '') for s in self.solutions] def suggest_design_variables(self) -> List[Dict[str, Any]]: """Suggest potential design variables from expressions.""" suggestions = [] for expr in self.expressions: name = expr.get('name', '') value = expr.get('value') # Skip system/reference expressions if name.startswith('p') and name[1:].isdigit(): continue if 'mass' in name.lower() and 'input' not in name.lower(): continue if value is not None and isinstance(value, (int, float)): # Suggest bounds based on value if value > 0: bounds = (value * 0.5, value * 1.5) else: bounds = (value * 1.5, value * 0.5) suggestions.append({ 'name': name, 'current_value': value, 'suggested_bounds': bounds, 'units': expr.get('unit', ''), 'confidence': 'high' if any(kw in name.lower() for kw in ['thickness', 'angle', 'radius', 'length', 'width', 'height']) else 'medium' }) return suggestions # ============================================================================= # Extractor Catalog # ============================================================================= EXTRACTOR_CATALOG = { # Mass extractors "extract_mass_from_bdf": { "module": "optimization_engine.extractors.bdf_mass_extractor", "function": "extract_mass_from_bdf", "input": ".dat/.bdf", "output_unit": "kg", "description": "Extract total mass from BDF/DAT file" }, "extract_part_mass": { "module": "optimization_engine.extractors.extract_part_mass_material", "function": "extract_part_mass", "input": ".prt", "output_unit": "kg", "description": "Extract mass from NX part file via journal" }, # Displacement extractors "extract_displacement": { "module": "optimization_engine.extractors.extract_displacement", "function": "extract_displacement", "input": ".op2", "output_unit": "mm", "description": "Extract displacement results from OP2" }, # Stress extractors "extract_solid_stress": { "module": "optimization_engine.extractors.extract_von_mises_stress", "function": "extract_solid_stress", "input": ".op2", "output_unit": "MPa", "description": "Extract von Mises stress from OP2" }, "extract_principal_stress": { "module": "optimization_engine.extractors.extract_principal_stress", "function": "extract_principal_stress", "input": ".op2", "output_unit": "MPa", "description": "Extract principal stresses (σ1, σ2, σ3)" }, # Strain energy extractors "extract_strain_energy": { "module": "optimization_engine.extractors.extract_strain_energy", "function": "extract_strain_energy", "input": ".op2", "output_unit": "J", "description": "Extract strain energy from OP2" }, "extract_total_strain_energy": { "module": "optimization_engine.extractors.extract_strain_energy", "function": "extract_total_strain_energy", "input": ".op2", "output_unit": "J", "description": "Extract total strain energy (convenience)" }, # Reaction forces "extract_spc_forces": { "module": "optimization_engine.extractors.extract_spc_forces", "function": "extract_spc_forces", "input": ".op2", "output_unit": "N", "description": "Extract SPC/reaction forces from OP2" }, # Frequency extractors "extract_frequency": { "module": "optimization_engine.extractors.extract_frequency", "function": "extract_frequency", "input": ".op2", "output_unit": "Hz", "description": "Extract natural frequencies from modal analysis" }, "get_first_frequency": { "module": "optimization_engine.extractors.extract_modal_mass", "function": "get_first_frequency", "input": ".f06", "output_unit": "Hz", "description": "Get first natural frequency from F06" }, # Temperature extractors (Phase 3) "extract_temperature": { "module": "optimization_engine.extractors.extract_temperature", "function": "extract_temperature", "input": ".op2", "output_unit": "K/°C", "description": "Extract temperatures from thermal analysis" }, "get_max_temperature": { "module": "optimization_engine.extractors.extract_temperature", "function": "get_max_temperature", "input": ".op2", "output_unit": "K/°C", "description": "Get maximum temperature (convenience)" }, # Modal mass (Phase 3) "extract_modal_mass": { "module": "optimization_engine.extractors.extract_modal_mass", "function": "extract_modal_mass", "input": ".f06", "output_unit": "kg", "description": "Extract modal effective mass from F06" }, # Zernike (optical) "extract_zernike_from_op2": { "module": "optimization_engine.extractors.extract_zernike", "function": "extract_zernike_from_op2", "input": ".op2 + .bdf", "output_unit": "nm", "description": "Extract Zernike coefficients for optical surfaces" } } # ============================================================================= # Protocol Catalog # ============================================================================= PROTOCOL_CATALOG = { "protocol_10_single": { "name": "Single-Objective IMSO", "sampler": "TPESampler", "description": "Adaptive single-objective optimization", "use_when": ["single objective", "maximize or minimize one thing"], "directions": 1 }, "protocol_11_multi": { "name": "Multi-Objective NSGA-II", "sampler": "NSGAIISampler", "description": "Pareto-optimal multi-objective optimization", "use_when": ["multiple objectives", "pareto front", "trade-offs"], "directions": "multiple" } } # ============================================================================= # Study Wizard # ============================================================================= class StudyWizard: """ Comprehensive study creation wizard for Atomizer. This wizard guides the complete study setup process: 1. Model introspection (discover expressions, solutions, results) 2. Configuration generation (optimization_config.json) 3. Script generation (run_optimization.py, reset_study.py) 4. Documentation generation (README.md, STUDY_REPORT.md, MODEL_INTROSPECTION.md) """ def __init__( self, study_name: str, description: str = "", studies_dir: Optional[Path] = None ): """ Initialize study wizard. Args: study_name: Name of the study (used for directory name) description: Human-readable description studies_dir: Base directory for studies (default: project/studies/) """ self.study_name = study_name self.description = description # Set studies directory if studies_dir is None: # Find project root by looking for CLAUDE.md current = Path(__file__).parent while current != current.parent: if (current / "CLAUDE.md").exists(): studies_dir = current / "studies" break current = current.parent else: studies_dir = Path.cwd() / "studies" self.studies_dir = Path(studies_dir) self.study_dir = self.studies_dir / study_name # Model files self.prt_file: Optional[Path] = None self.sim_file: Optional[Path] = None self.fem_file: Optional[Path] = None self.op2_file: Optional[Path] = None # Configuration self.design_variables: List[DesignVariable] = [] self.objectives: List[Objective] = [] self.constraints: List[Constraint] = [] # Introspection results self.introspection: Optional[IntrospectionResult] = None # Settings self.protocol = "protocol_11_multi" # Default to multi-objective self.n_trials = 100 self.timeout_per_trial = 400 self.neural_enabled = False logger.info(f"StudyWizard initialized for '{study_name}'") logger.info(f" Study directory: {self.study_dir}") # ========================================================================= # Model File Management # ========================================================================= def set_model_files( self, prt_file: Union[str, Path], sim_file: Optional[Union[str, Path]] = None, fem_file: Optional[Union[str, Path]] = None, op2_file: Optional[Union[str, Path]] = None ) -> "StudyWizard": """ Set model files for the study. Args: prt_file: Path to NX part file (.prt) sim_file: Path to simulation file (.sim) - auto-detected if None fem_file: Path to FEM file (.fem) - auto-detected if None op2_file: Path to OP2 results file (.op2) - optional Returns: Self for method chaining """ self.prt_file = Path(prt_file) if not self.prt_file.exists(): raise FileNotFoundError(f"Part file not found: {self.prt_file}") # Auto-detect sim file if sim_file: self.sim_file = Path(sim_file) else: # Look for *_sim1.sim or *.sim in same directory prt_dir = self.prt_file.parent sim_candidates = list(prt_dir.glob("*_sim1.sim")) + list(prt_dir.glob("*.sim")) if sim_candidates: self.sim_file = sim_candidates[0] logger.info(f" Auto-detected sim file: {self.sim_file.name}") # Auto-detect fem file if fem_file: self.fem_file = Path(fem_file) elif self.sim_file: fem_candidates = list(self.sim_file.parent.glob("*_fem1.fem")) + list(self.sim_file.parent.glob("*.fem")) if fem_candidates: self.fem_file = fem_candidates[0] logger.info(f" Auto-detected fem file: {self.fem_file.name}") # Set op2 file if provided if op2_file: self.op2_file = Path(op2_file) logger.info(f"Model files set:") logger.info(f" PRT: {self.prt_file}") logger.info(f" SIM: {self.sim_file}") logger.info(f" FEM: {self.fem_file}") return self # ========================================================================= # Model Introspection # ========================================================================= def introspect(self, run_baseline: bool = False) -> IntrospectionResult: """ Introspect model to discover expressions, solutions, and available results. Uses the model_introspection module for comprehensive analysis. Args: run_baseline: If True, run a baseline solve to generate OP2 for introspection Returns: IntrospectionResult with all discovered information """ logger.info("=" * 60) logger.info("MODEL INTROSPECTION") logger.info("=" * 60) try: from optimization_engine.hooks.nx_cad.model_introspection import ( introspect_part, introspect_simulation, introspect_op2 ) except ImportError: logger.warning("Model introspection module not available") self.introspection = IntrospectionResult( success=False, error="Model introspection module not available" ) return self.introspection result = IntrospectionResult(success=True) # Introspect part file if self.prt_file and self.prt_file.exists(): logger.info(f"\nIntrospecting part: {self.prt_file.name}") part_result = introspect_part(str(self.prt_file)) if part_result.get('success'): data = part_result.get('data', {}) result.expressions = data.get('expressions', []) result.bodies = data.get('bodies', []) result.mass_properties = data.get('mass_properties', {}) logger.info(f" Found {len(result.expressions)} expressions") logger.info(f" Found {len(result.bodies)} bodies") if result.mass_properties: logger.info(f" Mass: {result.mass_properties.get('mass', 'N/A')} kg") else: logger.warning(f" Part introspection failed: {part_result.get('error')}") # Introspect simulation file if self.sim_file and self.sim_file.exists(): logger.info(f"\nIntrospecting simulation: {self.sim_file.name}") sim_result = introspect_simulation(str(self.sim_file)) if sim_result.get('success'): data = sim_result.get('data', {}) result.solutions = data.get('solutions', []) result.boundary_conditions = data.get('boundary_conditions', []) result.loads = data.get('loads', []) result.materials = data.get('materials', []) result.mesh_info = data.get('mesh_info', {}) logger.info(f" Found {len(result.solutions)} solutions") logger.info(f" Found {len(result.boundary_conditions)} boundary conditions") logger.info(f" Found {len(result.loads)} loads") else: logger.warning(f" Simulation introspection failed: {sim_result.get('error')}") # Introspect OP2 file if available if self.op2_file and self.op2_file.exists(): logger.info(f"\nIntrospecting OP2: {self.op2_file.name}") op2_result = introspect_op2(str(self.op2_file)) if op2_result.get('success'): data = op2_result.get('data', {}) result.available_results = data.get('available_results', {}) result.subcases = data.get('subcases', []) logger.info(f" Available results: {result.available_results}") logger.info(f" Subcases: {result.subcases}") else: logger.warning(f" OP2 introspection failed: {op2_result.get('error')}") self.introspection = result return result # ========================================================================= # Design Variable, Objective, Constraint Management # ========================================================================= def add_design_variable( self, parameter: str, bounds: Tuple[float, float], description: str = "", units: str = "" ) -> "StudyWizard": """ Add a design variable to the study. Args: parameter: Name of the NX expression to vary bounds: (min, max) bounds for the variable description: Human-readable description units: Units (e.g., "mm", "degrees") Returns: Self for method chaining """ dv = DesignVariable( parameter=parameter, bounds=bounds, description=description, units=units ) self.design_variables.append(dv) logger.info(f"Added design variable: {parameter} [{bounds[0]}, {bounds[1]}] {units}") return self def add_objective( self, name: str, goal: str, extractor: str, params: Optional[Dict[str, Any]] = None, weight: float = 1.0, description: str = "" ) -> "StudyWizard": """ Add an optimization objective. Args: name: Objective name (e.g., "mass", "stiffness") goal: "minimize" or "maximize" extractor: Extractor function name from catalog params: Additional parameters for extractor weight: Weight for multi-objective optimization description: Human-readable description Returns: Self for method chaining """ if extractor not in EXTRACTOR_CATALOG: logger.warning(f"Extractor '{extractor}' not in catalog, proceeding anyway") obj = Objective( name=name, goal=goal, extractor=extractor, params=params or {}, weight=weight, description=description ) self.objectives.append(obj) logger.info(f"Added objective: {goal} {name} (extractor: {extractor})") return self def add_constraint( self, name: str, constraint_type: str, threshold: float, extractor: str, params: Optional[Dict[str, Any]] = None, description: str = "", units: str = "" ) -> "StudyWizard": """ Add an optimization constraint. Args: name: Constraint name (e.g., "max_stress") constraint_type: "less_than" or "greater_than" threshold: Constraint threshold value extractor: Extractor function name from catalog params: Additional parameters for extractor description: Human-readable description units: Units for display Returns: Self for method chaining """ const = Constraint( name=name, type=constraint_type, threshold=threshold, extractor=extractor, params=params or {}, description=description, units=units ) self.constraints.append(const) logger.info(f"Added constraint: {name} {constraint_type} {threshold} {units}") return self # ========================================================================= # Settings # ========================================================================= def set_protocol(self, protocol: str) -> "StudyWizard": """Set optimization protocol.""" if protocol not in PROTOCOL_CATALOG: raise ValueError(f"Unknown protocol: {protocol}. Available: {list(PROTOCOL_CATALOG.keys())}") self.protocol = protocol return self def set_trials(self, n_trials: int) -> "StudyWizard": """Set number of optimization trials.""" self.n_trials = n_trials return self def enable_neural(self, enabled: bool = True) -> "StudyWizard": """Enable/disable neural acceleration.""" self.neural_enabled = enabled return self # ========================================================================= # Generation # ========================================================================= def generate(self, copy_model_files: bool = True) -> Dict[str, Path]: """ Generate complete study structure. Creates: - Study directory structure - optimization_config.json - workflow_config.json - run_optimization.py - reset_study.py - README.md - STUDY_REPORT.md - MODEL_INTROSPECTION.md Args: copy_model_files: If True, copy model files to study directory Returns: Dict of generated file paths """ logger.info("=" * 60) logger.info("GENERATING STUDY") logger.info("=" * 60) # Validate if not self.design_variables: raise ValueError("At least one design variable is required") if not self.objectives: raise ValueError("At least one objective is required") # Create directory structure setup_dir = self.study_dir / "1_setup" model_dir = setup_dir / "model" results_dir = self.study_dir / "2_results" setup_dir.mkdir(parents=True, exist_ok=True) model_dir.mkdir(exist_ok=True) results_dir.mkdir(exist_ok=True) logger.info(f"Created study directory: {self.study_dir}") generated_files = {} # Copy model files if copy_model_files and self.prt_file: self._copy_model_files(model_dir) # Generate optimization_config.json config_path = setup_dir / "optimization_config.json" self._generate_config(config_path) generated_files['optimization_config'] = config_path # Generate workflow_config.json workflow_path = setup_dir / "workflow_config.json" self._generate_workflow_config(workflow_path) generated_files['workflow_config'] = workflow_path # Generate run_optimization.py run_script_path = self.study_dir / "run_optimization.py" self._generate_run_script(run_script_path) generated_files['run_optimization'] = run_script_path # Generate reset_study.py reset_script_path = self.study_dir / "reset_study.py" self._generate_reset_script(reset_script_path) generated_files['reset_study'] = reset_script_path # Generate documentation readme_path = self.study_dir / "README.md" self._generate_readme(readme_path) generated_files['readme'] = readme_path report_path = self.study_dir / "STUDY_REPORT.md" self._generate_study_report(report_path) generated_files['study_report'] = report_path introspection_path = self.study_dir / "MODEL_INTROSPECTION.md" self._generate_introspection_report(introspection_path) generated_files['model_introspection'] = introspection_path logger.info("=" * 60) logger.info("STUDY GENERATION COMPLETE") logger.info("=" * 60) logger.info(f"\nGenerated files:") for name, path in generated_files.items(): logger.info(f" {name}: {path}") logger.info(f"\nNext steps:") logger.info(f" 1. Review generated files") logger.info(f" 2. cd {self.study_dir}") logger.info(f" 3. python run_optimization.py --discover") logger.info(f" 4. python run_optimization.py --validate") logger.info(f" 5. python run_optimization.py --run --trials {self.n_trials}") return generated_files def _copy_model_files(self, model_dir: Path): """Copy model files to study directory.""" logger.info("Copying model files...") files_to_copy = [self.prt_file, self.sim_file, self.fem_file] for src in files_to_copy: if src and src.exists(): dst = model_dir / src.name if not dst.exists(): shutil.copy2(src, dst) logger.info(f" Copied: {src.name}") else: logger.info(f" Already exists: {src.name}") def _generate_config(self, path: Path): """Generate optimization_config.json.""" logger.info(f"Generating: {path.name}") # Determine simulation files model_name = self.prt_file.stem if self.prt_file else "model" sim_name = self.sim_file.stem if self.sim_file else f"{model_name}_sim1" # Infer dat and op2 file names dat_file = f"{sim_name.lower()}-solution_1.dat" op2_file = f"{sim_name.lower()}-solution_1.op2" # Determine sampler from protocol sampler = PROTOCOL_CATALOG.get(self.protocol, {}).get('sampler', 'NSGAIISampler') config = { "study_name": self.study_name, "description": self.description, "engineering_context": f"Generated by StudyWizard on {datetime.now().strftime('%Y-%m-%d %H:%M')}", "template_info": { "category": "structural", "analysis_type": "static", "typical_applications": [], "neural_enabled": self.neural_enabled }, "optimization_settings": { "protocol": self.protocol, "n_trials": self.n_trials, "sampler": sampler, "pruner": None, "timeout_per_trial": self.timeout_per_trial }, "design_variables": [dv.to_dict() for dv in self.design_variables], "objectives": [obj.to_dict() for obj in self.objectives], "constraints": [const.to_dict() for const in self.constraints], "simulation": { "model_file": self.prt_file.name if self.prt_file else "", "sim_file": self.sim_file.name if self.sim_file else "", "fem_file": self.fem_file.name if self.fem_file else "", "solver": "nastran", "analysis_types": ["static"], "solution_name": "Solution 1", "dat_file": dat_file, "op2_file": op2_file }, "result_extraction": self._build_extraction_config(), "reporting": { "generate_plots": True, "save_incremental": True, "llm_summary": True, "generate_pareto_front": len(self.objectives) > 1 }, "neural_acceleration": { "enabled": self.neural_enabled, "min_training_points": 50, "auto_train": True, "epochs": 100, "validation_split": 0.2 } } with open(path, 'w') as f: json.dump(config, f, indent=2) def _build_extraction_config(self) -> Dict[str, Any]: """Build result_extraction section of config.""" extraction = {} # Add extractors for objectives for obj in self.objectives: extractor_info = EXTRACTOR_CATALOG.get(obj.extractor, {}) extraction[obj.name] = { "method": obj.extractor, "extractor_module": extractor_info.get('module', ''), "function": extractor_info.get('function', obj.extractor), "output_unit": extractor_info.get('output_unit', '') } # Add extractors for constraints for const in self.constraints: if const.name not in extraction: extractor_info = EXTRACTOR_CATALOG.get(const.extractor, {}) extraction[const.name] = { "method": const.extractor, "extractor_module": extractor_info.get('module', ''), "function": extractor_info.get('function', const.extractor), "output_unit": extractor_info.get('output_unit', const.units) } return extraction def _generate_workflow_config(self, path: Path): """Generate workflow_config.json.""" logger.info(f"Generating: {path.name}") config = { "workflow_id": f"{self.study_name}_workflow", "description": f"Workflow for {self.study_name}", "steps": [] } with open(path, 'w') as f: json.dump(config, f, indent=2) def _generate_run_script(self, path: Path): """Generate run_optimization.py script.""" logger.info(f"Generating: {path.name}") # Build import statements for extractors extractor_imports = set() for obj in self.objectives: info = EXTRACTOR_CATALOG.get(obj.extractor, {}) if info.get('module'): extractor_imports.add(f"from {info['module']} import {info.get('function', obj.extractor)}") for const in self.constraints: info = EXTRACTOR_CATALOG.get(const.extractor, {}) if info.get('module'): extractor_imports.add(f"from {info['module']} import {info.get('function', const.extractor)}") # Determine if multi-objective is_multi = len(self.objectives) > 1 sampler = PROTOCOL_CATALOG.get(self.protocol, {}).get('sampler', 'NSGAIISampler') # Build objective function extraction code extraction_code = self._build_extraction_code() # Build return statement if is_multi: returns = ", ".join([f"obj_{obj.name}" for obj in self.objectives]) return_stmt = f"return ({returns})" else: obj = self.objectives[0] return_stmt = f"return obj_{obj.name}" script = f'''""" {self.study_name} - Optimization Script {"=" * 60} {self.description} Protocol: {PROTOCOL_CATALOG.get(self.protocol, {}).get('name', self.protocol)} Staged Workflow: ---------------- 1. DISCOVER: python run_optimization.py --discover 2. VALIDATE: python run_optimization.py --validate 3. TEST: python run_optimization.py --test 4. RUN: python run_optimization.py --run --trials {self.n_trials} Generated by StudyWizard on {datetime.now().strftime('%Y-%m-%d %H:%M')} """ from pathlib import Path import sys import json import argparse from datetime import datetime from typing import Optional, Tuple, List # Add parent directory to path project_root = Path(__file__).resolve().parents[2] sys.path.insert(0, str(project_root)) import optuna from optuna.samplers import {sampler} # Core imports from optimization_engine.nx_solver import NXSolver from optimization_engine.logger import get_logger # Extractor imports {chr(10).join(sorted(extractor_imports))} def load_config(config_file: Path) -> dict: """Load configuration from JSON file.""" with open(config_file, 'r') as f: return json.load(f) def clean_nastran_files(model_dir: Path, logger) -> List[Path]: """Remove old Nastran solver output files.""" patterns = ['*.op2', '*.f06', '*.log', '*.f04', '*.pch', '*.DBALL', '*.MASTER', '_temp*.txt'] deleted = [] for pattern in patterns: for f in model_dir.glob(pattern): try: f.unlink() deleted.append(f) logger.info(f" Deleted: {{f.name}}") except Exception as e: logger.warning(f" Failed to delete {{f.name}}: {{e}}") return deleted def objective(trial: optuna.Trial, config: dict, nx_solver: NXSolver, model_dir: Path, logger) -> {"Tuple[" + ", ".join(["float"] * len(self.objectives)) + "]" if is_multi else "float"}: """ Objective function for optimization. {"Returns tuple of objectives for multi-objective optimization." if is_multi else "Returns single objective value."} """ # Sample design variables design_vars = {{}} for var in config['design_variables']: param_name = var['parameter'] bounds = var['bounds'] design_vars[param_name] = trial.suggest_float(param_name, bounds[0], bounds[1]) logger.trial_start(trial.number, design_vars) try: # Get file paths sim_file = model_dir / config['simulation']['sim_file'] # Run FEA simulation result = nx_solver.run_simulation( sim_file=sim_file, working_dir=model_dir, expression_updates=design_vars, solution_name=config['simulation'].get('solution_name'), cleanup=True ) if not result['success']: logger.trial_failed(trial.number, f"Simulation failed: {{result.get('error', 'Unknown')}}") return {"(" + ", ".join(["float('inf')"] * len(self.objectives)) + ")" if is_multi else "float('inf')"} op2_file = result['op2_file'] dat_file = model_dir / config['simulation']['dat_file'] {extraction_code} # Check constraints feasible = True constraint_results = {{}} {self._build_constraint_check_code()} # Set user attributes {self._build_user_attrs_code()} trial.set_user_attr('feasible', feasible) objectives = {{{", ".join([f"'{obj.name}': obj_{obj.name}" for obj in self.objectives])}}} logger.trial_complete(trial.number, objectives, constraint_results, feasible) {return_stmt} except Exception as e: logger.trial_failed(trial.number, str(e)) return {"(" + ", ".join(["float('inf')"] * len(self.objectives)) + ")" if is_multi else "float('inf')"} def main(): """Main optimization workflow.""" parser = argparse.ArgumentParser(description='{self.study_name}') stage_group = parser.add_mutually_exclusive_group() stage_group.add_argument('--discover', action='store_true', help='Discover model outputs') stage_group.add_argument('--validate', action='store_true', help='Run single validation trial') stage_group.add_argument('--test', action='store_true', help='Run 3-trial test') stage_group.add_argument('--run', action='store_true', help='Run optimization') parser.add_argument('--trials', type=int, default={self.n_trials}, help='Number of trials') parser.add_argument('--resume', action='store_true', help='Resume existing study') parser.add_argument('--clean', action='store_true', help='Clean old files first') args = parser.parse_args() if not any([args.discover, args.validate, args.test, args.run]): print("No stage specified. Use --discover, --validate, --test, or --run") return 1 # Setup paths study_dir = Path(__file__).parent config_path = study_dir / "1_setup" / "optimization_config.json" model_dir = study_dir / "1_setup" / "model" results_dir = study_dir / "2_results" results_dir.mkdir(exist_ok=True) study_name = "{self.study_name}" # Initialize logger = get_logger(study_name, study_dir=results_dir) config = load_config(config_path) nx_solver = NXSolver() if args.clean: clean_nastran_files(model_dir, logger) # Run appropriate stage if args.discover or args.validate or args.test: # Run limited trials for these stages n = 1 if args.discover or args.validate else 3 storage = f"sqlite:///{{results_dir / 'study_test.db'}}" study = optuna.create_study( study_name=f"{{study_name}}_test", storage=storage, sampler={sampler}({"population_size=5, seed=42" if is_multi else "seed=42"}), {"directions=['minimize'] * " + str(len(self.objectives)) if is_multi else "direction='minimize'"}, load_if_exists=False ) study.optimize( lambda trial: objective(trial, config, nx_solver, model_dir, logger), n_trials=n, show_progress_bar=True ) logger.info(f"Completed {{len(study.trials)}} trial(s)") return 0 # Full optimization run storage = f"sqlite:///{{results_dir / 'study.db'}}" if args.resume: study = optuna.load_study( study_name=study_name, storage=storage, sampler={sampler}({"population_size=20, seed=42" if is_multi else "seed=42"}) ) else: study = optuna.create_study( study_name=study_name, storage=storage, sampler={sampler}({"population_size=20, seed=42" if is_multi else "seed=42"}), {"directions=['minimize'] * " + str(len(self.objectives)) if is_multi else "direction='minimize'"}, load_if_exists=True ) logger.study_start(study_name, args.trials, "{sampler}") study.optimize( lambda trial: objective(trial, config, nx_solver, model_dir, logger), n_trials=args.trials, show_progress_bar=True ) n_complete = len([t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]) logger.study_complete(study_name, len(study.trials), n_complete) # Report results {"pareto_trials = study.best_trials" if is_multi else "best_trial = study.best_trial"} logger.info(f"\\nOptimization Complete!") logger.info(f"Total trials: {{len(study.trials)}}") logger.info(f"Successful: {{n_complete}}") return 0 if __name__ == "__main__": exit(main()) ''' with open(path, 'w') as f: f.write(script) def _build_extraction_code(self) -> str: """Build extraction code for objective function.""" lines = [] lines.append(" # Extract results") for obj in self.objectives: info = EXTRACTOR_CATALOG.get(obj.extractor, {}) func = info.get('function', obj.extractor) # Determine source file if '.dat' in info.get('input', '') or '.bdf' in info.get('input', ''): source = "dat_file" else: source = "op2_file" # Build extraction call if 'displacement' in obj.extractor.lower(): lines.append(f" disp_result = {func}({source}, subcase=1)") lines.append(f" max_displacement = disp_result['max_displacement']") if obj.goal == 'maximize': lines.append(f" # For stiffness maximization, use inverse of displacement") lines.append(f" applied_force = 1000.0 # N - adjust based on your model") lines.append(f" obj_{obj.name} = -applied_force / max(abs(max_displacement), 1e-6)") else: lines.append(f" obj_{obj.name} = max_displacement") elif 'mass' in obj.extractor.lower(): lines.append(f" obj_{obj.name} = {func}(str({source}))") if obj.goal == 'maximize': lines.append(f" obj_{obj.name} = -obj_{obj.name} # Negate for maximization") elif 'stress' in obj.extractor.lower(): lines.append(f" stress_result = {func}({source}, subcase=1)") lines.append(f" obj_{obj.name} = stress_result.get('max_von_mises', float('inf'))") if obj.goal == 'maximize': lines.append(f" obj_{obj.name} = -obj_{obj.name} # Negate for maximization") else: # Generic extraction lines.append(f" obj_{obj.name} = {func}({source})") if obj.goal == 'maximize': lines.append(f" obj_{obj.name} = -obj_{obj.name} # Negate for maximization") lines.append(f" logger.info(f' {obj.name}: {{obj_{obj.name}}}')") lines.append("") return "\n".join(lines) def _build_constraint_check_code(self) -> str: """Build constraint checking code.""" if not self.constraints: return " pass # No constraints defined" lines = [] for const in self.constraints: lines.append(f" # Check {const.name}") # Get the value to check (may need extraction) if any(obj.name == const.name for obj in self.objectives): # Already extracted as objective value_var = f"obj_{const.name}" else: # Need to extract info = EXTRACTOR_CATALOG.get(const.extractor, {}) func = info.get('function', const.extractor) source = "dat_file" if '.dat' in info.get('input', '') else "op2_file" lines.append(f" const_{const.name} = {func}({source})") value_var = f"const_{const.name}" lines.append(f" constraint_results['{const.name}'] = {value_var}") if const.type == "less_than": lines.append(f" if {value_var} > {const.threshold}:") else: lines.append(f" if {value_var} < {const.threshold}:") lines.append(f" feasible = False") lines.append(f" logger.warning(f' Constraint violation: {const.name} = {{{value_var}}} vs {const.threshold}')") lines.append("") return "\n".join(lines) def _build_user_attrs_code(self) -> str: """Build user attributes setting code.""" lines = [] for obj in self.objectives: lines.append(f" trial.set_user_attr('{obj.name}', obj_{obj.name})") return "\n".join(lines) def _generate_reset_script(self, path: Path): """Generate reset_study.py script.""" logger.info(f"Generating: {path.name}") script = f'''""" Reset study - Delete results database and logs. Usage: python reset_study.py python reset_study.py --confirm # Skip confirmation """ from pathlib import Path import shutil def main(): import argparse parser = argparse.ArgumentParser() parser.add_argument('--confirm', action='store_true', help='Skip confirmation') args = parser.parse_args() study_dir = Path(__file__).parent results_dir = study_dir / "2_results" if not args.confirm: print(f"This will delete all results in: {{results_dir}}") response = input("Are you sure? (y/N): ") if response.lower() != 'y': print("Cancelled.") return # Delete database files for f in results_dir.glob("*.db"): f.unlink() print(f"Deleted: {{f.name}}") # Delete log files for f in results_dir.glob("*.log"): f.unlink() print(f"Deleted: {{f.name}}") # Delete JSON results for f in results_dir.glob("*.json"): f.unlink() print(f"Deleted: {{f.name}}") print("Study reset complete.") if __name__ == "__main__": main() ''' with open(path, 'w') as f: f.write(script) def _generate_readme(self, path: Path): """Generate README.md documentation.""" logger.info(f"Generating: {path.name}") # Build design variables table dv_table = "| Parameter | Bounds | Units | Description |\n|-----------|--------|-------|-------------|\n" for dv in self.design_variables: dv_table += f"| `{dv.parameter}` | [{dv.bounds[0]}, {dv.bounds[1]}] | {dv.units} | {dv.description} |\n" # Build objectives table obj_table = "| Objective | Goal | Extractor | Weight |\n|-----------|------|-----------|--------|\n" for obj in self.objectives: obj_table += f"| {obj.name} | {obj.goal} | `{obj.extractor}` | {obj.weight} |\n" # Build constraints table const_table = "| Constraint | Type | Threshold | Units |\n|------------|------|-----------|-------|\n" for const in self.constraints: const_table += f"| {const.name} | {const.type} | {const.threshold} | {const.units} |\n" protocol_info = PROTOCOL_CATALOG.get(self.protocol, {}) readme = f'''# {self.study_name} {self.description} **Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M')} **Protocol**: {protocol_info.get('name', self.protocol)} **Trials**: {self.n_trials} --- ## 1. Engineering Problem {self.description} --- ## 2. Mathematical Formulation ### Design Variables {dv_table} ### Objectives {obj_table} ### Constraints {const_table if self.constraints else "No constraints defined."} --- ## 3. Optimization Algorithm - **Protocol**: {self.protocol} - **Sampler**: {protocol_info.get('sampler', 'TPESampler')} - **Trials**: {self.n_trials} - **Neural Acceleration**: {"Enabled" if self.neural_enabled else "Disabled"} --- ## 4. Simulation Pipeline ``` Design Variables → NX Expression Update → Nastran Solve → Result Extraction → Objective Evaluation ``` --- ## 5. Result Extraction Methods | Result | Extractor | Source | |--------|-----------|--------| {chr(10).join([f"| {obj.name} | `{obj.extractor}` | OP2/DAT |" for obj in self.objectives])} --- ## 6. Study File Structure ``` {self.study_name}/ ├── 1_setup/ │ ├── model/ │ │ ├── {self.prt_file.name if self.prt_file else "Model.prt"} │ │ ├── {self.sim_file.name if self.sim_file else "Model_sim1.sim"} │ │ └── {self.fem_file.name if self.fem_file else "Model_fem1.fem"} │ ├── optimization_config.json │ └── workflow_config.json ├── 2_results/ │ ├── study.db │ └── optimization.log ├── run_optimization.py ├── reset_study.py ├── README.md ├── STUDY_REPORT.md └── MODEL_INTROSPECTION.md ``` --- ## 7. Quick Start ```bash # 1. Discover model outputs python run_optimization.py --discover # 2. Validate setup with single trial python run_optimization.py --validate # 3. Run integration test (3 trials) python run_optimization.py --test # 4. Run full optimization python run_optimization.py --run --trials {self.n_trials} # 5. Resume if interrupted python run_optimization.py --run --trials 50 --resume ``` --- ## 8. Results Location | File | Description | |------|-------------| | `2_results/study.db` | Optuna SQLite database | | `2_results/optimization.log` | Structured log file | | `2_results/pareto_front.json` | Pareto-optimal solutions | --- ## 9. References - [Atomizer Documentation](../../docs/) - [Protocol {self.protocol}](../../docs/protocols/system/) - [Extractor Library](../../docs/protocols/system/SYS_12_EXTRACTOR_LIBRARY.md) ''' with open(path, 'w') as f: f.write(readme) def _generate_study_report(self, path: Path): """Generate STUDY_REPORT.md template.""" logger.info(f"Generating: {path.name}") report = f'''# Study Report: {self.study_name} **Status**: Not Started **Created**: {datetime.now().strftime('%Y-%m-%d %H:%M')} **Last Updated**: {datetime.now().strftime('%Y-%m-%d %H:%M')} --- ## 1. Optimization Progress | Metric | Value | |--------|-------| | Total Trials | 0 | | Successful Trials | 0 | | Best Objective | - | | Duration | - | --- ## 2. Best Solutions *No optimization runs completed yet.* --- ## 3. Pareto Front (if multi-objective) *No Pareto front generated yet.* --- ## 4. Design Variable Sensitivity *Analysis pending optimization runs.* --- ## 5. Constraint Satisfaction *Analysis pending optimization runs.* --- ## 6. Recommendations *Recommendations will be added after optimization runs.* --- ## 7. Next Steps 1. [ ] Run `python run_optimization.py --discover` 2. [ ] Run `python run_optimization.py --validate` 3. [ ] Run `python run_optimization.py --test` 4. [ ] Run `python run_optimization.py --run --trials {self.n_trials}` 5. [ ] Analyze results and update this report --- *Generated by StudyWizard* ''' with open(path, 'w') as f: f.write(report) def _generate_introspection_report(self, path: Path): """Generate MODEL_INTROSPECTION.md report.""" logger.info(f"Generating: {path.name}") # Build expressions table if self.introspection and self.introspection.expressions: expr_table = "| Name | Value | Unit | Optimization Candidate |\n|------|-------|------|------------------------|\n" for expr in self.introspection.expressions[:20]: # Limit to 20 name = expr.get('name', '') value = expr.get('value', 'N/A') unit = expr.get('unit', '') candidate = "✓ High" if any(kw in name.lower() for kw in ['thickness', 'angle', 'radius', 'length']) else "Medium" expr_table += f"| {name} | {value} | {unit} | {candidate} |\n" else: expr_table = "*Run introspection to discover expressions.*" # Build solutions table if self.introspection and self.introspection.solutions: sol_table = "| Solution | Type | Status |\n|----------|------|--------|\n" for sol in self.introspection.solutions: sol_table += f"| {sol.get('name', 'Unknown')} | {sol.get('type', 'Static')} | ✓ Active |\n" else: sol_table = "*Run introspection to discover solutions.*" report = f'''# Model Introspection Report **Study**: {self.study_name} **Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M')} **Introspection Version**: 1.0 --- ## 1. Files Discovered | Type | File | Status | |------|------|--------| | Part (.prt) | {self.prt_file.name if self.prt_file else "Not set"} | {"✓ Found" if self.prt_file and self.prt_file.exists() else "❌ Missing"} | | Simulation (.sim) | {self.sim_file.name if self.sim_file else "Not set"} | {"✓ Found" if self.sim_file and self.sim_file.exists() else "❌ Missing"} | | FEM (.fem) | {self.fem_file.name if self.fem_file else "Not set"} | {"✓ Found" if self.fem_file and self.fem_file.exists() else "⚠ Will be created"} | --- ## 2. Expressions (Potential Design Variables) {expr_table} --- ## 3. Solutions {sol_table} --- ## 4. Available Results | Result Type | Available | Subcases | |-------------|-----------|----------| | Displacement | {"✓" if self.introspection and self.introspection.available_results.get('displacement') else "?"} | - | | Stress | {"✓" if self.introspection and self.introspection.available_results.get('stress') else "?"} | - | | SPC Forces | {"✓" if self.introspection and self.introspection.available_results.get('spc_forces') else "?"} | - | --- ## 5. Optimization Configuration ### Selected Design Variables {chr(10).join([f"- `{dv.parameter}`: [{dv.bounds[0]}, {dv.bounds[1]}] {dv.units}" for dv in self.design_variables]) if self.design_variables else "*No design variables configured yet.*"} ### Selected Objectives {chr(10).join([f"- {obj.goal.capitalize()} `{obj.name}` using `{obj.extractor}`" for obj in self.objectives]) if self.objectives else "*No objectives configured yet.*"} ### Selected Constraints {chr(10).join([f"- `{c.name}` {c.type} {c.threshold} {c.units}" for c in self.constraints]) if self.constraints else "*No constraints configured.*"} --- *Ready to create optimization study? Run `python run_optimization.py --discover` to proceed.* ''' with open(path, 'w') as f: f.write(report) # ============================================================================= # Convenience Functions # ============================================================================= def create_study( study_name: str, description: str, prt_file: Union[str, Path], design_variables: List[Dict[str, Any]], objectives: List[Dict[str, Any]], constraints: Optional[List[Dict[str, Any]]] = None, n_trials: int = 100, protocol: str = "protocol_11_multi" ) -> Dict[str, Path]: """ Convenience function to create a complete study in one call. Args: study_name: Name of the study description: Human-readable description prt_file: Path to NX part file design_variables: List of design variable dicts with keys: - parameter: str - bounds: [min, max] - units: str (optional) - description: str (optional) objectives: List of objective dicts with keys: - name: str - goal: "minimize" or "maximize" - extractor: str - params: dict (optional) constraints: List of constraint dicts with keys: - name: str - type: "less_than" or "greater_than" - threshold: float - extractor: str - units: str (optional) n_trials: Number of optimization trials protocol: Optimization protocol Returns: Dict of generated file paths Example: create_study( study_name="bracket_opt", description="Optimize bracket for stiffness", prt_file="Bracket.prt", design_variables=[ {"parameter": "thickness", "bounds": [5, 20], "units": "mm"} ], objectives=[ {"name": "stiffness", "goal": "maximize", "extractor": "extract_displacement"} ], constraints=[ {"name": "mass", "type": "less_than", "threshold": 0.5, "extractor": "extract_mass_from_bdf", "units": "kg"} ] ) """ wizard = StudyWizard(study_name, description) wizard.set_model_files(prt_file) wizard.set_protocol(protocol) wizard.set_trials(n_trials) # Add design variables for dv in design_variables: wizard.add_design_variable( parameter=dv['parameter'], bounds=tuple(dv['bounds']), units=dv.get('units', ''), description=dv.get('description', '') ) # Add objectives for obj in objectives: wizard.add_objective( name=obj['name'], goal=obj['goal'], extractor=obj['extractor'], params=obj.get('params', {}), weight=obj.get('weight', 1.0), description=obj.get('description', '') ) # Add constraints for const in (constraints or []): wizard.add_constraint( name=const['name'], constraint_type=const['type'], threshold=const['threshold'], extractor=const['extractor'], params=const.get('params', {}), units=const.get('units', ''), description=const.get('description', '') ) # Run introspection if model files exist try: wizard.introspect() except Exception as e: logger.warning(f"Introspection failed: {e}") return wizard.generate() def list_extractors() -> Dict[str, Dict[str, Any]]: """Return the extractor catalog.""" return EXTRACTOR_CATALOG.copy() def list_protocols() -> Dict[str, Dict[str, Any]]: """Return the protocol catalog.""" return PROTOCOL_CATALOG.copy() # ============================================================================= # CLI # ============================================================================= def main(): """CLI for study wizard.""" import argparse parser = argparse.ArgumentParser(description="Atomizer Study Creation Wizard") parser.add_argument('--name', required=True, help='Study name') parser.add_argument('--description', default='', help='Study description') parser.add_argument('--prt', required=True, help='Path to PRT file') parser.add_argument('--list-extractors', action='store_true', help='List available extractors') args = parser.parse_args() if args.list_extractors: print("\nAvailable Extractors:") print("=" * 60) for name, info in EXTRACTOR_CATALOG.items(): print(f"\n{name}:") print(f" Input: {info.get('input', 'N/A')}") print(f" Output: {info.get('output_unit', 'N/A')}") print(f" {info.get('description', '')}") return 0 # Interactive wizard would go here print(f"\nStudy Wizard initialized for: {args.name}") print("Use the Python API for full functionality.") return 0 if __name__ == "__main__": exit(main())