""" BaseOptimizationRunner - Unified base class for all optimization studies. This module eliminates ~4,200 lines of duplicated code across study run_optimization.py files by providing a config-driven optimization runner. Usage: # In study's run_optimization.py (now ~50 lines instead of ~300): from optimization_engine.base_runner import ConfigDrivenRunner runner = ConfigDrivenRunner(__file__) runner.run() Or for custom extraction logic: from optimization_engine.base_runner import BaseOptimizationRunner class MyStudyRunner(BaseOptimizationRunner): def extract_objectives(self, op2_file, dat_file, design_vars): # Custom extraction logic return {'mass': ..., 'stress': ..., 'stiffness': ...} runner = MyStudyRunner(__file__) runner.run() """ from pathlib import Path import sys import json import argparse from datetime import datetime from typing import Dict, Any, Optional, Tuple, List, Callable from abc import ABC, abstractmethod import importlib import optuna from optuna.samplers import NSGAIISampler, TPESampler class ConfigNormalizer: """ Normalizes different config formats to a standard internal format. Handles variations like: - 'parameter' vs 'name' for variable names - 'bounds' vs 'min'/'max' for ranges - 'goal' vs 'direction' for objective direction """ @staticmethod def normalize_config(config: Dict) -> Dict: """Convert any config format to standardized format.""" normalized = { 'study_name': config.get('study_name', 'unnamed_study'), 'description': config.get('description', ''), 'design_variables': [], 'objectives': [], 'constraints': [], 'simulation': {}, 'optimization': {}, 'neural_acceleration': config.get('neural_acceleration', {}), } # Normalize design variables for var in config.get('design_variables', []): normalized['design_variables'].append({ 'name': var.get('parameter') or var.get('name'), 'type': var.get('type', 'continuous'), 'min': var.get('bounds', [var.get('min', 0), var.get('max', 1)])[0] if 'bounds' in var else var.get('min', 0), 'max': var.get('bounds', [var.get('min', 0), var.get('max', 1)])[1] if 'bounds' in var else var.get('max', 1), 'units': var.get('units', ''), 'description': var.get('description', ''), }) # Normalize objectives for obj in config.get('objectives', []): normalized['objectives'].append({ 'name': obj.get('name'), 'direction': obj.get('goal') or obj.get('direction', 'minimize'), 'description': obj.get('description', ''), 'extraction': obj.get('extraction', {}), }) # Normalize constraints for con in config.get('constraints', []): normalized['constraints'].append({ 'name': con.get('name'), 'type': con.get('type', 'less_than'), 'value': con.get('threshold') or con.get('value', 0), 'units': con.get('units', ''), 'description': con.get('description', ''), 'extraction': con.get('extraction', {}), }) # Normalize simulation settings sim = config.get('simulation', {}) normalized['simulation'] = { 'prt_file': sim.get('prt_file') or sim.get('model_file', ''), 'sim_file': sim.get('sim_file', ''), 'fem_file': sim.get('fem_file', ''), 'dat_file': sim.get('dat_file', ''), 'op2_file': sim.get('op2_file', ''), 'solution_name': sim.get('solution_name', 'Solution 1'), 'solver': sim.get('solver', 'nastran'), } # Normalize optimization settings opt = config.get('optimization', config.get('optimization_settings', {})) normalized['optimization'] = { 'algorithm': opt.get('algorithm') or opt.get('sampler', 'NSGAIISampler'), 'n_trials': opt.get('n_trials', 100), 'population_size': opt.get('population_size', 20), 'seed': opt.get('seed', 42), 'timeout_per_trial': opt.get('timeout_per_trial', 600), } return normalized class BaseOptimizationRunner(ABC): """ Abstract base class for optimization runners. Subclasses must implement extract_objectives() to define how physics results are extracted from FEA output files. """ def __init__(self, script_path: str, config_path: Optional[str] = None): """ Initialize the runner. Args: script_path: Path to the study's run_optimization.py (__file__) config_path: Optional explicit path to config file """ self.study_dir = Path(script_path).parent self.config_path = Path(config_path) if config_path else self._find_config() self.model_dir = self.study_dir / "1_setup" / "model" self.results_dir = self.study_dir / "2_results" # Load and normalize config with open(self.config_path, 'r') as f: self.raw_config = json.load(f) self.config = ConfigNormalizer.normalize_config(self.raw_config) self.study_name = self.config['study_name'] self.logger = None self.nx_solver = None def _find_config(self) -> Path: """Find the optimization config file.""" candidates = [ self.study_dir / "optimization_config.json", self.study_dir / "1_setup" / "optimization_config.json", ] for path in candidates: if path.exists(): return path raise FileNotFoundError(f"No optimization_config.json found in {self.study_dir}") def _setup(self): """Initialize solver and logger.""" # Add project root to path project_root = self.study_dir.parents[1] if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) from optimization_engine.nx_solver import NXSolver from optimization_engine.logger import get_logger self.results_dir.mkdir(exist_ok=True) self.logger = get_logger(self.study_name, study_dir=self.results_dir) self.nx_solver = NXSolver(nastran_version="2506") def sample_design_variables(self, trial: optuna.Trial) -> Dict[str, float]: """Sample design variables from the config.""" design_vars = {} for var in self.config['design_variables']: name = var['name'] if var['type'] == 'integer': design_vars[name] = trial.suggest_int(name, int(var['min']), int(var['max'])) else: design_vars[name] = trial.suggest_float(name, var['min'], var['max']) return design_vars def run_simulation(self, design_vars: Dict[str, float]) -> Dict[str, Any]: """Run the FEA simulation with given design variables.""" sim_file = self.model_dir / self.config['simulation']['sim_file'] result = self.nx_solver.run_simulation( sim_file=sim_file, working_dir=self.model_dir, expression_updates=design_vars, solution_name=self.config['simulation'].get('solution_name'), cleanup=True ) return result @abstractmethod def extract_objectives(self, op2_file: Path, dat_file: Path, design_vars: Dict[str, float]) -> Dict[str, float]: """ Extract objective values from FEA results. Args: op2_file: Path to OP2 results file dat_file: Path to DAT/BDF file design_vars: Design variable values for this trial Returns: Dictionary of objective names to values """ pass def check_constraints(self, objectives: Dict[str, float], op2_file: Path) -> Tuple[bool, Dict[str, float]]: """ Check if constraints are satisfied. Returns: Tuple of (feasible, constraint_values) """ feasible = True constraint_values = {} for con in self.config['constraints']: name = con['name'] threshold = con['value'] con_type = con['type'] # Try to get constraint value from objectives or extract if name in objectives: value = objectives[name] elif 'stress' in name.lower() and 'stress' in objectives: value = objectives['stress'] elif 'displacement' in name.lower() and 'displacement' in objectives: value = objectives['displacement'] else: # Need to extract separately value = 0 # Default constraint_values[name] = value if con_type == 'less_than' and value > threshold: feasible = False self.logger.warning(f' Constraint violation: {name} = {value:.2f} > {threshold}') elif con_type == 'greater_than' and value < threshold: feasible = False self.logger.warning(f' Constraint violation: {name} = {value:.2f} < {threshold}') return feasible, constraint_values def objective_function(self, trial: optuna.Trial) -> Tuple[float, ...]: """ Main objective function for Optuna optimization. Returns tuple of objective values for multi-objective optimization. """ design_vars = self.sample_design_variables(trial) self.logger.trial_start(trial.number, design_vars) try: # Run simulation result = self.run_simulation(design_vars) if not result['success']: self.logger.trial_failed(trial.number, f"Simulation failed: {result.get('error', 'Unknown')}") return tuple([float('inf')] * len(self.config['objectives'])) op2_file = result['op2_file'] dat_file = self.model_dir / self.config['simulation']['dat_file'] # Extract objectives objectives = self.extract_objectives(op2_file, dat_file, design_vars) # Check constraints feasible, constraint_values = self.check_constraints(objectives, op2_file) # Set user attributes for name, value in objectives.items(): trial.set_user_attr(name, value) trial.set_user_attr('feasible', feasible) self.logger.trial_complete(trial.number, objectives, constraint_values, feasible) # Return objectives in order, converting maximize to minimize obj_values = [] for obj_config in self.config['objectives']: name = obj_config['name'] value = objectives.get(name, float('inf')) if obj_config['direction'] == 'maximize': value = -value # Negate for maximization obj_values.append(value) return tuple(obj_values) except Exception as e: self.logger.trial_failed(trial.number, str(e)) return tuple([float('inf')] * len(self.config['objectives'])) def get_sampler(self): """Get the appropriate Optuna sampler based on config.""" alg = self.config['optimization']['algorithm'] pop_size = self.config['optimization']['population_size'] seed = self.config['optimization']['seed'] if 'NSGA' in alg.upper(): return NSGAIISampler(population_size=pop_size, seed=seed) elif 'TPE' in alg.upper(): return TPESampler(seed=seed) else: return NSGAIISampler(population_size=pop_size, seed=seed) def get_directions(self) -> List[str]: """Get optimization directions for all objectives.""" # All directions are 'minimize' since we negate maximize objectives return ['minimize'] * len(self.config['objectives']) def clean_nastran_files(self): """Remove old Nastran solver output files.""" patterns = ['*.op2', '*.f06', '*.log', '*.f04', '*.pch', '*.DBALL', '*.MASTER', '_temp*.txt'] deleted = [] for pattern in patterns: for f in self.model_dir.glob(pattern): try: f.unlink() deleted.append(f) self.logger.info(f" Deleted: {f.name}") except Exception as e: self.logger.warning(f" Failed to delete {f.name}: {e}") return deleted def print_study_info(self): """Print study information to console.""" print("\n" + "=" * 60) print(f" {self.study_name.upper()}") print("=" * 60) print(f"\nDescription: {self.config['description']}") print(f"\nDesign Variables ({len(self.config['design_variables'])}):") for var in self.config['design_variables']: print(f" - {var['name']}: {var['min']}-{var['max']} {var['units']}") print(f"\nObjectives ({len(self.config['objectives'])}):") for obj in self.config['objectives']: print(f" - {obj['name']}: {obj['direction']}") print(f"\nConstraints ({len(self.config['constraints'])}):") for c in self.config['constraints']: print(f" - {c['name']}: < {c['value']} {c['units']}") print() def run(self, args=None): """ Main entry point for running optimization. Args: args: Optional argparse Namespace. If None, will parse sys.argv """ if args is None: args = self.parse_args() self._setup() if args.clean: self.clean_nastran_files() self.print_study_info() # Determine number of trials and storage if args.discover: n_trials = 1 storage = f"sqlite:///{self.results_dir / 'study_test.db'}" study_suffix = "_discover" elif args.validate: n_trials = 1 storage = f"sqlite:///{self.results_dir / 'study_test.db'}" study_suffix = "_validate" elif args.test: n_trials = 3 storage = f"sqlite:///{self.results_dir / 'study_test.db'}" study_suffix = "_test" else: n_trials = args.trials storage = f"sqlite:///{self.results_dir / 'study.db'}" study_suffix = "" # Create or load study full_study_name = f"{self.study_name}{study_suffix}" if args.resume and study_suffix == "": study = optuna.load_study( study_name=self.study_name, storage=storage, sampler=self.get_sampler() ) print(f"\nResuming study with {len(study.trials)} existing trials...") else: study = optuna.create_study( study_name=full_study_name, storage=storage, sampler=self.get_sampler(), directions=self.get_directions(), load_if_exists=(study_suffix == "") ) # Run optimization if study_suffix == "": self.logger.study_start(self.study_name, n_trials, self.config['optimization']['algorithm']) print(f"\nRunning {n_trials} trials...") study.optimize( self.objective_function, n_trials=n_trials, show_progress_bar=True ) # Report results n_complete = len([t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]) if study_suffix == "": self.logger.study_complete(self.study_name, len(study.trials), n_complete) print("\n" + "=" * 60) print(" COMPLETE!") print("=" * 60) print(f"\nTotal trials: {len(study.trials)}") print(f"Successful: {n_complete}") if hasattr(study, 'best_trials'): print(f"Pareto front: {len(study.best_trials)} solutions") if study_suffix == "": print("\nNext steps:") print(" 1. Run method selector:") print(f" python -m optimization_engine.method_selector {self.config_path.relative_to(self.study_dir)} 2_results/study.db") print(" 2. If turbo recommended, run neural acceleration") return 0 def parse_args(self) -> argparse.Namespace: """Parse command line arguments.""" parser = argparse.ArgumentParser(description=f'{self.study_name} - Optimization') stage_group = parser.add_mutually_exclusive_group() stage_group.add_argument('--discover', action='store_true', help='Discover model outputs (1 trial)') stage_group.add_argument('--validate', action='store_true', help='Run single validation trial') stage_group.add_argument('--test', action='store_true', help='Run 3-trial test') stage_group.add_argument('--run', action='store_true', help='Run full optimization') parser.add_argument('--trials', type=int, default=self.config['optimization']['n_trials'], help='Number of trials') parser.add_argument('--resume', action='store_true', help='Resume existing study') parser.add_argument('--clean', action='store_true', help='Clean old files first') args = parser.parse_args() if not any([args.discover, args.validate, args.test, args.run]): print("No stage specified. Use --discover, --validate, --test, or --run") print("\nTypical workflow:") print(" 1. python run_optimization.py --discover # Discover model outputs") print(" 2. python run_optimization.py --validate # Single trial validation") print(" 3. python run_optimization.py --test # Quick 3-trial test") print(f" 4. python run_optimization.py --run --trials {self.config['optimization']['n_trials']} # Full run") sys.exit(1) return args class ConfigDrivenRunner(BaseOptimizationRunner): """ Fully config-driven optimization runner. Automatically extracts objectives based on config file definitions. Supports standard extractors: mass, stress, displacement, stiffness. """ def __init__(self, script_path: str, config_path: Optional[str] = None, element_type: str = 'auto'): """ Initialize config-driven runner. Args: script_path: Path to the study's script (__file__) config_path: Optional explicit path to config element_type: Element type for stress extraction ('ctetra', 'cquad4', 'auto') """ super().__init__(script_path, config_path) self.element_type = element_type self._extractors_loaded = False self._extractors = {} def _load_extractors(self): """Lazy-load extractor functions.""" if self._extractors_loaded: return from optimization_engine.extractors.bdf_mass_extractor import extract_mass_from_bdf from optimization_engine.extractors.extract_displacement import extract_displacement from optimization_engine.extractors.extract_von_mises_stress import extract_solid_stress self._extractors = { 'extract_mass_from_bdf': extract_mass_from_bdf, 'extract_displacement': extract_displacement, 'extract_solid_stress': extract_solid_stress, } self._extractors_loaded = True def _detect_element_type(self, dat_file: Path) -> str: """Auto-detect element type from BDF/DAT file.""" if self.element_type != 'auto': return self.element_type try: with open(dat_file, 'r') as f: content = f.read(50000) # Read first 50KB if 'CTETRA' in content: return 'ctetra' elif 'CHEXA' in content: return 'chexa' elif 'CQUAD4' in content: return 'cquad4' elif 'CTRIA3' in content: return 'ctria3' else: return 'ctetra' # Default except Exception: return 'ctetra' def extract_objectives(self, op2_file: Path, dat_file: Path, design_vars: Dict[str, float]) -> Dict[str, float]: """ Extract all objectives based on config. Handles common objectives: mass, stress, displacement, stiffness """ self._load_extractors() objectives = {} element_type = self._detect_element_type(dat_file) for obj_config in self.config['objectives']: name = obj_config['name'].lower() try: if 'mass' in name: objectives[obj_config['name']] = self._extractors['extract_mass_from_bdf'](str(dat_file)) self.logger.info(f" {obj_config['name']}: {objectives[obj_config['name']]:.2f} kg") elif 'stress' in name: stress_result = self._extractors['extract_solid_stress']( op2_file, subcase=1, element_type=element_type ) # Convert kPa to MPa stress_mpa = stress_result.get('max_von_mises', float('inf')) / 1000.0 objectives[obj_config['name']] = stress_mpa self.logger.info(f" {obj_config['name']}: {stress_mpa:.2f} MPa") elif 'displacement' in name: disp_result = self._extractors['extract_displacement'](op2_file, subcase=1) objectives[obj_config['name']] = disp_result['max_displacement'] self.logger.info(f" {obj_config['name']}: {disp_result['max_displacement']:.3f} mm") elif 'stiffness' in name: disp_result = self._extractors['extract_displacement'](op2_file, subcase=1) max_disp = disp_result['max_displacement'] applied_force = 1000.0 # N - standard assumption stiffness = applied_force / max(abs(max_disp), 1e-6) objectives[obj_config['name']] = stiffness objectives['displacement'] = max_disp # Store for constraint check self.logger.info(f" {obj_config['name']}: {stiffness:.1f} N/mm") self.logger.info(f" displacement: {max_disp:.3f} mm") else: self.logger.warning(f" Unknown objective: {name}") objectives[obj_config['name']] = float('inf') except Exception as e: self.logger.error(f" Failed to extract {name}: {e}") objectives[obj_config['name']] = float('inf') return objectives def create_runner(script_path: str, element_type: str = 'auto') -> ConfigDrivenRunner: """ Factory function to create a ConfigDrivenRunner. Args: script_path: Path to the study's run_optimization.py (__file__) element_type: Element type for stress extraction Returns: Configured runner ready to execute """ return ConfigDrivenRunner(script_path, element_type=element_type)