#!/usr/bin/env python3 """ M1 Mirror NSGA-II Multi-Objective Optimization V15 =================================================== NSGA-II multi-objective optimization to explore Pareto trade-offs. Seeds from all V14 trials (785 including V11-V13 seeds). Key Features: 1. NSGA-II sampler - multi-objective genetic algorithm 2. Seeds from all V14 FEA trials (~764 valid trials) 3. Three separate objectives (no weighted sum for optimization) 4. Returns Pareto front for trade-off analysis Objectives: - Objective 1: 40° vs 20° tracking error (minimize) - Objective 2: 60° vs 20° tracking error (minimize) - Objective 3: Manufacturing optician workload (minimize) Usage: python run_optimization.py --start python run_optimization.py --start --trials 50 python run_optimization.py --start --trials 50 --resume Author: Atomizer Created: 2025-12-12 """ import sys import os import json import time import argparse import logging import sqlite3 import shutil import re from pathlib import Path from typing import Dict, List, Tuple, Optional, Any from datetime import datetime # Add parent directories to path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) import optuna from optuna.samplers import NSGAIISampler # Atomizer imports from optimization_engine.nx_solver import NXSolver from optimization_engine.utils import ensure_nx_running from optimization_engine.extractors import ZernikeExtractor # ============================================================================ # Paths # ============================================================================ STUDY_DIR = Path(__file__).parent SETUP_DIR = STUDY_DIR / "1_setup" ITERATIONS_DIR = STUDY_DIR / "2_iterations" RESULTS_DIR = STUDY_DIR / "3_results" CONFIG_PATH = SETUP_DIR / "optimization_config.json" # Source study for seeding V14_DB = STUDY_DIR.parent / "m1_mirror_adaptive_V14" / "3_results" / "study.db" # Ensure directories exist ITERATIONS_DIR.mkdir(exist_ok=True) RESULTS_DIR.mkdir(exist_ok=True) # Logging LOG_FILE = RESULTS_DIR / "optimization.log" logging.basicConfig( level=logging.INFO, format='%(asctime)s | %(levelname)-8s | %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler(LOG_FILE, mode='a') ] ) logger = logging.getLogger(__name__) # ============================================================================ # Objective names (for NSGA-II, no weights - each is separate) # ============================================================================ OBJ_NAMES = [ 'rel_filtered_rms_40_vs_20', 'rel_filtered_rms_60_vs_20', 'mfg_90_optician_workload' ] # Weights only for reference/comparison (not used in optimization) OBJ_WEIGHTS = { 'rel_filtered_rms_40_vs_20': 5.0, 'rel_filtered_rms_60_vs_20': 5.0, 'mfg_90_optician_workload': 1.0 } DESIGN_VAR_NAMES = [ 'lateral_inner_angle', 'lateral_outer_angle', 'lateral_outer_pivot', 'lateral_inner_pivot', 'lateral_middle_pivot', 'lateral_closeness', 'whiffle_min', 'whiffle_outer_to_vertical', 'whiffle_triangle_closeness', 'blank_backface_angle', 'inner_circular_rib_dia' ] def compute_weighted_sum(objectives: Dict[str, float]) -> float: """Compute weighted sum of objectives (for reference only).""" total = 0.0 for name, weight in OBJ_WEIGHTS.items(): total += weight * objectives.get(name, 1000.0) return total # ============================================================================ # Prior Data Loader # ============================================================================ def load_trials_from_v14() -> List[Dict]: """Load all valid trials from V14 database.""" if not V14_DB.exists(): logger.warning(f"V14 database not found: {V14_DB}") return [] all_data = [] conn = sqlite3.connect(str(V14_DB)) try: cursor = conn.cursor() cursor.execute(''' SELECT trial_id, number FROM trials WHERE state = 'COMPLETE' ''') trials = cursor.fetchall() for trial_id, trial_num in trials: # Get user attributes cursor.execute(''' SELECT key, value_json FROM trial_user_attributes WHERE trial_id = ? ''', (trial_id,)) attrs = {row[0]: json.loads(row[1]) for row in cursor.fetchall()} # Get objectives from user attributes obj_40 = attrs.get('rel_filtered_rms_40_vs_20') obj_60 = attrs.get('rel_filtered_rms_60_vs_20') obj_mfg = attrs.get('mfg_90_optician_workload') if obj_40 is None or obj_60 is None or obj_mfg is None: continue # Skip invalid trials if obj_40 > 1000 or obj_60 > 1000 or obj_mfg > 1000: continue # Get params cursor.execute(''' SELECT param_name, param_value FROM trial_params WHERE trial_id = ? ''', (trial_id,)) params = {row[0]: float(row[1]) for row in cursor.fetchall()} if len(params) < len(DESIGN_VAR_NAMES): continue # Missing parameters source = attrs.get('source', 'unknown') all_data.append({ 'trial_num': trial_num, 'params': params, 'objectives': { 'rel_filtered_rms_40_vs_20': obj_40, 'rel_filtered_rms_60_vs_20': obj_60, 'mfg_90_optician_workload': obj_mfg }, 'source': f'V14_trial_{trial_num}' if source != 'FEA' else f'V14_FEA_{trial_num}' }) logger.info(f"Loaded {len(all_data)} valid trials from V14") finally: conn.close() return all_data # ============================================================================ # FEA Runner # ============================================================================ class FEARunner: """Runs actual FEA simulations.""" def __init__(self, config: Dict[str, Any]): self.config = config self.nx_solver = None self.nx_manager = None self.master_model_dir = SETUP_DIR / "model" def setup(self): """Setup NX and solver.""" logger.info("Setting up NX session...") study_name = self.config.get('study_name', 'm1_mirror_adaptive_V15') try: self.nx_manager, nx_was_started = ensure_nx_running( session_id=study_name, auto_start=True, start_timeout=120 ) logger.info("NX session ready" + (" (started)" if nx_was_started else " (existing)")) except Exception as e: logger.error(f"Failed to setup NX: {e}") raise # Initialize solver nx_settings = self.config.get('nx_settings', {}) nx_install_dir = nx_settings.get('nx_install_path', 'C:\\Program Files\\Siemens\\NX2506') version_match = re.search(r'NX(\d+)', nx_install_dir) nastran_version = version_match.group(1) if version_match else "2506" self.nx_solver = NXSolver( master_model_dir=str(self.master_model_dir), nx_install_dir=nx_install_dir, nastran_version=nastran_version, timeout=nx_settings.get('simulation_timeout_s', 600), use_iteration_folders=True, study_name="m1_mirror_adaptive_V15" ) def run_fea(self, params: Dict[str, float], trial_num: int) -> Optional[Dict]: """Run FEA and extract objectives.""" if self.nx_solver is None: self.setup() logger.info(f" [FEA {trial_num}] Running simulation...") expressions = {var['expression_name']: params[var['name']] for var in self.config['design_variables']} iter_folder = self.nx_solver.create_iteration_folder( iterations_base_dir=ITERATIONS_DIR, iteration_number=trial_num, expression_updates=expressions ) try: nx_settings = self.config.get('nx_settings', {}) sim_file = iter_folder / nx_settings.get('sim_file', 'ASSY_M1_assyfem1_sim1.sim') t_start = time.time() result = self.nx_solver.run_simulation( sim_file=sim_file, working_dir=iter_folder, expression_updates=expressions, solution_name=nx_settings.get('solution_name', 'Solution 1'), cleanup=False ) solve_time = time.time() - t_start if not result['success']: logger.error(f" [FEA {trial_num}] Solve failed: {result.get('error')}") return None logger.info(f" [FEA {trial_num}] Solved in {solve_time:.1f}s") # Extract objectives op2_path = Path(result['op2_file']) objectives = self._extract_objectives(op2_path) if objectives is None: return None weighted_sum = compute_weighted_sum(objectives) logger.info(f" [FEA {trial_num}] 40-20: {objectives['rel_filtered_rms_40_vs_20']:.2f} nm") logger.info(f" [FEA {trial_num}] 60-20: {objectives['rel_filtered_rms_60_vs_20']:.2f} nm") logger.info(f" [FEA {trial_num}] Mfg: {objectives['mfg_90_optician_workload']:.2f} nm") logger.info(f" [FEA {trial_num}] Weighted Sum: {weighted_sum:.2f}") return { 'trial_num': trial_num, 'params': params, 'objectives': objectives, 'weighted_sum': weighted_sum, 'source': 'FEA', 'solve_time': solve_time } except Exception as e: logger.error(f" [FEA {trial_num}] Error: {e}") import traceback traceback.print_exc() return None def _extract_objectives(self, op2_path: Path) -> Optional[Dict[str, float]]: """Extract objectives using ZernikeExtractor.""" try: zernike_settings = self.config.get('zernike_settings', {}) extractor = ZernikeExtractor( op2_path, bdf_path=None, displacement_unit=zernike_settings.get('displacement_unit', 'mm'), n_modes=zernike_settings.get('n_modes', 50), filter_orders=zernike_settings.get('filter_low_orders', 4) ) ref = zernike_settings.get('reference_subcase', '2') rel_40 = extractor.extract_relative("3", ref) rel_60 = extractor.extract_relative("4", ref) rel_90 = extractor.extract_relative("1", ref) return { 'rel_filtered_rms_40_vs_20': rel_40['relative_filtered_rms_nm'], 'rel_filtered_rms_60_vs_20': rel_60['relative_filtered_rms_nm'], 'mfg_90_optician_workload': rel_90['relative_rms_filter_j1to3'] } except Exception as e: logger.error(f"Zernike extraction failed: {e}") return None def cleanup(self): """Cleanup NX session.""" pass # ============================================================================ # NSGA-II Optimizer # ============================================================================ class NSGAII_Optimizer: """NSGA-II multi-objective optimizer.""" def __init__(self, config: Dict[str, Any], resume: bool = False): self.config = config self.resume = resume self.fea_runner = FEARunner(config) # Load design variable bounds self.design_vars = { v['name']: {'min': v['min'], 'max': v['max']} for v in config['design_variables'] if v.get('enabled', True) } # NSGA-II settings opt_settings = config.get('optimization', {}) self.population_size = opt_settings.get('population_size', 50) self.seed = opt_settings.get('seed', 42) # Study self.study_name = config.get('study_name', 'm1_mirror_adaptive_V15') self.db_path = RESULTS_DIR / "study.db" # Track FEA iteration count self._count_existing_iterations() def _count_existing_iterations(self): """Count existing iteration folders.""" self.fea_count = 0 if ITERATIONS_DIR.exists(): for d in ITERATIONS_DIR.iterdir(): if d.is_dir() and d.name.startswith('iter'): try: num = int(d.name.replace('iter', '')) self.fea_count = max(self.fea_count, num) except ValueError: pass logger.info(f"Existing FEA iterations: {self.fea_count}") def create_study(self) -> optuna.Study: """Create or load Optuna study with NSGA-II sampler.""" sampler = NSGAIISampler( population_size=self.population_size, seed=self.seed ) storage = f"sqlite:///{self.db_path}" if self.resume: try: study = optuna.load_study( study_name=self.study_name, storage=storage, sampler=sampler ) logger.info(f"Resumed study with {len(study.trials)} existing trials") return study except KeyError: logger.info("No existing study found, creating new one") # Create new multi-objective study study = optuna.create_study( study_name=self.study_name, storage=storage, sampler=sampler, directions=["minimize", "minimize", "minimize"], # 3 objectives load_if_exists=True ) # Seed from V14 if new study if len(study.trials) == 0: self._seed_from_v14(study) return study def _seed_from_v14(self, study: optuna.Study): """Seed study from V14 data.""" prior_data = load_trials_from_v14() if not prior_data: logger.warning("No prior data to seed") return logger.info(f"Seeding {len(prior_data)} trials from V14...") seeded = 0 for data in prior_data: try: # Create frozen trial distributions = { name: optuna.distributions.FloatDistribution(bounds['min'], bounds['max']) for name, bounds in self.design_vars.items() } # Create trial with parameters trial = optuna.trial.create_trial( params=data['params'], distributions=distributions, values=[ data['objectives']['rel_filtered_rms_40_vs_20'], data['objectives']['rel_filtered_rms_60_vs_20'], data['objectives']['mfg_90_optician_workload'] ], user_attrs={ 'source': data['source'], 'rel_filtered_rms_40_vs_20': data['objectives']['rel_filtered_rms_40_vs_20'], 'rel_filtered_rms_60_vs_20': data['objectives']['rel_filtered_rms_60_vs_20'], 'mfg_90_optician_workload': data['objectives']['mfg_90_optician_workload'], 'weighted_sum': compute_weighted_sum(data['objectives']) } ) study.add_trial(trial) seeded += 1 except Exception as e: logger.debug(f"Failed to seed trial: {e}") continue logger.info(f"Seeded {seeded} trials from V14") def objective(self, trial: optuna.Trial) -> Tuple[float, float, float]: """NSGA-II objective function.""" # Sample parameters params = {} for name, bounds in self.design_vars.items(): params[name] = trial.suggest_float(name, bounds['min'], bounds['max']) # Increment FEA counter self.fea_count += 1 iter_num = self.fea_count logger.info(f"Trial {trial.number} -> iter{iter_num}") # Run FEA result = self.fea_runner.run_fea(params, iter_num) if result is None: # Failed trial - return high values trial.set_user_attr('source', 'FEA_FAILED') trial.set_user_attr('iter_num', iter_num) return (1e6, 1e6, 1e6) # Store metadata trial.set_user_attr('source', 'FEA') trial.set_user_attr('iter_num', iter_num) trial.set_user_attr('rel_filtered_rms_40_vs_20', result['objectives']['rel_filtered_rms_40_vs_20']) trial.set_user_attr('rel_filtered_rms_60_vs_20', result['objectives']['rel_filtered_rms_60_vs_20']) trial.set_user_attr('mfg_90_optician_workload', result['objectives']['mfg_90_optician_workload']) trial.set_user_attr('weighted_sum', result['weighted_sum']) trial.set_user_attr('solve_time', result['solve_time']) return ( result['objectives']['rel_filtered_rms_40_vs_20'], result['objectives']['rel_filtered_rms_60_vs_20'], result['objectives']['mfg_90_optician_workload'] ) def run(self, n_trials: int): """Run NSGA-II optimization.""" study = self.create_study() # Count existing FEA trials fea_before = sum(1 for t in study.trials if t.user_attrs.get('source') == 'FEA') logger.info("=" * 70) logger.info("NSGA-II MULTI-OBJECTIVE OPTIMIZATION") logger.info("=" * 70) logger.info(f"Study: {self.study_name}") logger.info(f"Total trials in DB: {len(study.trials)}") logger.info(f"Existing FEA trials: {fea_before}") logger.info(f"New FEA trials to run: {n_trials}") logger.info("=" * 70) try: study.optimize( self.objective, n_trials=n_trials, show_progress_bar=True, gc_after_trial=True ) except KeyboardInterrupt: logger.info("Optimization interrupted by user") # Report Pareto front self._report_pareto(study) # Archive best design self._archive_best_design(study) return study def _report_pareto(self, study: optuna.Study): """Report Pareto front results.""" pareto_trials = study.best_trials logger.info("\n" + "=" * 70) logger.info(f"PARETO FRONT: {len(pareto_trials)} non-dominated solutions") logger.info("=" * 70) print(f"\n{'Trial':>6} | {'40vs20':>10} | {'60vs20':>10} | {'MFG':>10} | {'WS':>10} | Source") print("-" * 75) # Sort by weighted sum for display sorted_pareto = sorted(pareto_trials, key=lambda t: 5*t.values[0] + 5*t.values[1] + 1*t.values[2] ) for t in sorted_pareto[:20]: source = t.user_attrs.get('source', 'unknown')[:12] ws = 5*t.values[0] + 5*t.values[1] + 1*t.values[2] print(f"{t.number:>6} | {t.values[0]:>10.2f} | {t.values[1]:>10.2f} | {t.values[2]:>10.2f} | {ws:>10.2f} | {source}") # Save Pareto front to JSON pareto_data = [] for t in pareto_trials: pareto_data.append({ "trial_number": t.number, "objectives": { "rel_filtered_rms_40_vs_20": t.values[0], "rel_filtered_rms_60_vs_20": t.values[1], "mfg_90_optician_workload": t.values[2] }, "weighted_sum": 5*t.values[0] + 5*t.values[1] + 1*t.values[2], "params": dict(t.params), "source": t.user_attrs.get('source', 'unknown'), "iter_num": t.user_attrs.get('iter_num') }) pareto_file = RESULTS_DIR / "pareto_front.json" with open(pareto_file, "w") as f: json.dump(pareto_data, f, indent=2) logger.info(f"\nPareto front saved to: {pareto_file}") # Summary stats if pareto_data: best_40 = min(pareto_data, key=lambda x: x['objectives']['rel_filtered_rms_40_vs_20']) best_60 = min(pareto_data, key=lambda x: x['objectives']['rel_filtered_rms_60_vs_20']) best_mfg = min(pareto_data, key=lambda x: x['objectives']['mfg_90_optician_workload']) best_ws = min(pareto_data, key=lambda x: x['weighted_sum']) logger.info("\nPARETO EXTREMES:") logger.info(f" Best 40vs20: Trial #{best_40['trial_number']} = {best_40['objectives']['rel_filtered_rms_40_vs_20']:.2f} nm") logger.info(f" Best 60vs20: Trial #{best_60['trial_number']} = {best_60['objectives']['rel_filtered_rms_60_vs_20']:.2f} nm") logger.info(f" Best MFG: Trial #{best_mfg['trial_number']} = {best_mfg['objectives']['mfg_90_optician_workload']:.2f} nm") logger.info(f" Best WS: Trial #{best_ws['trial_number']} = {best_ws['weighted_sum']:.2f}") def _archive_best_design(self, study: optuna.Study): """Archive best design (lowest weighted sum from Pareto).""" try: tools_dir = Path(__file__).parent.parent.parent / "tools" sys.path.insert(0, str(tools_dir)) from archive_best_design import archive_best_design as archive_fn result = archive_fn(str(STUDY_DIR)) if result.get("success"): logger.info(f"Archived best design to: {result['archive_path']}") else: logger.info(f"Archive skipped: {result.get('reason', 'unknown')}") except Exception as e: logger.warning(f"Could not archive best design: {e}") # ============================================================================ # Main # ============================================================================ def main(): parser = argparse.ArgumentParser(description="M1 Mirror V15 NSGA-II Optimization") parser.add_argument("--start", action="store_true", help="Start optimization") parser.add_argument("--trials", type=int, default=100, help="Number of FEA trials") parser.add_argument("--resume", action="store_true", help="Resume interrupted run") parser.add_argument("--test", action="store_true", help="Run single test trial") args = parser.parse_args() if not args.start and not args.test: parser.print_help() print("\nUse --start to begin optimization or --test for single trial") return # Load config if not CONFIG_PATH.exists(): print(f"Error: Config not found at {CONFIG_PATH}") sys.exit(1) with open(CONFIG_PATH) as f: config = json.load(f) # Create optimizer optimizer = NSGAII_Optimizer(config, resume=args.resume) # Run n_trials = 1 if args.test else args.trials optimizer.run(n_trials=n_trials) if __name__ == "__main__": main()