#!/usr/bin/env python3 """ M1 Mirror Adaptive Surrogate Optimization V12 ============================================== Adaptive optimization with TUNED neural surrogate hyperparameters. Key Improvements over V11: 1. Optuna-based hyperparameter tuning for NN architecture 2. K-fold cross-validation to prevent overfitting 3. Early stopping with proper validation set 4. Ensemble models for better uncertainty quantification 5. Proper train/validation split Usage: python run_optimization.py --start python run_optimization.py --start --tune-trials 30 --ensemble-size 3 python run_optimization.py --tune-only # Just tune hyperparameters Dashboard: FEA trials shown as circles (blue) NN trials shown as crosses (orange) """ import sys import os import json import time import argparse import logging import sqlite3 import shutil from pathlib import Path from typing import Dict, List, Tuple, Optional, Any from dataclasses import dataclass, field from datetime import datetime import numpy as np import pandas as pd # Add parent directories to path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) import optuna from optuna.samplers import TPESampler import torch import torch.nn as nn # Atomizer imports from optimization_engine.nx_solver import NXSolver from optimization_engine.utils import ensure_nx_running from optimization_engine.extractors import ZernikeExtractor from optimization_engine.surrogate_tuner import ( SurrogateHyperparameterTuner, TunedEnsembleSurrogate, SurrogateConfig, tune_surrogate_for_study ) # ============================================================================ # Paths # ============================================================================ STUDY_DIR = Path(__file__).parent SETUP_DIR = STUDY_DIR / "1_setup" ITERATIONS_DIR = STUDY_DIR / "2_iterations" RESULTS_DIR = STUDY_DIR / "3_results" CONFIG_PATH = SETUP_DIR / "optimization_config.json" # V11 paths (source data - has 107 FEA samples) V11_DIR = STUDY_DIR.parent / "m1_mirror_adaptive_V11" V11_DB = V11_DIR / "3_results" / "study.db" V11_MODEL_DIR = V11_DIR / "1_setup" / "model" # Fallback to original m1_mirror_zernike_optimization if V11 not available ORIG_DIR = STUDY_DIR.parent / "m1_mirror_zernike_optimization" ORIG_DB = ORIG_DIR / "2_results" / "study.db" ORIG_MODEL_DIR = ORIG_DIR / "1_setup" / "model" # Ensure directories exist ITERATIONS_DIR.mkdir(exist_ok=True) RESULTS_DIR.mkdir(exist_ok=True) # Logging LOG_FILE = RESULTS_DIR / "optimization.log" logging.basicConfig( level=logging.INFO, format='%(asctime)s | %(levelname)-8s | %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler(LOG_FILE, mode='a') ] ) logger = logging.getLogger(__name__) # ============================================================================ # Configuration # ============================================================================ DESIGN_VAR_NAMES = [ 'lateral_inner_angle', 'lateral_outer_angle', 'lateral_outer_pivot', 'lateral_inner_pivot', 'lateral_middle_pivot', 'lateral_closeness', 'whiffle_min', 'whiffle_outer_to_vertical', 'whiffle_triangle_closeness', 'blank_backface_angle', 'inner_circular_rib_dia' ] OBJ_NAMES = ['rel_filtered_rms_40_vs_20', 'rel_filtered_rms_60_vs_20', 'mfg_90_optician_workload'] OUTPUT_NAMES = OBJ_NAMES + ['mass_kg'] # All surrogate outputs including constraints # Mass constraint MASS_LIMIT_KG = 99.0 MASS_PENALTY_WEIGHT = 100.0 # Heavy penalty for exceeding mass limit def estimate_mass_from_angle(blank_backface_angle: float) -> float: """ Estimate mass based on blank_backface_angle. This is a linear approximation based on the relationship: - Higher backface angle = more material removed = lower mass - Typical range: 4.0 to 5.0 degrees - Mass range estimate: ~95-105 kg (based on prior runs) Will be refined after first FEA batch collects real data. """ # Linear approximation: mass decreases as angle increases # At angle=4.0 -> ~105 kg (minimum removal) # At angle=5.0 -> ~90 kg (maximum removal) mass_at_4deg = 105.0 mass_at_5deg = 90.0 slope = (mass_at_5deg - mass_at_4deg) / (5.0 - 4.0) # -15 kg/deg mass = mass_at_4deg + slope * (blank_backface_angle - 4.0) return max(85.0, min(110.0, mass)) # Clamp to reasonable range @dataclass class AdaptiveConfig: """Adaptive optimization configuration.""" max_iterations: int = 100 surrogate_trials_per_iter: int = 1000 fea_batch_size: int = 5 strategy: str = 'hybrid' exploration_ratio: float = 0.3 convergence_threshold: float = 0.3 patience: int = 5 min_training_samples: int = 30 # Surrogate tuning tune_trials: int = 50 ensemble_size: int = 5 cv_folds: int = 5 # Objective weights weight_40_vs_20: float = 5.0 weight_60_vs_20: float = 5.0 weight_mfg: float = 1.0 # Targets for normalization target_40_vs_20: float = 4.0 target_60_vs_20: float = 10.0 target_mfg: float = 20.0 @dataclass class AdaptiveState: """Tracks optimization state.""" iteration: int = 0 total_fea_count: int = 0 total_nn_count: int = 0 best_40_vs_20: float = float('inf') best_60_vs_20: float = float('inf') best_mfg: float = float('inf') best_weighted: float = float('inf') best_params: Dict = field(default_factory=dict) no_improvement_count: int = 0 history: List[Dict] = field(default_factory=list) # Tuning info tuning_completed: bool = False best_hyperparams: Dict = field(default_factory=dict) cv_r2_scores: Dict = field(default_factory=dict) # ============================================================================ # FEA Data Loader (from V11) # ============================================================================ def load_fea_data() -> List[Dict]: """Load FEA data from V11 study database (has 107 samples).""" # Try V11 first if V11_DB.exists(): logger.info(f"Loading FEA data from V11: {V11_DB}") return _load_from_v11_db(V11_DB) # Fall back to original study if ORIG_DB.exists(): logger.info(f"Loading FEA data from original study: {ORIG_DB}") return _load_from_orig_db(ORIG_DB) logger.error("No source database found!") return [] def _load_from_v11_db(db_path: Path) -> List[Dict]: """Load FEA data from V11 database format.""" conn = sqlite3.connect(str(db_path)) fea_data = [] try: cursor = conn.cursor() # Get the v11_fea study cursor.execute('SELECT study_id FROM studies WHERE study_name = "v11_fea"') result = cursor.fetchone() if not result: logger.error("v11_fea study not found in database") return [] study_id = result[0] # Get all completed trials cursor.execute(''' SELECT t.trial_id, t.number FROM trials t WHERE t.study_id = ? AND t.state = 'COMPLETE' ''', (study_id,)) trials = cursor.fetchall() for trial_id, trial_num in trials: # Get params cursor.execute(''' SELECT param_name, param_value FROM trial_params WHERE trial_id = ? ''', (trial_id,)) params = {name: float(value) for name, value in cursor.fetchall()} if not params: continue # Get objectives from user_attributes cursor.execute(''' SELECT key, value_json FROM trial_user_attributes WHERE trial_id = ? ''', (trial_id,)) attrs = {key: json.loads(value) for key, value in cursor.fetchall()} # Check if we have all required objectives if all(k in attrs for k in OBJ_NAMES): objectives = {k: attrs[k] for k in OBJ_NAMES} source = attrs.get('source', 'FEA') # Estimate mass if not available (V11 didn't track mass) mass_kg = attrs.get('mass_kg') if mass_kg is None and 'blank_backface_angle' in params: mass_kg = estimate_mass_from_angle(params['blank_backface_angle']) fea_data.append({ 'trial_num': trial_num, 'params': params, 'objectives': objectives, 'mass_kg': mass_kg, 'source': source }) except Exception as e: logger.error(f"Error loading V11 data: {e}") import traceback traceback.print_exc() finally: conn.close() logger.info(f"Loaded {len(fea_data)} FEA trials from V11") return fea_data def _load_from_orig_db(db_path: Path) -> List[Dict]: """Load FEA data from original m1_mirror_zernike_optimization database.""" conn = sqlite3.connect(str(db_path)) fea_data = [] try: cursor = conn.cursor() cursor.execute(''' SELECT trial_id, number FROM trials WHERE state = 'COMPLETE' ''') trials = cursor.fetchall() for trial_id, trial_num in trials: # Get params cursor.execute(''' SELECT param_name, param_value FROM trial_params WHERE trial_id = ? ''', (trial_id,)) params = {name: float(value) for name, value in cursor.fetchall()} if not params: continue # Get objectives cursor.execute(''' SELECT key, value_json FROM trial_user_attributes WHERE trial_id = ? AND key = 'objectives' ''', (trial_id,)) result = cursor.fetchone() if result: objectives = json.loads(result[1]) if all(k in objectives for k in OBJ_NAMES): # Estimate mass from backface angle mass_kg = None if 'blank_backface_angle' in params: mass_kg = estimate_mass_from_angle(params['blank_backface_angle']) fea_data.append({ 'trial_num': trial_num, 'params': params, 'objectives': objectives, 'mass_kg': mass_kg, 'source': 'V10_FEA' }) except Exception as e: logger.error(f"Error loading original data: {e}") finally: conn.close() logger.info(f"Loaded {len(fea_data)} FEA trials from original study") return fea_data # ============================================================================ # FEA Runner # ============================================================================ class FEARunner: """Runs actual FEA simulations.""" def __init__(self, config: Dict[str, Any]): self.config = config self.nx_solver = None self.nx_manager = None self.master_model_dir = None def setup(self): """Setup NX and copy master model.""" logger.info("Setting up NX session and model...") # Copy master model if not exists local_model_dir = SETUP_DIR / "model" if not local_model_dir.exists(): # Try V11 first, then original if V11_MODEL_DIR.exists(): logger.info(f"Copying master model from V11...") shutil.copytree(V11_MODEL_DIR, local_model_dir) elif ORIG_MODEL_DIR.exists(): logger.info(f"Copying master model from original study...") shutil.copytree(ORIG_MODEL_DIR, local_model_dir) else: raise FileNotFoundError("No source model directory found!") self.master_model_dir = local_model_dir # Setup NX study_name = self.config.get('study_name', 'm1_adaptive_V12') try: self.nx_manager, nx_was_started = ensure_nx_running( session_id=study_name, auto_start=True, start_timeout=120 ) logger.info("NX session ready" + (" (started)" if nx_was_started else " (existing)")) except Exception as e: logger.error(f"Failed to setup NX: {e}") raise # Initialize solver import re nx_settings = self.config.get('nx_settings', {}) nx_install_dir = nx_settings.get('nx_install_path', 'C:\\Program Files\\Siemens\\NX2506') version_match = re.search(r'NX(\d+)', nx_install_dir) nastran_version = version_match.group(1) if version_match else "2506" self.nx_solver = NXSolver( master_model_dir=str(self.master_model_dir), nx_install_dir=nx_install_dir, nastran_version=nastran_version, timeout=nx_settings.get('simulation_timeout_s', 600), use_iteration_folders=True, study_name="m1_mirror_adaptive_V12" ) def run_fea(self, params: Dict[str, float], trial_num: int) -> Optional[Dict]: """Run FEA and extract objectives.""" if self.nx_solver is None: self.setup() logger.info(f" [FEA {trial_num}] Running simulation...") expressions = {var['expression_name']: params[var['name']] for var in self.config['design_variables']} iter_folder = self.nx_solver.create_iteration_folder( iterations_base_dir=ITERATIONS_DIR, iteration_number=trial_num, expression_updates=expressions ) try: nx_settings = self.config.get('nx_settings', {}) sim_file = iter_folder / nx_settings.get('sim_file', 'ASSY_M1_assyfem1_sim1.sim') t_start = time.time() result = self.nx_solver.run_simulation( sim_file=sim_file, working_dir=iter_folder, expression_updates=expressions, solution_name=nx_settings.get('solution_name', 'Solution 1'), cleanup=False ) solve_time = time.time() - t_start if not result['success']: logger.error(f" [FEA {trial_num}] Solve failed: {result.get('error')}") return None logger.info(f" [FEA {trial_num}] Solved in {solve_time:.1f}s") # Extract objectives op2_path = Path(result['op2_file']) objectives = self._extract_objectives(op2_path) if objectives is None: return None # Extract mass from expression mass_kg = self._extract_mass(iter_folder) logger.info(f" [FEA {trial_num}] 40-20: {objectives['rel_filtered_rms_40_vs_20']:.2f} nm") logger.info(f" [FEA {trial_num}] 60-20: {objectives['rel_filtered_rms_60_vs_20']:.2f} nm") logger.info(f" [FEA {trial_num}] Mfg: {objectives['mfg_90_optician_workload']:.2f} nm") logger.info(f" [FEA {trial_num}] Mass: {mass_kg:.2f} kg {'(OVER LIMIT!)' if mass_kg > MASS_LIMIT_KG else ''}") return { 'trial_num': trial_num, 'params': params, 'objectives': objectives, 'mass_kg': mass_kg, 'source': 'FEA', 'solve_time': solve_time } except Exception as e: logger.error(f" [FEA {trial_num}] Error: {e}") import traceback traceback.print_exc() return None def _extract_mass(self, iter_folder: Path) -> float: """Extract mass from .exp file in iteration folder.""" import re # Find .exp file (usually named after the part) exp_files = list(iter_folder.glob("*.exp")) for exp_file in exp_files: try: content = exp_file.read_text(encoding='utf-8', errors='ignore') # Look for mass expression: [Kilogram]p173= match = re.search(r'\[Kilogram\]p173=([0-9.]+)', content) if match: mass_kg = float(match.group(1)) logger.info(f" Mass extracted: {mass_kg:.2f} kg") return mass_kg except Exception as e: logger.warning(f"Could not parse {exp_file}: {e}") # If no .exp file, try reading from .prt binary prt_files = list(iter_folder.glob("*.prt")) for prt_file in prt_files: try: content = prt_file.read_bytes().decode('utf-8', errors='ignore') match = re.search(r'\[Kilogram\]p173=([0-9.]+)', content) if match: mass_kg = float(match.group(1)) logger.info(f" Mass from .prt: {mass_kg:.2f} kg") return mass_kg except Exception: pass logger.warning(" Mass not found, using default estimate") return 95.0 # Fallback estimate def _extract_objectives(self, op2_path: Path) -> Optional[Dict[str, float]]: """Extract objectives using ZernikeExtractor.""" try: zernike_settings = self.config.get('zernike_settings', {}) extractor = ZernikeExtractor( op2_path, bdf_path=None, displacement_unit=zernike_settings.get('displacement_unit', 'mm'), n_modes=zernike_settings.get('n_modes', 50), filter_orders=zernike_settings.get('filter_low_orders', 4) ) ref = zernike_settings.get('reference_subcase', '2') rel_40 = extractor.extract_relative("3", ref) rel_60 = extractor.extract_relative("4", ref) rel_90 = extractor.extract_relative("1", ref) return { 'rel_filtered_rms_40_vs_20': rel_40['relative_filtered_rms_nm'], 'rel_filtered_rms_60_vs_20': rel_60['relative_filtered_rms_nm'], 'mfg_90_optician_workload': rel_90['relative_rms_filter_j1to3'] } except Exception as e: logger.error(f"Zernike extraction failed: {e}") return None def cleanup(self): """Cleanup NX session.""" if self.nx_manager: if self.nx_manager.can_close_nx(): self.nx_manager.close_nx_if_allowed() self.nx_manager.cleanup() # ============================================================================ # Adaptive Optimizer with Tuned Surrogate # ============================================================================ class TunedAdaptiveOptimizer: """Main adaptive optimization loop with hyperparameter tuning.""" def __init__(self, config: AdaptiveConfig, study_config: Dict[str, Any]): self.config = config self.study_config = study_config self.state = AdaptiveState() self.surrogate: Optional[TunedEnsembleSurrogate] = None self.fea_runner = FEARunner(study_config) # Load source data (V11 or original) self.fea_data = load_fea_data() self.state.total_fea_count = len(self.fea_data) # Study database self.db_path = RESULTS_DIR / "study.db" self.storage = optuna.storages.RDBStorage(f'sqlite:///{self.db_path}') def compute_weighted_objective(self, objectives: Dict[str, float]) -> float: """Compute normalized weighted objective.""" w1, w2, w3 = self.config.weight_40_vs_20, self.config.weight_60_vs_20, self.config.weight_mfg t1, t2, t3 = self.config.target_40_vs_20, self.config.target_60_vs_20, self.config.target_mfg weighted = (w1 * objectives['rel_filtered_rms_40_vs_20'] / t1 + w2 * objectives['rel_filtered_rms_60_vs_20'] / t2 + w3 * objectives['mfg_90_optician_workload'] / t3) / (w1 + w2 + w3) return weighted def _prepare_training_data(self) -> Tuple[np.ndarray, np.ndarray]: """Prepare training data including mass.""" X = np.array([[d['params'][name] for name in DESIGN_VAR_NAMES] for d in self.fea_data]) # Include objectives + mass as outputs Y_list = [] for d in self.fea_data: row = [d['objectives'][name] for name in OBJ_NAMES] mass = d.get('mass_kg') if mass is None: mass = estimate_mass_from_angle(d['params'].get('blank_backface_angle', 4.5)) row.append(mass) Y_list.append(row) Y = np.array(Y_list) return X, Y def tune_surrogate(self) -> SurrogateConfig: """Tune surrogate hyperparameters using Optuna.""" logger.info("\n" + "=" * 70) logger.info("HYPERPARAMETER TUNING") logger.info("=" * 70) # Prepare data (objectives + mass) X, Y = self._prepare_training_data() logger.info(f"Training data: {len(X)} samples") logger.info(f"Outputs: {OUTPUT_NAMES}") logger.info(f"Tuning trials: {self.config.tune_trials}") logger.info(f"CV folds: {self.config.cv_folds}") # Create tuner (4 outputs: 3 objectives + mass) tuner = SurrogateHyperparameterTuner( input_dim=len(DESIGN_VAR_NAMES), output_dim=len(OUTPUT_NAMES), n_trials=self.config.tune_trials, n_cv_folds=self.config.cv_folds ) # Run tuning best_config = tuner.tune(X, Y, output_names=OUTPUT_NAMES) # Store results in state self.state.tuning_completed = True self.state.best_hyperparams = { 'hidden_dims': best_config.hidden_dims, 'dropout': best_config.dropout, 'activation': best_config.activation, 'use_batch_norm': best_config.use_batch_norm, 'learning_rate': best_config.learning_rate, 'weight_decay': best_config.weight_decay, 'batch_size': best_config.batch_size } self.state.cv_r2_scores = best_config.val_r2 # Save tuning results with open(RESULTS_DIR / "tuning_results.json", 'w') as f: json.dump({ 'best_hyperparams': self.state.best_hyperparams, 'cv_r2_scores': self.state.cv_r2_scores, 'val_loss': best_config.val_loss, 'n_trials': self.config.tune_trials, 'n_folds': self.config.cv_folds }, f, indent=2) return best_config def train_ensemble(self, config: SurrogateConfig): """Train ensemble surrogate with tuned hyperparameters.""" logger.info(f"\nTraining ensemble of {self.config.ensemble_size} models...") logger.info(f"Outputs: {OUTPUT_NAMES} (3 objectives + mass)") # Prepare data (objectives + mass) X, Y = self._prepare_training_data() # Create and train ensemble (4 outputs: 3 objectives + mass) self.surrogate = TunedEnsembleSurrogate( config=config, input_dim=len(DESIGN_VAR_NAMES), output_dim=len(OUTPUT_NAMES), n_models=self.config.ensemble_size ) self.surrogate.train(X, Y, val_split=0.2) self.surrogate.save(RESULTS_DIR / "surrogate_ensemble.pt") def run(self, skip_tuning: bool = False): """Run adaptive optimization.""" logger.info("\n" + "=" * 70) logger.info("M1 MIRROR ADAPTIVE SURROGATE OPTIMIZATION V12") logger.info("With Hyperparameter Tuning + Ensemble Models + Mass Constraint") logger.info("=" * 70) logger.info(f"Source FEA data: {len(self.fea_data)} trials") logger.info(f"Mass constraint: < {MASS_LIMIT_KG} kg") logger.info(f"Strategy: {self.config.strategy}") logger.info(f"NN trials/iter: {self.config.surrogate_trials_per_iter}") logger.info(f"FEA batch/iter: {self.config.fea_batch_size}") logger.info(f"Patience: {self.config.patience}") start_time = time.time() # Initialize best from source data if self.fea_data: for d in self.fea_data: weighted = self.compute_weighted_objective(d['objectives']) if weighted < self.state.best_weighted: self.state.best_weighted = weighted self.state.best_40_vs_20 = d['objectives']['rel_filtered_rms_40_vs_20'] self.state.best_60_vs_20 = d['objectives']['rel_filtered_rms_60_vs_20'] self.state.best_mfg = d['objectives']['mfg_90_optician_workload'] self.state.best_params = d['params'] logger.info(f"\nBest from source data:") logger.info(f" 40-20: {self.state.best_40_vs_20:.2f} nm") logger.info(f" 60-20: {self.state.best_60_vs_20:.2f} nm") logger.info(f" Mfg: {self.state.best_mfg:.2f} nm") # Store source FEA data in Optuna self._store_source_fea_in_optuna() # Check minimum samples if len(self.fea_data) < self.config.min_training_samples: logger.error(f"Not enough source data ({len(self.fea_data)}), need {self.config.min_training_samples}") return # Tune hyperparameters or load existing tuning_file = RESULTS_DIR / "tuning_results.json" ensemble_file = RESULTS_DIR / "surrogate_ensemble.pt" if skip_tuning and tuning_file.exists() and ensemble_file.exists(): logger.info("\nLoading existing tuned surrogate...") with open(tuning_file) as f: tuning_results = json.load(f) self.state.best_hyperparams = tuning_results['best_hyperparams'] self.state.cv_r2_scores = tuning_results['cv_r2_scores'] # Load ensemble (including mass output) X, Y = self._prepare_training_data() config = SurrogateConfig( hidden_dims=self.state.best_hyperparams['hidden_dims'], dropout=self.state.best_hyperparams['dropout'], activation=self.state.best_hyperparams['activation'], use_batch_norm=self.state.best_hyperparams['use_batch_norm'], learning_rate=self.state.best_hyperparams['learning_rate'], weight_decay=self.state.best_hyperparams['weight_decay'], batch_size=self.state.best_hyperparams['batch_size'], input_mean=X.mean(axis=0), input_std=X.std(axis=0) + 1e-8, output_mean=Y.mean(axis=0), output_std=Y.std(axis=0) + 1e-8 ) self.surrogate = TunedEnsembleSurrogate( config=config, input_dim=len(DESIGN_VAR_NAMES), output_dim=len(OUTPUT_NAMES), n_models=self.config.ensemble_size ) self.surrogate.load(ensemble_file) else: # Tune and train best_config = self.tune_surrogate() self.train_ensemble(best_config) # Main loop while self.state.iteration < self.config.max_iterations: self.state.iteration += 1 logger.info(f"\n{'=' * 70}") logger.info(f"ITERATION {self.state.iteration}") logger.info(f"FEA: {self.state.total_fea_count}, NN: {self.state.total_nn_count}") logger.info(f"{'=' * 70}") # 1. Surrogate exploration candidates = self._surrogate_exploration() # 2. Select candidates selected = self._select_candidates(candidates) # 3. FEA validation new_results = self._fea_validation(selected) if new_results: # 4. Update state improved = self._update_state(new_results) # 5. Retrain ensemble with new data (including mass) logger.info("\nRetraining ensemble with new data...") X, Y = self._prepare_training_data() # Update normalization stats self.surrogate.config.input_mean = X.mean(axis=0) self.surrogate.config.input_std = X.std(axis=0) + 1e-8 self.surrogate.config.output_mean = Y.mean(axis=0) self.surrogate.config.output_std = Y.std(axis=0) + 1e-8 self.surrogate.train(X, Y, val_split=0.2) self.surrogate.save(RESULTS_DIR / f"surrogate_iter{self.state.iteration}.pt") if not improved: self.state.no_improvement_count += 1 else: self.state.no_improvement_count = 0 else: self.state.no_improvement_count += 1 self._save_state() # Check convergence if self.state.no_improvement_count >= self.config.patience: logger.info(f"\nCONVERGED after {self.config.patience} iterations without improvement") break # Final elapsed = time.time() - start_time self._print_final_results(elapsed) self.fea_runner.cleanup() def _surrogate_exploration(self) -> List[Dict]: """Fast NN exploration using ensemble.""" logger.info(f"\nNN exploration ({self.config.surrogate_trials_per_iter} trials)...") logger.info(f" Mass constraint: < {MASS_LIMIT_KG} kg") candidates = [] def objective(trial: optuna.Trial) -> float: params = {} for var in self.study_config['design_variables']: if var.get('enabled', False): params[var['name']] = trial.suggest_float(var['name'], var['min'], var['max']) # Predict with ensemble (proper uncertainty) # Outputs: [40_vs_20, 60_vs_20, mfg, mass_kg] pred, uncertainty = self.surrogate.predict_single(params, DESIGN_VAR_NAMES) # Map predictions to objective names pred_obj = { 'rel_filtered_rms_40_vs_20': pred['output_0'], 'rel_filtered_rms_60_vs_20': pred['output_1'], 'mfg_90_optician_workload': pred['output_2'] } # Mass is 4th output pred_mass = pred['output_3'] # Compute weighted objective weighted = self.compute_weighted_objective(pred_obj) # Apply mass constraint penalty mass_violation = max(0, pred_mass - MASS_LIMIT_KG) if mass_violation > 0: # Quadratic penalty for constraint violation penalty = MASS_PENALTY_WEIGHT * (mass_violation / MASS_LIMIT_KG) ** 2 weighted += penalty candidates.append({ 'params': params, 'predicted': pred_obj, 'predicted_mass': pred_mass, 'mass_violation': mass_violation, 'weighted': weighted, 'uncertainty': uncertainty }) # Tag as NN trial trial.set_user_attr('source', 'NN') trial.set_user_attr('predicted_40_vs_20', pred_obj['rel_filtered_rms_40_vs_20']) trial.set_user_attr('predicted_60_vs_20', pred_obj['rel_filtered_rms_60_vs_20']) trial.set_user_attr('predicted_mfg', pred_obj['mfg_90_optician_workload']) trial.set_user_attr('predicted_mass', pred_mass) trial.set_user_attr('mass_violation', mass_violation) trial.set_user_attr('uncertainty', uncertainty) return weighted study = optuna.create_study( study_name=f"v12_iter{self.state.iteration}_nn", storage=self.storage, direction='minimize', sampler=TPESampler(n_startup_trials=50, seed=42 + self.state.iteration), load_if_exists=True ) study.optimize(objective, n_trials=self.config.surrogate_trials_per_iter, show_progress_bar=True) self.state.total_nn_count += self.config.surrogate_trials_per_iter logger.info(f" Best predicted: {study.best_value:.4f}") return candidates def _select_candidates(self, candidates: List[Dict]) -> List[Dict]: """Select candidates for FEA validation.""" n = self.config.fea_batch_size if self.config.strategy == 'best': selected = sorted(candidates, key=lambda c: c['weighted'])[:n] elif self.config.strategy == 'uncertain': selected = sorted(candidates, key=lambda c: -c['uncertainty'])[:n] else: # hybrid n_best = int(n * (1 - self.config.exploration_ratio)) n_uncertain = n - n_best sorted_best = sorted(candidates, key=lambda c: c['weighted']) sorted_uncertain = sorted(candidates, key=lambda c: -c['uncertainty']) selected = sorted_best[:n_best] + sorted_uncertain[:n_uncertain] logger.info(f"\nSelected {len(selected)} candidates for FEA") for i, s in enumerate(selected): logger.info(f" {i+1}. pred={s['weighted']:.4f}, unc={s['uncertainty']:.4f}") return selected def _store_source_fea_in_optuna(self): """Store source FEA data in Optuna.""" try: try: existing_study = optuna.load_study( study_name="v12_fea", storage=self.storage ) if len(existing_study.trials) >= len(self.fea_data): logger.info(f"Source FEA data already in Optuna ({len(existing_study.trials)} trials)") return except KeyError: pass logger.info(f"Storing {len(self.fea_data)} source FEA trials in Optuna...") fea_study = optuna.create_study( study_name="v12_fea", storage=self.storage, direction='minimize', load_if_exists=True ) for d in self.fea_data: trial = fea_study.ask() for var in self.study_config['design_variables']: if var.get('enabled', False) and var['name'] in d['params']: trial.suggest_float(var['name'], var['min'], var['max']) trial.params[var['name']] = d['params'][var['name']] weighted = self.compute_weighted_objective(d['objectives']) trial.set_user_attr('source', d.get('source', 'FEA')) trial.set_user_attr('design_vars', d['params']) trial.set_user_attr('rel_filtered_rms_40_vs_20', d['objectives']['rel_filtered_rms_40_vs_20']) trial.set_user_attr('rel_filtered_rms_60_vs_20', d['objectives']['rel_filtered_rms_60_vs_20']) trial.set_user_attr('mfg_90_optician_workload', d['objectives']['mfg_90_optician_workload']) trial.set_user_attr('mass_kg', d.get('mass_kg')) fea_study.tell(trial, weighted) logger.info(f"Stored {len(self.fea_data)} source FEA trials in Optuna") except Exception as e: logger.warning(f"Failed to store source FEA data in Optuna: {e}") def _store_fea_trial_in_optuna(self, result: Dict): """Store FEA trial in Optuna database.""" try: fea_study = optuna.create_study( study_name="v12_fea", storage=self.storage, direction='minimize', load_if_exists=True ) trial = fea_study.ask() for var in self.study_config['design_variables']: if var.get('enabled', False) and var['name'] in result['params']: trial.suggest_float(var['name'], var['min'], var['max']) trial.params[var['name']] = result['params'][var['name']] weighted = self.compute_weighted_objective(result['objectives']) trial.set_user_attr('source', 'FEA') trial.set_user_attr('design_vars', result['params']) trial.set_user_attr('rel_filtered_rms_40_vs_20', result['objectives']['rel_filtered_rms_40_vs_20']) trial.set_user_attr('rel_filtered_rms_60_vs_20', result['objectives']['rel_filtered_rms_60_vs_20']) trial.set_user_attr('mfg_90_optician_workload', result['objectives']['mfg_90_optician_workload']) trial.set_user_attr('mass_kg', result.get('mass_kg')) trial.set_user_attr('solve_time', result.get('solve_time', 0)) fea_study.tell(trial, weighted) mass_str = f", mass={result.get('mass_kg', 0):.1f}kg" if result.get('mass_kg') else "" logger.info(f" [FEA {result['trial_num']}] Stored in Optuna (weighted={weighted:.4f}{mass_str})") except Exception as e: logger.warning(f"Failed to store FEA trial in Optuna: {e}") def _fea_validation(self, candidates: List[Dict]) -> List[Dict]: """Run real FEA on candidates.""" logger.info("\nRunning FEA validation...") new_results = [] for i, cand in enumerate(candidates): trial_num = self.state.total_fea_count + i + 1 result = self.fea_runner.run_fea(cand['params'], trial_num) if result: new_results.append(result) self._store_fea_trial_in_optuna(result) return new_results def _update_state(self, new_results: List[Dict]) -> bool: """Update state with new FEA results.""" self.fea_data.extend(new_results) self.state.total_fea_count = len(self.fea_data) improved = False for result in new_results: weighted = self.compute_weighted_objective(result['objectives']) if weighted < self.state.best_weighted: improvement = self.state.best_weighted - weighted self.state.best_weighted = weighted self.state.best_40_vs_20 = result['objectives']['rel_filtered_rms_40_vs_20'] self.state.best_60_vs_20 = result['objectives']['rel_filtered_rms_60_vs_20'] self.state.best_mfg = result['objectives']['mfg_90_optician_workload'] self.state.best_params = result['params'] if improvement > 0.01: improved = True logger.info(f"\n *** NEW BEST! ***") logger.info(f" 40-20: {self.state.best_40_vs_20:.2f} nm") logger.info(f" 60-20: {self.state.best_60_vs_20:.2f} nm") logger.info(f" Mfg: {self.state.best_mfg:.2f} nm") self.state.history.append({ 'iteration': self.state.iteration, 'fea_count': self.state.total_fea_count, 'nn_count': self.state.total_nn_count, 'best_40_vs_20': self.state.best_40_vs_20, 'best_60_vs_20': self.state.best_60_vs_20, 'best_mfg': self.state.best_mfg, 'best_weighted': self.state.best_weighted, 'improved': improved }) return improved def _save_state(self): """Save state to JSON.""" with open(RESULTS_DIR / "adaptive_state.json", 'w') as f: json.dump({ 'iteration': self.state.iteration, 'total_fea_count': self.state.total_fea_count, 'total_nn_count': self.state.total_nn_count, 'best_40_vs_20': self.state.best_40_vs_20, 'best_60_vs_20': self.state.best_60_vs_20, 'best_mfg': self.state.best_mfg, 'best_weighted': self.state.best_weighted, 'best_params': self.state.best_params, 'tuning_completed': self.state.tuning_completed, 'best_hyperparams': self.state.best_hyperparams, 'cv_r2_scores': self.state.cv_r2_scores, 'history': self.state.history }, f, indent=2) def _print_final_results(self, elapsed: float): """Print final results.""" logger.info("\n" + "=" * 70) logger.info("OPTIMIZATION COMPLETE") logger.info("=" * 70) logger.info(f"Time: {elapsed/60:.1f} min") logger.info(f"Iterations: {self.state.iteration}") logger.info(f"FEA evaluations: {self.state.total_fea_count}") logger.info(f"NN evaluations: {self.state.total_nn_count}") logger.info(f"\nTuned Hyperparameters:") for k, v in self.state.best_hyperparams.items(): logger.info(f" {k}: {v}") logger.info(f"\nCV R² Scores:") for k, v in self.state.cv_r2_scores.items(): logger.info(f" {k}: {v:.4f}") logger.info(f"\nBest FEA-validated:") logger.info(f" 40-20: {self.state.best_40_vs_20:.2f} nm") logger.info(f" 60-20: {self.state.best_60_vs_20:.2f} nm") logger.info(f" Mfg: {self.state.best_mfg:.2f} nm") logger.info(f"\nBest params:") for name, value in sorted(self.state.best_params.items()): logger.info(f" {name}: {value:.4f}") # Save final results with open(RESULTS_DIR / 'final_results.json', 'w') as f: json.dump({ 'summary': { 'iterations': self.state.iteration, 'fea_count': self.state.total_fea_count, 'nn_count': self.state.total_nn_count, 'best_40_vs_20': self.state.best_40_vs_20, 'best_60_vs_20': self.state.best_60_vs_20, 'best_mfg': self.state.best_mfg }, 'best_params': self.state.best_params, 'tuned_hyperparams': self.state.best_hyperparams, 'cv_r2_scores': self.state.cv_r2_scores, 'history': self.state.history }, f, indent=2) # ============================================================================ # Main # ============================================================================ def main(): parser = argparse.ArgumentParser(description='M1 Mirror Adaptive V12 (Tuned)') parser.add_argument('--start', action='store_true', help='Start optimization') parser.add_argument('--tune-only', action='store_true', help='Only tune hyperparameters') parser.add_argument('--skip-tuning', action='store_true', help='Skip tuning, use existing') parser.add_argument('--fea-batch', type=int, default=5, help='FEA per iteration') parser.add_argument('--nn-trials', type=int, default=1000, help='NN trials per iteration') parser.add_argument('--tune-trials', type=int, default=50, help='Optuna trials for tuning') parser.add_argument('--ensemble-size', type=int, default=5, help='Number of ensemble models') parser.add_argument('--patience', type=int, default=5, help='Iterations without improvement') parser.add_argument('--strategy', type=str, default='hybrid', choices=['best', 'uncertain', 'hybrid']) args = parser.parse_args() if not args.start and not args.tune_only: print("M1 Mirror Adaptive Surrogate Optimization V12") print("With Hyperparameter Tuning + Ensemble Models") print("=" * 50) print("\nUsage:") print(" python run_optimization.py --start") print(" python run_optimization.py --start --tune-trials 30 --ensemble-size 3") print(" python run_optimization.py --tune-only # Just tune hyperparameters") print(" python run_optimization.py --start --skip-tuning # Use existing tuning") print("\nThis will:") print(" 1. Load 107 FEA trials from V11 (or fallback to original study)") print(" 2. Tune NN hyperparameters with Optuna (k-fold CV)") print(" 3. Train ensemble surrogate") print(" 4. Run adaptive optimization with FEA validation") return with open(CONFIG_PATH, 'r') as f: study_config = json.load(f) adaptive_config = AdaptiveConfig( surrogate_trials_per_iter=args.nn_trials, fea_batch_size=args.fea_batch, patience=args.patience, strategy=args.strategy, tune_trials=args.tune_trials, ensemble_size=args.ensemble_size ) optimizer = TunedAdaptiveOptimizer(adaptive_config, study_config) if args.tune_only: optimizer.tune_surrogate() logger.info("\nTuning complete. Run with --start to begin optimization.") else: optimizer.run(skip_tuning=args.skip_tuning) if __name__ == "__main__": main()