#!/usr/bin/env python3 """ M1 Mirror SAT3_Trajectory - Trajectory-Based Optimization (TPE) ================================================================ First implementation of Zernike Trajectory Method for M1 mirror optimization. Key Features: 1. TPE sampler - 100 trials, 15 startup 2. Trajectory analysis across 5 angles (20°, 30°, 40°, 50°, 60°) 3. Primary objective: total_filtered_rms_nm (integrated RMS across operating range) 4. Logged objectives (weight=0): coma_rms_nm, astigmatism_rms_nm, trefoil_rms_nm, spherical_rms_nm 5. Full wiffle tree + lateral support parameters (11 design variables) 6. OPD correction with focal_length=22000mm and annular aperture (inner_radius=135.75mm) Usage: python run_optimization.py --start python run_optimization.py --start --trials 100 python run_optimization.py --start --trials 100 --resume python run_optimization.py --test # Single trial test Author: Atomizer Created: 2026-01-29 """ import sys import os import subprocess LICENSE_SERVER = "28000@dalidou;28000@100.80.199.40" os.environ['SPLM_LICENSE_SERVER'] = LICENSE_SERVER print(f"[LICENSE] SPLM_LICENSE_SERVER set to: {LICENSE_SERVER}") # Add Atomizer root to path STUDY_DIR = os.path.dirname(os.path.abspath(__file__)) PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(STUDY_DIR))) sys.path.insert(0, PROJECT_ROOT) import json import time import argparse import logging import shutil import re from pathlib import Path from typing import Dict, Optional, Any from datetime import datetime # ============================================================================ # Dashboard Auto-Launch # ============================================================================ def launch_dashboard(): """Launch the Atomizer dashboard in background.""" dashboard_dir = Path(PROJECT_ROOT) / "atomizer-dashboard" start_script = dashboard_dir / "start-dashboard.bat" if not start_script.exists(): print(f"[DASHBOARD] Warning: start-dashboard.bat not found at {start_script}") return False try: subprocess.Popen( ["cmd", "/c", "start", "/min", str(start_script)], cwd=str(dashboard_dir), shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) print("[DASHBOARD] Launched in background") print("[DASHBOARD] Frontend: http://localhost:5173") print("[DASHBOARD] Backend: http://localhost:8000") return True except Exception as e: print(f"[DASHBOARD] Failed to launch: {e}") return False import optuna from optuna.samplers import TPESampler # Atomizer imports from optimization_engine.nx.solver import NXSolver from optimization_engine.extractors.extract_zernike_trajectory import extract_zernike_trajectory # ============================================================================ # Paths # ============================================================================ STUDY_DIR = Path(__file__).parent SETUP_DIR = STUDY_DIR / "1_setup" MODEL_DIR = SETUP_DIR / "model" ITERATIONS_DIR = STUDY_DIR / "2_iterations" RESULTS_DIR = STUDY_DIR / "3_results" CONFIG_PATH = SETUP_DIR / "optimization_config.json" # Ensure directories exist ITERATIONS_DIR.mkdir(exist_ok=True) RESULTS_DIR.mkdir(exist_ok=True) # Logging LOG_FILE = RESULTS_DIR / "optimization.log" logging.basicConfig( level=logging.INFO, format='%(asctime)s | %(levelname)-8s | %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler(LOG_FILE, mode='a') ] ) logger = logging.getLogger(__name__) # ============================================================================ # Configuration # ============================================================================ with open(CONFIG_PATH) as f: CONFIG = json.load(f) STUDY_NAME = CONFIG["study_name"] # Primary objective is total_filtered_rms_nm (weight=1.0) # All other objectives are logged only (weight=0) def compute_weighted_sum(objectives: Dict[str, float]) -> float: """Compute weighted sum - only total_filtered_rms_nm has weight=1.0.""" return objectives.get('total_filtered_rms_nm', 1000.0) # Hard constraint: blank_mass <= 120kg MAX_BLANK_MASS_KG = 120.0 CONSTRAINT_PENALTY = 1e6 # Trajectory settings REFERENCE_ANGLE = CONFIG['extraction_method'].get('reference_angle', 20.0) FOCAL_LENGTH = CONFIG['extraction_method'].get('focal_length', 22000.0) INNER_RADIUS_MM = CONFIG['extraction_method'].get('inner_radius', 135.75) def check_mass_constraint(mass_kg: float) -> tuple: """Check if mass constraint is satisfied.""" if mass_kg <= MAX_BLANK_MASS_KG: return True, 0.0 else: return False, mass_kg - MAX_BLANK_MASS_KG # ============================================================================ # FEA Runner with Trajectory Extraction # ============================================================================ class FEARunner: """Runs FEA simulations with Zernike Trajectory extraction.""" def __init__(self, config: Dict[str, Any]): self.config = config self.nx_solver = None self.master_model_dir = MODEL_DIR # Get fixed parameter values self.fixed_params = {} for fp in config.get('fixed_parameters', []): self.fixed_params[fp['name']] = fp['value'] def setup(self): """Setup NX solver (assumes NX is already running).""" study_name = self.config.get('study_name', 'SAT3_Trajectory') nx_settings = self.config.get('nx_settings', {}) nx_install_dir = nx_settings.get('nx_install_path', 'C:\\Program Files\\Siemens\\DesigncenterNX2512') version_match = re.search(r'NX(\d+)|DesigncenterNX(\d+)', nx_install_dir) nastran_version = (version_match.group(1) or version_match.group(2)) if version_match else "2512" self.nx_solver = NXSolver( master_model_dir=str(self.master_model_dir), nx_install_dir=nx_install_dir, nastran_version=nastran_version, timeout=nx_settings.get('simulation_timeout_s', 600), use_iteration_folders=True, study_name=study_name ) logger.info(f"[NX] Solver ready (Nastran {nastran_version})") def run_fea(self, params: Dict[str, float], trial_num: int) -> Optional[Dict]: """Run FEA and extract objectives using Zernike Trajectory Method.""" if self.nx_solver is None: self.setup() logger.info(f" [FEA {trial_num}] Running simulation...") # Build expressions expressions = {} for name, value in self.fixed_params.items(): expressions[name] = value for var in self.config['design_variables']: if var.get('enabled', True) and var['name'] in params: expressions[var['expression_name']] = params[var['name']] iter_folder = self.nx_solver.create_iteration_folder( iterations_base_dir=ITERATIONS_DIR, iteration_number=trial_num, expression_updates=expressions ) try: nx_settings = self.config.get('nx_settings', {}) sim_file = iter_folder / nx_settings.get('sim_file', 'ASSY_M1_assyfem1_sim1.sim') t_start = time.time() result = self.nx_solver.run_simulation( sim_file=sim_file, working_dir=iter_folder, expression_updates=expressions, solution_name=nx_settings.get('solution_name', 'Solution 1'), cleanup=False ) solve_time = time.time() - t_start if not result['success']: logger.error(f" [FEA {trial_num}] Solve failed: {result.get('error')}") return None logger.info(f" [FEA {trial_num}] Solved in {solve_time:.1f}s") # Extract objectives using Zernike Trajectory Method op2_path = Path(result['op2_file']) objectives = self._extract_objectives_trajectory(op2_path, iter_folder) if objectives is None: return None # Check constraint mass_kg = objectives['mass_kg'] is_feasible, violation = check_mass_constraint(mass_kg) if is_feasible: weighted_sum = compute_weighted_sum(objectives) constraint_status = "OK" else: weighted_sum = compute_weighted_sum(objectives) + CONSTRAINT_PENALTY * violation constraint_status = f"VIOLATED (+{violation:.1f}kg)" logger.info(f" [FEA {trial_num}] Total RMS: {objectives['total_filtered_rms_nm']:.2f} nm (PRIMARY)") logger.info(f" [FEA {trial_num}] Coma RMS: {objectives['coma_rms_nm']:.2f} nm (logged)") logger.info(f" [FEA {trial_num}] Astig RMS: {objectives['astigmatism_rms_nm']:.2f} nm (logged)") logger.info(f" [FEA {trial_num}] Trefoil RMS: {objectives['trefoil_rms_nm']:.2f} nm (logged)") logger.info(f" [FEA {trial_num}] Spher RMS: {objectives['spherical_rms_nm']:.2f} nm (logged)") logger.info(f" [FEA {trial_num}] R² fit: {objectives['linear_fit_r2']:.4f} (physics validation)") logger.info(f" [FEA {trial_num}] Mass: {objectives['mass_kg']:.3f} kg [Constraint: {constraint_status}]") logger.info(f" [FEA {trial_num}] Weighted Sum: {weighted_sum:.2f}") return { 'trial_num': trial_num, 'params': params, 'objectives': objectives, 'weighted_sum': weighted_sum, 'is_feasible': is_feasible, 'constraint_violation': violation, 'source': 'FEA_ZernikeTrajectory', 'solve_time': solve_time, 'iter_folder': str(iter_folder) } except Exception as e: logger.error(f" [FEA {trial_num}] Error: {e}") import traceback traceback.print_exc() return None def _extract_objectives_trajectory(self, op2_path: Path, iter_folder: Path) -> Optional[Dict]: """ Extract objectives using Zernike Trajectory Method. Analyzes 5 elevation angles (20, 30, 40, 50, 60 deg) and provides: - total_filtered_rms_nm: Integrated RMS across operating range - Mode-specific RMS: coma, astigmatism, trefoil, spherical - linear_fit_r2: Physics model validation """ try: # Run trajectory extraction result = extract_zernike_trajectory( op2_file=op2_path, reference_angle=REFERENCE_ANGLE, focal_length=FOCAL_LENGTH, unit='mm' ) # Extract mass from temp file mass_kg = 0.0 mass_file = iter_folder / "_temp_mass.txt" if mass_file.exists(): try: with open(mass_file, 'r') as f: mass_kg = float(f.read().strip()) except Exception as mass_err: logger.warning(f" Could not read mass file: {mass_err}") if mass_kg == 0: props_file = iter_folder / "_temp_part_properties.json" if props_file.exists(): try: with open(props_file, 'r') as f: props = json.load(f) mass_kg = props.get('mass_kg', 0) except Exception: pass objectives = { 'total_filtered_rms_nm': result['total_filtered_rms_nm'], 'coma_rms_nm': result['coma_rms_nm'], 'astigmatism_rms_nm': result['astigmatism_rms_nm'], 'trefoil_rms_nm': result['trefoil_rms_nm'], 'spherical_rms_nm': result['spherical_rms_nm'], 'linear_fit_r2': result['linear_fit_r2'], 'mass_kg': mass_kg } return objectives except Exception as e: logger.error(f"Trajectory extraction failed: {e}") import traceback traceback.print_exc() return None # ============================================================================ # TPE Optimizer # ============================================================================ class TPEOptimizer: """TPE optimizer for trajectory-based optimization.""" def __init__(self, config: Dict[str, Any], resume: bool = False): self.config = config self.resume = resume self.fea_runner = FEARunner(config) # Load design variable bounds self.design_vars = { v['name']: {'min': v['min'], 'max': v['max'], 'baseline': v.get('baseline')} for v in config['design_variables'] if v.get('enabled', True) } # TPE settings opt_settings = config.get('optimization', {}) self.n_startup_trials = opt_settings.get('n_startup_trials', 15) self.seed = opt_settings.get('seed', 42) # Study self.study_name = config.get('study_name', 'SAT3_Trajectory') self.db_path = RESULTS_DIR / "study.db" # Track best self.best_weighted_sum = float('inf') self.best_trial_info = None # Track FEA count self._count_existing_iterations() def _count_existing_iterations(self): """Count existing iteration folders.""" self.fea_count = 0 if ITERATIONS_DIR.exists(): for d in ITERATIONS_DIR.iterdir(): if d.is_dir() and d.name.startswith('iter'): self.fea_count += 1 logger.info(f"[INIT] Found {self.fea_count} existing FEA runs") def objective_function(self, trial: optuna.Trial) -> float: """Optuna objective function.""" # Sample parameters params = {} for name, bounds in self.design_vars.items(): params[name] = trial.suggest_float(name, bounds['min'], bounds['max']) # Run FEA self.fea_count += 1 result = self.fea_runner.run_fea(params, self.fea_count) if result is None: raise optuna.TrialPruned() # Log all objectives (TPE only optimizes the return value, but we want all logged) for obj_name, obj_value in result['objectives'].items(): trial.set_user_attr(obj_name, obj_value) trial.set_user_attr('is_feasible', result['is_feasible']) trial.set_user_attr('solve_time', result['solve_time']) trial.set_user_attr('source', result['source']) # Track best if result['weighted_sum'] < self.best_weighted_sum: self.best_weighted_sum = result['weighted_sum'] self.best_trial_info = result logger.info(f" [NEW BEST] Trial {result['trial_num']}: {result['weighted_sum']:.2f}") return result['weighted_sum'] def run(self, n_trials: int = 100): """Run TPE optimization.""" logger.info("="*80) logger.info(f"Starting TPE Trajectory Optimization: {self.study_name}") logger.info(f"Design Variables: {len(self.design_vars)}") logger.info(f"n_trials: {n_trials}, n_startup_trials: {self.n_startup_trials}") logger.info("="*80) # Create or load study storage = f"sqlite:///{self.db_path}" if self.resume: logger.info(f"[RESUME] Loading existing study from {self.db_path}") study = optuna.load_study(study_name=self.study_name, storage=storage) else: logger.info(f"[NEW] Creating new study at {self.db_path}") sampler = TPESampler( n_startup_trials=self.n_startup_trials, seed=self.seed ) study = optuna.create_study( study_name=self.study_name, storage=storage, sampler=sampler, direction='minimize', load_if_exists=False ) # Run optimization study.optimize(self.objective_function, n_trials=n_trials) logger.info("="*80) logger.info(f"Optimization Complete!") logger.info(f"Best weighted sum: {self.best_weighted_sum:.2f}") if self.best_trial_info: logger.info(f"Best objectives:") for k, v in self.best_trial_info['objectives'].items(): logger.info(f" {k}: {v:.3f}") logger.info("="*80) # ============================================================================ # Main Entry Point # ============================================================================ def main(): parser = argparse.ArgumentParser(description="SAT3 Trajectory Optimization (TPE)") parser.add_argument('--start', action='store_true', help='Start optimization') parser.add_argument('--trials', type=int, default=100, help='Number of trials (default: 100)') parser.add_argument('--resume', action='store_true', help='Resume existing study') parser.add_argument('--test', action='store_true', help='Run single FEA test') parser.add_argument('--no-dashboard', action='store_true', help="Don't auto-launch dashboard") args = parser.parse_args() if args.start: # Launch dashboard unless disabled if not args.no_dashboard: launch_dashboard() time.sleep(2) optimizer = TPEOptimizer(CONFIG, resume=args.resume) optimizer.run(n_trials=args.trials) elif args.test: logger.info("[TEST] Running single FEA trial...") runner = FEARunner(CONFIG) # Use baseline values params = {v['name']: v['baseline'] for v in CONFIG['design_variables'] if v.get('enabled', True)} result = runner.run_fea(params, trial_num=0) if result: logger.info("[TEST] Success!") else: logger.error("[TEST] Failed") sys.exit(1) else: parser.print_help() if __name__ == '__main__': main()