"""
Optimization Runner

Orchestrates the optimization loop:
1. Load configuration
2. Initialize Optuna study
3. For each trial:
   - Update design variables in NX model
   - Run simulation
   - Extract results (OP2 file)
   - Return objective/constraint values to Optuna
4. Save optimization history
"""

from pathlib import Path
from typing import Dict, Any, List, Optional, Callable
import json
import time
import optuna
from optuna.samplers import TPESampler, CmaEsSampler, GPSampler
import pandas as pd
from datetime import datetime


class OptimizationRunner:
    """
    Main optimization runner that coordinates:
    - Optuna optimization loop
    - NX model parameter updates
    - Simulation execution
    - Result extraction
    """

    # Values of the 'type' key that the objective function knows how to handle.
    _DESIGN_VARIABLE_TYPES = ('continuous', 'discrete')
    _CONSTRAINT_TYPES = ('upper_bound', 'lower_bound')

    # Decimal places used when rounding a value expressed in a given unit
    # (keys are lower-case; anything not listed falls back to 4 decimals).
    _PRECISION_BY_UNIT = {
        'mm': 4,
        'millimeter': 4,
        'degrees': 4,
        'deg': 4,
        'mpa': 4,
        'gpa': 6,
        'kg': 3,
        'n': 2,
        'dimensionless': 6
    }
    _DEFAULT_PRECISION = 4

    def __init__(
        self,
        config_path: Path,
        model_updater: Callable,
        simulation_runner: Callable,
        result_extractors: Dict[str, Callable]
    ):
        """
        Initialize optimization runner.

        Args:
            config_path: Path to optimization_config.json
            model_updater: Function(design_vars: Dict) -> None
                Updates NX model with new parameter values
            simulation_runner: Function() -> Path
                Runs simulation and returns path to result files
            result_extractors: Dict mapping extractor name to extraction function
                e.g., {'mass_extractor': extract_mass_func}

        Raises:
            ValueError: If the configuration is missing a required field or
                declares an unsupported design-variable/constraint type.
        """
        self.config_path = Path(config_path)
        self.config = self._load_config()
        self.model_updater = model_updater
        self.simulation_runner = simulation_runner
        self.result_extractors = result_extractors

        # Initialize storage
        self.history = []
        self.study = None
        self.best_params = None
        self.best_value = None

        # Paths: results are written next to the configuration file.
        self.output_dir = self.config_path.parent / 'optimization_results'
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def _load_config(self) -> Dict[str, Any]:
        """
        Load and validate optimization configuration.

        Returns:
            The parsed configuration dictionary.

        Raises:
            ValueError: If a required top-level field is missing, or if a
                design variable / constraint uses a 'type' this runner does
                not implement.
        """
        with open(self.config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)

        # Validate required fields
        required = ['design_variables', 'objectives', 'optimization_settings']
        for field in required:
            if field not in config:
                raise ValueError(f"Missing required field in config: {field}")

        # Reject unsupported types up front: otherwise the variable would be
        # silently left out of every trial (or the constraint never enforced)
        # and the problem would only surface after simulations had been run.
        for dv in config['design_variables']:
            if dv.get('type') not in self._DESIGN_VARIABLE_TYPES:
                raise ValueError(
                    f"Unsupported design variable type for {dv.get('name')!r}: "
                    f"{dv.get('type')!r}. Choose from {list(self._DESIGN_VARIABLE_TYPES)}"
                )
        for const in config.get('constraints', []):
            if const.get('type') not in self._CONSTRAINT_TYPES:
                raise ValueError(
                    f"Unsupported constraint type for {const.get('name')!r}: "
                    f"{const.get('type')!r}. Choose from {list(self._CONSTRAINT_TYPES)}"
                )

        return config

    def _get_sampler(self, sampler_name: str):
        """
        Get Optuna sampler instance with enhanced settings.

        Args:
            sampler_name: One of 'TPE', 'CMAES' or 'GP'.

        Returns:
            A configured Optuna sampler. The random seed is read from the
            optional 'seed' entry of 'optimization_settings' (default 42).

        Raises:
            ValueError: If sampler_name is not a known sampler.
        """
        opt_settings = self.config.get('optimization_settings', {})
        seed = opt_settings.get('seed', 42)  # Fixed default for reproducibility

        if sampler_name == 'TPE':
            # Enhanced TPE sampler for better exploration/exploitation balance
            return TPESampler(
                n_startup_trials=opt_settings.get('n_startup_trials', 20),
                n_ei_candidates=opt_settings.get('tpe_n_ei_candidates', 24),
                multivariate=opt_settings.get('tpe_multivariate', True),
                seed=seed
            )
        elif sampler_name == 'CMAES':
            return CmaEsSampler(seed=seed)
        elif sampler_name == 'GP':
            return GPSampler(seed=seed)
        else:
            raise ValueError(f"Unknown sampler: {sampler_name}. Choose from ['TPE', 'CMAES', 'GP']")

    def _get_precision(self, var_name: str, units: str) -> int:
        """
        Get appropriate decimal precision based on units.

        Args:
            var_name: Variable name (currently unused; kept for the interface)
            units: Physical units (mm, degrees, MPa, etc.); matched
                case-insensitively. Empty/None is treated as 'dimensionless'.

        Returns:
            Number of decimal places
        """
        units_lower = units.lower() if units else 'dimensionless'
        return self._PRECISION_BY_UNIT.get(units_lower, self._DEFAULT_PRECISION)

    def _sample_design_variables(self, trial: optuna.Trial) -> Dict[str, Any]:
        """Ask Optuna for one value per configured design variable."""
        design_vars = {}
        for dv in self.config['design_variables']:
            name = dv['name']
            lower, upper = dv['bounds'][0], dv['bounds'][1]
            if dv['type'] == 'continuous':
                value = trial.suggest_float(name, lower, upper)
                # Round to appropriate precision.
                # NOTE: Optuna itself records the unrounded value, so
                # study.best_params may differ from the simulated value in
                # the trailing decimals; the history files hold the rounded one.
                precision = self._get_precision(name, dv.get('units', ''))
                design_vars[name] = round(value, precision)
            elif dv['type'] == 'discrete':
                design_vars[name] = trial.suggest_int(name, int(lower), int(upper))
            else:
                # Defensive: _load_config already rejects this, but the config
                # dict is mutable and must never silently drop a variable.
                raise ValueError(
                    f"Unsupported design variable type for {name!r}: {dv['type']!r}"
                )
        return design_vars

    def _extract_metrics(
        self,
        specs: List[Dict[str, Any]],
        result_path: Any,
        extractor_cache: Dict[str, Any]
    ) -> Dict[str, float]:
        """
        Extract and round one metric per objective/constraint specification.

        Args:
            specs: Entries with 'name', 'extractor', 'metric' and optional 'units'.
            result_path: Whatever the simulation runner returned.
            extractor_cache: Per-trial cache mapping extractor name to its
                output, so each extractor runs at most once per trial even
                when several metrics come from it.

        Returns:
            Mapping of spec name to the rounded metric value (plain float).

        Raises:
            ValueError: If a spec names an extractor that was not supplied.
            optuna.TrialPruned: If extraction of any metric fails.
        """
        extracted = {}
        for spec in specs:
            extractor_name = spec['extractor']
            if extractor_name not in self.result_extractors:
                raise ValueError(f"Missing result extractor: {extractor_name}")

            try:
                if extractor_name not in extractor_cache:
                    extractor_func = self.result_extractors[extractor_name]
                    extractor_cache[extractor_name] = extractor_func(result_path)
                value = extractor_cache[extractor_name][spec['metric']]

                # Round to appropriate precision based on units. float() turns
                # NumPy scalars (e.g. float32) into plain floats so the history
                # can be serialized with json.dump.
                precision = self._get_precision(spec['name'], spec.get('units', ''))
                extracted[spec['name']] = round(float(value), precision)
            except Exception as e:
                print(f"Error extracting {spec['name']}: {e}")
                raise optuna.TrialPruned() from e
        return extracted

    def _check_constraints(self, extracted_results: Dict[str, float]) -> None:
        """Prune the current trial if any configured constraint is violated."""
        for const in self.config.get('constraints', []):
            value = extracted_results[const['name']]
            limit = const['limit']

            if const['type'] == 'upper_bound' and value > limit:
                # Constraint violated - prune trial
                print(f"Constraint violated: {const['name']} = {value:.4f} > {limit:.4f}")
                raise optuna.TrialPruned()
            elif const['type'] == 'lower_bound' and value < limit:
                print(f"Constraint violated: {const['name']} = {value:.4f} < {limit:.4f}")
                raise optuna.TrialPruned()

    def _objective_function(self, trial: optuna.Trial) -> float:
        """
        Optuna objective function.

        This is called for each optimization trial.

        Args:
            trial: Optuna trial object

        Returns:
            Weighted sum of all objectives (maximized objectives enter with a
            negative sign, so the total is always minimized).

        Raises:
            optuna.TrialPruned: If the model update, the simulation or a
                result extraction fails, or if a constraint is violated.
            ValueError: If a configured extractor was not supplied.
        """
        # 1. Sample design variables with appropriate precision
        design_vars = self._sample_design_variables(trial)

        # 2. Update NX model with new parameters
        try:
            self.model_updater(design_vars)
        except Exception as e:
            print(f"Error updating model: {e}")
            raise optuna.TrialPruned() from e

        # 3. Run simulation
        try:
            result_path = self.simulation_runner()
        except Exception as e:
            print(f"Error running simulation: {e}")
            raise optuna.TrialPruned() from e

        # 4. Extract objectives, then constraints, with appropriate precision
        objectives = self.config['objectives']
        constraints = self.config.get('constraints', [])
        extractor_cache = {}
        extracted_results = {}
        extracted_results.update(
            self._extract_metrics(objectives, result_path, extractor_cache)
        )
        extracted_results.update(
            self._extract_metrics(constraints, result_path, extractor_cache)
        )

        # 5. Evaluate constraints
        self._check_constraints(extracted_results)

        # 6. Calculate weighted objective
        # For multi-objective: weighted sum approach
        total_objective = 0.0
        for obj in objectives:
            value = extracted_results[obj['name']]
            weight = obj.get('weight', 1.0)
            direction = obj.get('direction', 'minimize')

            if direction == 'minimize':
                total_objective += weight * value
            else:  # maximize: subtract so that minimizing the total maximizes it
                total_objective -= weight * value

        # 7. Store results in history
        history_entry = {
            'trial_number': trial.number,
            'timestamp': datetime.now().isoformat(),
            'design_variables': design_vars,
            'objectives': {obj['name']: extracted_results[obj['name']]
                           for obj in objectives},
            'constraints': {const['name']: extracted_results[const['name']]
                            for const in constraints},
            'total_objective': total_objective
        }
        self.history.append(history_entry)

        # Save history after each trial
        self._save_history()

        print(f"\nTrial {trial.number} completed:")
        print(f" Design vars: {design_vars}")
        print(f" Objectives: {history_entry['objectives']}")
        print(f" Total objective: {total_objective:.6f}")

        return total_objective

    def run(self, study_name: Optional[str] = None) -> optuna.Study:
        """
        Run the optimization.

        Args:
            study_name: Optional name for the study; a timestamped name is
                generated when omitted.

        Returns:
            Completed Optuna study. If no trial completed (all were pruned),
            best_params and best_value stay None and the summary is still
            written.
        """
        if study_name is None:
            study_name = f"optimization_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        # Get optimization settings
        settings = self.config['optimization_settings']
        n_trials = settings.get('n_trials', 100)
        sampler_name = settings.get('sampler', 'TPE')

        # Create Optuna study
        sampler = self._get_sampler(sampler_name)
        self.study = optuna.create_study(
            study_name=study_name,
            direction='minimize',  # Total weighted objective is always minimized
            sampler=sampler
        )

        print("="*60)
        print(f"STARTING OPTIMIZATION: {study_name}")
        print("="*60)
        print(f"Design Variables: {len(self.config['design_variables'])}")
        print(f"Objectives: {len(self.config['objectives'])}")
        print(f"Constraints: {len(self.config.get('constraints', []))}")
        print(f"Trials: {n_trials}")
        print(f"Sampler: {sampler_name}")
        print("="*60)

        # Run optimization
        start_time = time.time()
        self.study.optimize(self._objective_function, n_trials=n_trials)
        elapsed_time = time.time() - start_time

        # Get best results. study.best_params / best_value raise ValueError
        # when no trial completed, which happens if every trial was pruned.
        completed_trials = self.study.get_trials(
            deepcopy=False,
            states=(optuna.trial.TrialState.COMPLETE,)
        )
        if completed_trials:
            self.best_params = self.study.best_params
            self.best_value = self.study.best_value
        else:
            self.best_params = None
            self.best_value = None

        print("\n" + "="*60)
        print("OPTIMIZATION COMPLETE")
        print("="*60)
        print(f"Total time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} minutes)")
        if completed_trials:
            print(f"Best objective value: {self.best_value:.6f}")
            print("Best parameters:")
            for param, value in self.best_params.items():
                print(f" {param}: {value:.4f}")
        else:
            print("No trial completed successfully; no best result available.")
        print("="*60)

        # Save final results
        self._save_final_results()

        return self.study

    def _save_history(self):
        """Save optimization history to CSV and JSON."""
        # Save as JSON
        history_json_path = self.output_dir / 'history.json'
        with open(history_json_path, 'w', encoding='utf-8') as f:
            json.dump(self.history, f, indent=2)

        # Save as CSV (flattened); nothing to write for an empty history
        if not self.history:
            return

        rows = []
        for entry in self.history:
            row = {
                'trial_number': entry['trial_number'],
                'timestamp': entry['timestamp'],
                'total_objective': entry['total_objective']
            }
            # Flatten nested dicts into prefixed columns
            row.update({f'dv_{name}': value
                        for name, value in entry['design_variables'].items()})
            row.update({f'obj_{name}': value
                        for name, value in entry['objectives'].items()})
            row.update({f'const_{name}': value
                        for name, value in entry['constraints'].items()})
            rows.append(row)

        csv_path = self.output_dir / 'history.csv'
        pd.DataFrame(rows).to_csv(csv_path, index=False)

    def _save_final_results(self):
        """Save final optimization results summary (no-op before run())."""
        if self.study is None:
            return

        summary = {
            'study_name': self.study.study_name,
            'best_value': self.best_value,
            'best_params': self.best_params,
            'n_trials': len(self.study.trials),
            'configuration': self.config,
            'timestamp': datetime.now().isoformat()
        }

        summary_path = self.output_dir / 'optimization_summary.json'
        with open(summary_path, 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2)

        print(f"\nResults saved to: {self.output_dir}")
        print(" - history.json")
        print(" - history.csv")
        print(" - optimization_summary.json")


