diff --git a/examples/run_optimization.py b/examples/run_optimization.py new file mode 100644 index 00000000..7d998553 --- /dev/null +++ b/examples/run_optimization.py @@ -0,0 +1,206 @@ +""" +Example: Running Complete Optimization + +This example demonstrates the complete optimization workflow: +1. Load optimization configuration +2. Update NX model parameters +3. Run simulation (dummy for now - would call NX solver) +4. Extract results from OP2 +5. Optimize with Optuna + +For a real run, you would need: +- pyNastran installed for OP2 extraction +- NX solver accessible to run simulations +""" + +from pathlib import Path +import sys + +# Add project root to path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from optimization_engine.runner import OptimizationRunner +from optimization_engine.nx_updater import update_nx_model + + +# ================================================== +# STEP 1: Define model updater function +# ================================================== +def bracket_model_updater(design_vars: dict): + """ + Update the bracket model with new design variable values. + + Args: + design_vars: Dict like {'tip_thickness': 22.5, 'support_angle': 35.0} + """ + prt_file = project_root / "examples/bracket/Bracket.prt" + + print(f"\n[MODEL UPDATE] Updating {prt_file.name} with:") + for name, value in design_vars.items(): + print(f" {name} = {value:.4f}") + + # Update the .prt file with new parameter values + update_nx_model(prt_file, design_vars, backup=False) + + print("[MODEL UPDATE] Complete") + + +# ================================================== +# STEP 2: Define simulation runner function +# ================================================== +def bracket_simulation_runner() -> Path: + """ + Run NX simulation and return path to result files. + + In a real implementation, this would: + 1. Open NX (or use batch mode) + 2. Update the .sim file + 3. Run the solver + 4. Wait for completion + 5. Return path to .op2 file + + For now, we return the path to existing results. + """ + print("\n[SIMULATION] Running NX Nastran solver...") + print("[SIMULATION] (Using existing results for demonstration)") + + # In real use, this would run the actual solver + # For now, return path to existing OP2 file + result_file = project_root / "examples/bracket/bracket_sim1-solution_1.op2" + + if not result_file.exists(): + raise FileNotFoundError(f"Result file not found: {result_file}") + + print(f"[SIMULATION] Results: {result_file.name}") + return result_file + + +# ================================================== +# STEP 3: Define result extractors (dummy versions) +# ================================================== +def dummy_mass_extractor(result_path: Path) -> dict: + """ + Dummy mass extractor. + In real use, would call: from optimization_engine.result_extractors.extractors import mass_extractor + """ + import random + # Simulate varying mass based on a simple model + # In reality, this would extract from OP2 + base_mass = 0.45 # kg + variation = random.uniform(-0.05, 0.05) + + return { + 'total_mass': base_mass + variation, + 'cg_x': 0.0, + 'cg_y': 0.0, + 'cg_z': 0.0, + 'units': 'kg' + } + + +def dummy_stress_extractor(result_path: Path) -> dict: + """ + Dummy stress extractor. + In real use, would call: from optimization_engine.result_extractors.extractors import stress_extractor + """ + import random + # Simulate stress results + base_stress = 180.0 # MPa + variation = random.uniform(-30.0, 30.0) + + return { + 'max_von_mises': base_stress + variation, + 'stress_type': 'von_mises', + 'element_id': 1234, + 'units': 'MPa' + } + + +def dummy_displacement_extractor(result_path: Path) -> dict: + """ + Dummy displacement extractor. + In real use, would call: from optimization_engine.result_extractors.extractors import displacement_extractor + """ + import random + # Simulate displacement results + base_disp = 0.9 # mm + variation = random.uniform(-0.2, 0.2) + + return { + 'max_displacement': base_disp + variation, + 'max_node_id': 5678, + 'dx': 0.0, + 'dy': 0.0, + 'dz': base_disp + variation, + 'units': 'mm' + } + + +# ================================================== +# MAIN: Run optimization +# ================================================== +if __name__ == "__main__": + print("="*60) + print("ATOMIZER - OPTIMIZATION EXAMPLE") + print("="*60) + + # Path to optimization configuration + config_path = project_root / "examples/bracket/optimization_config.json" + + if not config_path.exists(): + print(f"Error: Configuration file not found: {config_path}") + print("Please run the MCP build_optimization_config tool first.") + sys.exit(1) + + print(f"\nConfiguration: {config_path}") + + # Create result extractors dict + extractors = { + 'mass_extractor': dummy_mass_extractor, + 'stress_extractor': dummy_stress_extractor, + 'displacement_extractor': dummy_displacement_extractor + } + + # Create optimization runner + runner = OptimizationRunner( + config_path=config_path, + model_updater=bracket_model_updater, + simulation_runner=bracket_simulation_runner, + result_extractors=extractors + ) + + # Run optimization (use fewer trials for demo) + print("\n" + "="*60) + print("Starting optimization with 10 trials (demo)") + print("For full optimization, modify n_trials in config") + print("="*60) + + # Override n_trials for demo + runner.config['optimization_settings']['n_trials'] = 10 + + # Run! + study = runner.run(study_name="bracket_optimization_demo") + + print("\n" + "="*60) + print("OPTIMIZATION RESULTS") + print("="*60) + print(f"\nBest parameters found:") + for param, value in study.best_params.items(): + print(f" {param}: {value:.4f}") + + print(f"\nBest objective value: {study.best_value:.6f}") + + print(f"\nResults saved to: {runner.output_dir}") + print(" - history.csv (all trials)") + print(" - history.json (detailed results)") + print(" - optimization_summary.json (best results)") + + print("\n" + "="*60) + print("NEXT STEPS:") + print("="*60) + print("1. Install pyNastran: conda install -c conda-forge pynastran") + print("2. Replace dummy extractors with real OP2 extractors") + print("3. Integrate with NX solver (batch mode or NXOpen)") + print("4. Run full optimization with n_trials=100+") + print("="*60) diff --git a/optimization_engine/nx_updater.py b/optimization_engine/nx_updater.py new file mode 100644 index 00000000..c7a850ee --- /dev/null +++ b/optimization_engine/nx_updater.py @@ -0,0 +1,292 @@ +""" +NX Parameter Updater + +Updates design variable values in NX .prt files. + +NX .prt files are binary, but expressions are stored in readable text sections. +This module can update expression values by: +1. Reading the binary file +2. Finding and replacing expression value patterns +3. Writing back the updated file + +Alternative: Use NXOpen API if NX is running (future enhancement) +""" + +from pathlib import Path +from typing import Dict, List +import re +import shutil +from datetime import datetime + + +class NXParameterUpdater: + """ + Updates parametric expression values in NX .prt files. + + NX Expression Format in binary .prt files: + #(Number [mm]) tip_thickness: 20.0; + *(Number [degrees]) support_angle: 30.0; + """ + + def __init__(self, prt_file_path: Path, backup: bool = True): + """ + Initialize updater for a specific .prt file. + + Args: + prt_file_path: Path to NX .prt file + backup: If True, create backup before modifying + """ + self.prt_path = Path(prt_file_path) + + if not self.prt_path.exists(): + raise FileNotFoundError(f".prt file not found: {prt_file_path}") + + self.backup_enabled = backup + self.content = None + self.text_content = None + self._load_file() + + def _load_file(self): + """Load .prt file as binary.""" + with open(self.prt_path, 'rb') as f: + self.content = bytearray(f.read()) + + # Decode as latin-1 for text operations (preserves all bytes) + self.text_content = self.content.decode('latin-1', errors='ignore') + + def _create_backup(self): + """Create timestamped backup of original file.""" + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + backup_path = self.prt_path.with_suffix(f'.prt.bak_{timestamp}') + shutil.copy2(self.prt_path, backup_path) + print(f"Backup created: {backup_path}") + return backup_path + + def find_expressions(self) -> List[Dict[str, any]]: + """ + Find all expressions in the .prt file. + + Returns: + List of dicts with name, value, units + """ + expressions = [] + + # Pattern for NX expressions: + # #(Number [mm]) tip_thickness: 20.0; + # *(Number [mm]) p3: 10.0; + # ((Number [degrees]) support_angle: 30.0; + pattern = r'[#*\(]*\((\w+)\s*\[([^\]]*)\]\)\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)' + + for match in re.finditer(pattern, self.text_content): + expr_type, units, name, value = match.groups() + expressions.append({ + 'name': name, + 'value': float(value), + 'units': units, + 'type': expr_type + }) + + return expressions + + def update_expression(self, name: str, new_value: float) -> bool: + """ + Update a single expression value. + + Args: + name: Expression name + new_value: New value + + Returns: + True if updated, False if not found + """ + # Find the expression pattern + # Match: (Type [units]) name: old_value; + # We need to be careful to match the exact name and preserve formatting + + # Pattern that captures the full expression line + pattern = rf'([#*\(]*\(\w+\s*\[[^\]]*\]\)\s*)({re.escape(name)})\s*:\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)' + + matches = list(re.finditer(pattern, self.text_content)) + + if not matches: + print(f"Warning: Expression '{name}' not found in .prt file") + return False + + if len(matches) > 1: + print(f"Warning: Multiple matches for '{name}', updating first occurrence") + + # Get the first match + match = matches[0] + prefix, expr_name, old_value = match.groups() + + # Format new value (preserve decimal places if possible) + # Check if old value had decimal point + if '.' in old_value or 'e' in old_value.lower(): + # Use same precision as old value + decimal_places = len(old_value.split('.')[-1]) if '.' in old_value else 2 + new_value_str = f"{new_value:.{decimal_places}f}" + else: + # Integer format + new_value_str = f"{int(new_value)}" + + # Build replacement string + replacement = f"{prefix}{expr_name}: {new_value_str}" + + # Replace in text content + old_match = match.group(0) + self.text_content = self.text_content.replace(old_match, replacement, 1) + + # Also update in binary content + old_bytes = old_match.encode('latin-1') + new_bytes = replacement.encode('latin-1') + + # Find and replace in binary content + start_pos = self.content.find(old_bytes) + if start_pos != -1: + # Replace bytes + self.content[start_pos:start_pos+len(old_bytes)] = new_bytes + + print(f"Updated: {name} = {old_value} -> {new_value_str}") + return True + else: + print(f"Warning: Could not update binary content for '{name}'") + return False + + def update_expressions(self, updates: Dict[str, float]): + """ + Update multiple expressions at once. + + Args: + updates: Dict mapping expression name to new value + {'tip_thickness': 22.5, 'support_angle': 35.0} + """ + print(f"\nUpdating {len(updates)} expressions in {self.prt_path.name}:") + + updated_count = 0 + for name, value in updates.items(): + if self.update_expression(name, value): + updated_count += 1 + + print(f"Successfully updated {updated_count}/{len(updates)} expressions") + + def save(self, output_path: Path = None): + """ + Save modified .prt file. + + Args: + output_path: Optional different path to save to. + If None, overwrites original (with backup if enabled) + """ + if output_path is None: + output_path = self.prt_path + if self.backup_enabled: + self._create_backup() + + # Write updated binary content + with open(output_path, 'wb') as f: + f.write(self.content) + + print(f"Saved to: {output_path}") + + def verify_update(self, name: str, expected_value: float, tolerance: float = 1e-6) -> bool: + """ + Verify that an expression was updated correctly. + + Args: + name: Expression name + expected_value: Expected value + tolerance: Acceptable difference + + Returns: + True if value matches (within tolerance) + """ + expressions = self.find_expressions() + expr = next((e for e in expressions if e['name'] == name), None) + + if expr is None: + print(f"Expression '{name}' not found") + return False + + actual_value = expr['value'] + difference = abs(actual_value - expected_value) + + if difference <= tolerance: + print(f"OK Verified: {name} = {actual_value} (expected {expected_value})") + return True + else: + print(f"FAIL Verification failed: {name} = {actual_value}, expected {expected_value} (diff: {difference})") + return False + + +# Convenience function for optimization loop +def update_nx_model(prt_file_path: Path, design_variables: Dict[str, float], backup: bool = False): + """ + Convenience function to update NX model parameters. + + Args: + prt_file_path: Path to .prt file + design_variables: Dict of parameter name -> value + backup: Whether to create backup + + Example: + >>> update_nx_model( + ... Path("Bracket.prt"), + ... {'tip_thickness': 22.5, 'support_angle': 35.0} + ... ) + """ + updater = NXParameterUpdater(prt_file_path, backup=backup) + updater.update_expressions(design_variables) + updater.save() + + +# Example usage +if __name__ == "__main__": + import sys + + if len(sys.argv) < 2: + print("Usage: python nx_updater.py ") + sys.exit(1) + + prt_path = Path(sys.argv[1]) + + # Test: Find and print all expressions + print("="*60) + print("NX PARAMETER UPDATER TEST") + print("="*60) + + updater = NXParameterUpdater(prt_path, backup=True) + + print("\nCurrent expressions in file:") + expressions = updater.find_expressions() + for expr in expressions: + print(f" {expr['name']}: {expr['value']} {expr['units']}") + + # Test update (if expressions found) + if expressions: + print("\n" + "="*60) + print("TEST UPDATE") + print("="*60) + + # Update first expression + first_expr = expressions[0] + test_name = first_expr['name'] + test_new_value = first_expr['value'] * 1.1 # Increase by 10% + + print(f"\nUpdating {test_name} from {first_expr['value']} to {test_new_value}") + + updater.update_expression(test_name, test_new_value) + + # Save to test file + test_output = prt_path.with_suffix('.prt.test') + updater.save(test_output) + + # Verify by re-reading + print("\n" + "="*60) + print("VERIFICATION") + print("="*60) + verifier = NXParameterUpdater(test_output, backup=False) + verifier.verify_update(test_name, test_new_value) + + print(f"\nTest complete. Modified file: {test_output}") + else: + print("\nNo expressions found in file. Nothing to test.") diff --git a/optimization_engine/result_extractors/extractors.py b/optimization_engine/result_extractors/extractors.py new file mode 100644 index 00000000..ef5d7760 --- /dev/null +++ b/optimization_engine/result_extractors/extractors.py @@ -0,0 +1,207 @@ +""" +Result Extractors + +Wrapper functions that integrate with the optimization runner. +These extract optimization metrics from NX Nastran result files. +""" + +from pathlib import Path +from typing import Dict, Any +import sys + +# Add project root to path +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from optimization_engine.result_extractors.op2_extractor_example import ( + extract_max_displacement, + extract_max_stress, + extract_mass +) + + +def mass_extractor(result_path: Path) -> Dict[str, float]: + """ + Extract mass metrics for optimization. + + Args: + result_path: Path to .op2 file or directory containing results + + Returns: + Dict with 'total_mass' and other mass-related metrics + """ + # If result_path is a directory, find the .op2 file + if result_path.is_dir(): + op2_files = list(result_path.glob("*.op2")) + if not op2_files: + raise FileNotFoundError(f"No .op2 files found in {result_path}") + op2_path = op2_files[0] # Use first .op2 file + else: + op2_path = result_path + + if not op2_path.exists(): + raise FileNotFoundError(f"Result file not found: {op2_path}") + + result = extract_mass(op2_path) + + # Ensure total_mass key exists + if 'total_mass' not in result or result['total_mass'] is None: + raise ValueError(f"Could not extract total_mass from {op2_path}") + + return result + + +def stress_extractor(result_path: Path) -> Dict[str, float]: + """ + Extract stress metrics for optimization. + + Args: + result_path: Path to .op2 file or directory containing results + + Returns: + Dict with 'max_von_mises' and other stress metrics + """ + # If result_path is a directory, find the .op2 file + if result_path.is_dir(): + op2_files = list(result_path.glob("*.op2")) + if not op2_files: + raise FileNotFoundError(f"No .op2 files found in {result_path}") + op2_path = op2_files[0] + else: + op2_path = result_path + + if not op2_path.exists(): + raise FileNotFoundError(f"Result file not found: {op2_path}") + + result = extract_max_stress(op2_path, stress_type='von_mises') + + # Ensure max_von_mises key exists + if 'max_stress' in result: + result['max_von_mises'] = result['max_stress'] + + if 'max_von_mises' not in result or result['max_von_mises'] is None: + raise ValueError(f"Could not extract max_von_mises from {op2_path}") + + return result + + +def displacement_extractor(result_path: Path) -> Dict[str, float]: + """ + Extract displacement metrics for optimization. + + Args: + result_path: Path to .op2 file or directory containing results + + Returns: + Dict with 'max_displacement' and other displacement metrics + """ + # If result_path is a directory, find the .op2 file + if result_path.is_dir(): + op2_files = list(result_path.glob("*.op2")) + if not op2_files: + raise FileNotFoundError(f"No .op2 files found in {result_path}") + op2_path = op2_files[0] + else: + op2_path = result_path + + if not op2_path.exists(): + raise FileNotFoundError(f"Result file not found: {op2_path}") + + result = extract_max_displacement(op2_path) + + # Ensure max_displacement key exists + if 'max_displacement' not in result or result['max_displacement'] is None: + raise ValueError(f"Could not extract max_displacement from {op2_path}") + + return result + + +def volume_extractor(result_path: Path) -> Dict[str, float]: + """ + Extract volume metrics for optimization. + + Note: Volume is often not directly in OP2 files. + This is a placeholder that could be extended to: + - Calculate from mass and density + - Extract from .f06 file + - Query from NX model directly + + Args: + result_path: Path to result files + + Returns: + Dict with 'total_volume' + """ + # For now, estimate from mass (would need material density) + # This is a placeholder implementation + mass_result = mass_extractor(result_path) + + # Assuming steel density ~7850 kg/m^3 = 7.85e-6 kg/mm^3 + # volume (mm^3) = mass (kg) / density (kg/mm^3) + assumed_density = 7.85e-6 # kg/mm^3 + + if mass_result['total_mass']: + total_volume = mass_result['total_mass'] / assumed_density + else: + total_volume = None + + return { + 'total_volume': total_volume, + 'note': 'Volume estimated from mass using assumed density' + } + + +# Registry of all available extractors +EXTRACTOR_REGISTRY = { + 'mass_extractor': mass_extractor, + 'stress_extractor': stress_extractor, + 'displacement_extractor': displacement_extractor, + 'volume_extractor': volume_extractor +} + + +def get_extractor(extractor_name: str): + """ + Get an extractor function by name. + + Args: + extractor_name: Name of the extractor + + Returns: + Extractor function + + Raises: + ValueError: If extractor not found + """ + if extractor_name not in EXTRACTOR_REGISTRY: + available = ', '.join(EXTRACTOR_REGISTRY.keys()) + raise ValueError(f"Unknown extractor: {extractor_name}. Available: {available}") + + return EXTRACTOR_REGISTRY[extractor_name] + + +# Example usage +if __name__ == "__main__": + import sys + + if len(sys.argv) < 2: + print("Usage: python extractors.py ") + print("\nAvailable extractors:") + for name in EXTRACTOR_REGISTRY.keys(): + print(f" - {name}") + sys.exit(1) + + result_path = Path(sys.argv[1]) + + print("="*60) + print("TESTING ALL EXTRACTORS") + print("="*60) + + for extractor_name, extractor_func in EXTRACTOR_REGISTRY.items(): + print(f"\n{extractor_name}:") + try: + result = extractor_func(result_path) + for key, value in result.items(): + print(f" {key}: {value}") + except Exception as e: + print(f" Error: {e}") diff --git a/optimization_engine/runner.py b/optimization_engine/runner.py new file mode 100644 index 00000000..d50996a7 --- /dev/null +++ b/optimization_engine/runner.py @@ -0,0 +1,374 @@ +""" +Optimization Runner + +Orchestrates the optimization loop: +1. Load configuration +2. Initialize Optuna study +3. For each trial: + - Update design variables in NX model + - Run simulation + - Extract results (OP2 file) + - Return objective/constraint values to Optuna +4. Save optimization history +""" + +from pathlib import Path +from typing import Dict, Any, List, Optional, Callable +import json +import time +import optuna +from optuna.samplers import TPESampler, CmaEsSampler, GPSampler +import pandas as pd +from datetime import datetime + + +class OptimizationRunner: + """ + Main optimization runner that coordinates: + - Optuna optimization loop + - NX model parameter updates + - Simulation execution + - Result extraction + """ + + def __init__( + self, + config_path: Path, + model_updater: Callable, + simulation_runner: Callable, + result_extractors: Dict[str, Callable] + ): + """ + Initialize optimization runner. + + Args: + config_path: Path to optimization_config.json + model_updater: Function(design_vars: Dict) -> None + Updates NX model with new parameter values + simulation_runner: Function() -> Path + Runs simulation and returns path to result files + result_extractors: Dict mapping extractor name to extraction function + e.g., {'mass_extractor': extract_mass_func} + """ + self.config_path = Path(config_path) + self.config = self._load_config() + self.model_updater = model_updater + self.simulation_runner = simulation_runner + self.result_extractors = result_extractors + + # Initialize storage + self.history = [] + self.study = None + self.best_params = None + self.best_value = None + + # Paths + self.output_dir = self.config_path.parent / 'optimization_results' + self.output_dir.mkdir(exist_ok=True) + + def _load_config(self) -> Dict[str, Any]: + """Load and validate optimization configuration.""" + with open(self.config_path, 'r') as f: + config = json.load(f) + + # Validate required fields + required = ['design_variables', 'objectives', 'optimization_settings'] + for field in required: + if field not in config: + raise ValueError(f"Missing required field in config: {field}") + + return config + + def _get_sampler(self, sampler_name: str): + """Get Optuna sampler instance.""" + samplers = { + 'TPE': TPESampler, + 'CMAES': CmaEsSampler, + 'GP': GPSampler + } + + if sampler_name not in samplers: + raise ValueError(f"Unknown sampler: {sampler_name}. Choose from {list(samplers.keys())}") + + return samplers[sampler_name]() + + def _objective_function(self, trial: optuna.Trial) -> float: + """ + Optuna objective function. + + This is called for each optimization trial. + + Args: + trial: Optuna trial object + + Returns: + Objective value (float) or tuple of values for multi-objective + """ + # 1. Sample design variables + design_vars = {} + for dv in self.config['design_variables']: + if dv['type'] == 'continuous': + design_vars[dv['name']] = trial.suggest_float( + dv['name'], + dv['bounds'][0], + dv['bounds'][1] + ) + elif dv['type'] == 'discrete': + design_vars[dv['name']] = trial.suggest_int( + dv['name'], + int(dv['bounds'][0]), + int(dv['bounds'][1]) + ) + + # 2. Update NX model with new parameters + try: + self.model_updater(design_vars) + except Exception as e: + print(f"Error updating model: {e}") + raise optuna.TrialPruned() + + # 3. Run simulation + try: + result_path = self.simulation_runner() + except Exception as e: + print(f"Error running simulation: {e}") + raise optuna.TrialPruned() + + # 4. Extract results + extracted_results = {} + for obj in self.config['objectives']: + extractor_name = obj['extractor'] + if extractor_name not in self.result_extractors: + raise ValueError(f"Missing result extractor: {extractor_name}") + + extractor_func = self.result_extractors[extractor_name] + try: + result = extractor_func(result_path) + metric_name = obj['metric'] + extracted_results[obj['name']] = result[metric_name] + except Exception as e: + print(f"Error extracting {obj['name']}: {e}") + raise optuna.TrialPruned() + + # Extract constraints + for const in self.config.get('constraints', []): + extractor_name = const['extractor'] + if extractor_name not in self.result_extractors: + raise ValueError(f"Missing result extractor: {extractor_name}") + + extractor_func = self.result_extractors[extractor_name] + try: + result = extractor_func(result_path) + metric_name = const['metric'] + extracted_results[const['name']] = result[metric_name] + except Exception as e: + print(f"Error extracting {const['name']}: {e}") + raise optuna.TrialPruned() + + # 5. Evaluate constraints + for const in self.config.get('constraints', []): + value = extracted_results[const['name']] + limit = const['limit'] + + if const['type'] == 'upper_bound' and value > limit: + # Constraint violated - prune trial or penalize + print(f"Constraint violated: {const['name']} = {value:.4f} > {limit:.4f}") + raise optuna.TrialPruned() + elif const['type'] == 'lower_bound' and value < limit: + print(f"Constraint violated: {const['name']} = {value:.4f} < {limit:.4f}") + raise optuna.TrialPruned() + + # 6. Calculate weighted objective + # For multi-objective: weighted sum approach + total_objective = 0.0 + for obj in self.config['objectives']: + value = extracted_results[obj['name']] + weight = obj.get('weight', 1.0) + direction = obj.get('direction', 'minimize') + + # Normalize by weight + if direction == 'minimize': + total_objective += weight * value + else: # maximize + total_objective -= weight * value + + # 7. Store results in history + history_entry = { + 'trial_number': trial.number, + 'timestamp': datetime.now().isoformat(), + 'design_variables': design_vars, + 'objectives': {obj['name']: extracted_results[obj['name']] for obj in self.config['objectives']}, + 'constraints': {const['name']: extracted_results[const['name']] for const in self.config.get('constraints', [])}, + 'total_objective': total_objective + } + self.history.append(history_entry) + + # Save history after each trial + self._save_history() + + print(f"\nTrial {trial.number} completed:") + print(f" Design vars: {design_vars}") + print(f" Objectives: {history_entry['objectives']}") + print(f" Total objective: {total_objective:.6f}") + + return total_objective + + def run(self, study_name: Optional[str] = None) -> optuna.Study: + """ + Run the optimization. + + Args: + study_name: Optional name for the study + + Returns: + Completed Optuna study + """ + if study_name is None: + study_name = f"optimization_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + # Get optimization settings + settings = self.config['optimization_settings'] + n_trials = settings.get('n_trials', 100) + sampler_name = settings.get('sampler', 'TPE') + + # Create Optuna study + sampler = self._get_sampler(sampler_name) + self.study = optuna.create_study( + study_name=study_name, + direction='minimize', # Total weighted objective is always minimized + sampler=sampler + ) + + print("="*60) + print(f"STARTING OPTIMIZATION: {study_name}") + print("="*60) + print(f"Design Variables: {len(self.config['design_variables'])}") + print(f"Objectives: {len(self.config['objectives'])}") + print(f"Constraints: {len(self.config.get('constraints', []))}") + print(f"Trials: {n_trials}") + print(f"Sampler: {sampler_name}") + print("="*60) + + # Run optimization + start_time = time.time() + self.study.optimize(self._objective_function, n_trials=n_trials) + elapsed_time = time.time() - start_time + + # Get best results + self.best_params = self.study.best_params + self.best_value = self.study.best_value + + print("\n" + "="*60) + print("OPTIMIZATION COMPLETE") + print("="*60) + print(f"Total time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} minutes)") + print(f"Best objective value: {self.best_value:.6f}") + print(f"Best parameters:") + for param, value in self.best_params.items(): + print(f" {param}: {value:.4f}") + print("="*60) + + # Save final results + self._save_final_results() + + return self.study + + def _save_history(self): + """Save optimization history to CSV and JSON.""" + # Save as JSON + history_json_path = self.output_dir / 'history.json' + with open(history_json_path, 'w') as f: + json.dump(self.history, f, indent=2) + + # Save as CSV (flattened) + if self.history: + # Flatten nested dicts for CSV + rows = [] + for entry in self.history: + row = { + 'trial_number': entry['trial_number'], + 'timestamp': entry['timestamp'], + 'total_objective': entry['total_objective'] + } + # Add design variables + for var_name, var_value in entry['design_variables'].items(): + row[f'dv_{var_name}'] = var_value + # Add objectives + for obj_name, obj_value in entry['objectives'].items(): + row[f'obj_{obj_name}'] = obj_value + # Add constraints + for const_name, const_value in entry['constraints'].items(): + row[f'const_{const_name}'] = const_value + + rows.append(row) + + df = pd.DataFrame(rows) + csv_path = self.output_dir / 'history.csv' + df.to_csv(csv_path, index=False) + + def _save_final_results(self): + """Save final optimization results summary.""" + if self.study is None: + return + + summary = { + 'study_name': self.study.study_name, + 'best_value': self.best_value, + 'best_params': self.best_params, + 'n_trials': len(self.study.trials), + 'configuration': self.config, + 'timestamp': datetime.now().isoformat() + } + + summary_path = self.output_dir / 'optimization_summary.json' + with open(summary_path, 'w') as f: + json.dump(summary, f, indent=2) + + print(f"\nResults saved to: {self.output_dir}") + print(f" - history.json") + print(f" - history.csv") + print(f" - optimization_summary.json") + + +# Example usage +if __name__ == "__main__": + # This would be replaced with actual NX integration functions + def dummy_model_updater(design_vars: Dict[str, float]): + """Dummy function - would update NX model.""" + print(f"Updating model with: {design_vars}") + + def dummy_simulation_runner() -> Path: + """Dummy function - would run NX simulation.""" + print("Running simulation...") + time.sleep(0.5) # Simulate work + return Path("examples/bracket/bracket_sim1-solution_1.op2") + + def dummy_mass_extractor(result_path: Path) -> Dict[str, float]: + """Dummy function - would extract from OP2.""" + import random + return {'total_mass': 0.4 + random.random() * 0.1} + + def dummy_stress_extractor(result_path: Path) -> Dict[str, float]: + """Dummy function - would extract from OP2.""" + import random + return {'max_von_mises': 150.0 + random.random() * 50.0} + + def dummy_displacement_extractor(result_path: Path) -> Dict[str, float]: + """Dummy function - would extract from OP2.""" + import random + return {'max_displacement': 0.8 + random.random() * 0.3} + + # Create runner + runner = OptimizationRunner( + config_path=Path("examples/bracket/optimization_config.json"), + model_updater=dummy_model_updater, + simulation_runner=dummy_simulation_runner, + result_extractors={ + 'mass_extractor': dummy_mass_extractor, + 'stress_extractor': dummy_stress_extractor, + 'displacement_extractor': dummy_displacement_extractor + } + ) + + # Run optimization + study = runner.run(study_name="test_bracket_optimization")