# Example usage
if __name__ == "__main__":
    import random

    # This would be replaced with actual NX integration functions
    def dummy_model_updater(design_vars: Dict[str, float]):
        """Dummy function - would update NX model."""
        print(f"Updating model with: {design_vars}")

    def dummy_simulation_runner() -> Path:
        """Dummy function - would run NX simulation."""
        print("Running simulation...")
        time.sleep(0.5)  # Simulate work
        return Path("examples/bracket/bracket_sim1-solution_1.op2")

    def dummy_mass_extractor(result_path: Path) -> Dict[str, float]:
        """Dummy function - would extract from OP2."""
        return {'total_mass': 0.4 + random.random() * 0.1}

    def dummy_stress_extractor(result_path: Path) -> Dict[str, float]:
        """Dummy function - would extract from OP2."""
        return {'max_von_mises': 150.0 + random.random() * 50.0}

    def dummy_displacement_extractor(result_path: Path) -> Dict[str, float]:
        """Dummy function - would extract from OP2."""
        return {'max_displacement': 0.8 + random.random() * 0.3}

    # Create runner
    runner = OptimizationRunner(
        config_path=Path("examples/bracket/optimization_config.json"),
        model_updater=dummy_model_updater,
        simulation_runner=dummy_simulation_runner,
        result_extractors={
            'mass_extractor': dummy_mass_extractor,
            'stress_extractor': dummy_stress_extractor,
            'displacement_extractor': dummy_displacement_extractor
        }
    )

    # Run optimization
    study = runner.run(study_name="test_bracket_optimization")