docs: Complete M1 mirror optimization campaign V11-V15
## M1 Mirror Campaign Summary - V11-V15 optimization campaign completed (~1,400 FEA evaluations) - Best design: V14 Trial #725 with Weighted Sum = 121.72 - V15 NSGA-II confirmed V14 TPE found optimal solution - Campaign improved from WS=129.33 (V11) to WS=121.72 (V14): -5.9% ## Key Results - 40° tracking: 5.99 nm (target 4.0 nm) - 60° tracking: 13.10 nm (target 10.0 nm) - Manufacturing: 26.28 nm (target 20.0 nm) - Targets not achievable within current design space ## Documentation Added - V15 STUDY_REPORT.md: Detailed NSGA-II results analysis - M1_MIRROR_CAMPAIGN_SUMMARY.md: Full V11-V15 campaign overview - Updated CLAUDE.md, ATOMIZER_CONTEXT.md with NXSolver patterns - Updated 01_CHEATSHEET.md with --resume guidance - Updated OP_01_CREATE_STUDY.md with FEARunner template ## Studies Added - m1_mirror_adaptive_V13: TPE validation (291 trials) - m1_mirror_adaptive_V14: TPE intensive (785 trials, BEST) - m1_mirror_adaptive_V15: NSGA-II exploration (126 new FEA) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
639
studies/m1_mirror_adaptive_V15/run_optimization.py
Normal file
639
studies/m1_mirror_adaptive_V15/run_optimization.py
Normal file
@@ -0,0 +1,639 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
M1 Mirror NSGA-II Multi-Objective Optimization V15
|
||||
===================================================
|
||||
|
||||
NSGA-II multi-objective optimization to explore Pareto trade-offs.
|
||||
Seeds from all V14 trials (785 including V11-V13 seeds).
|
||||
|
||||
Key Features:
|
||||
1. NSGA-II sampler - multi-objective genetic algorithm
|
||||
2. Seeds from all V14 FEA trials (~764 valid trials)
|
||||
3. Three separate objectives (no weighted sum for optimization)
|
||||
4. Returns Pareto front for trade-off analysis
|
||||
|
||||
Objectives:
|
||||
- Objective 1: 40° vs 20° tracking error (minimize)
|
||||
- Objective 2: 60° vs 20° tracking error (minimize)
|
||||
- Objective 3: Manufacturing optician workload (minimize)
|
||||
|
||||
Usage:
|
||||
python run_optimization.py --start
|
||||
python run_optimization.py --start --trials 50
|
||||
python run_optimization.py --start --trials 50 --resume
|
||||
|
||||
Author: Atomizer
|
||||
Created: 2025-12-12
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import time
|
||||
import argparse
|
||||
import logging
|
||||
import sqlite3
|
||||
import shutil
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Tuple, Optional, Any
|
||||
from datetime import datetime
|
||||
|
||||
# Add parent directories to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
import optuna
|
||||
from optuna.samplers import NSGAIISampler
|
||||
|
||||
# Atomizer imports
|
||||
from optimization_engine.nx_solver import NXSolver
|
||||
from optimization_engine.utils import ensure_nx_running
|
||||
from optimization_engine.extractors import ZernikeExtractor
|
||||
|
||||
# ============================================================================
|
||||
# Paths
|
||||
# ============================================================================
|
||||
|
||||
STUDY_DIR = Path(__file__).parent
|
||||
SETUP_DIR = STUDY_DIR / "1_setup"
|
||||
ITERATIONS_DIR = STUDY_DIR / "2_iterations"
|
||||
RESULTS_DIR = STUDY_DIR / "3_results"
|
||||
CONFIG_PATH = SETUP_DIR / "optimization_config.json"
|
||||
|
||||
# Source study for seeding
|
||||
V14_DB = STUDY_DIR.parent / "m1_mirror_adaptive_V14" / "3_results" / "study.db"
|
||||
|
||||
# Ensure directories exist
|
||||
ITERATIONS_DIR.mkdir(exist_ok=True)
|
||||
RESULTS_DIR.mkdir(exist_ok=True)
|
||||
|
||||
# Logging
|
||||
LOG_FILE = RESULTS_DIR / "optimization.log"
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s | %(levelname)-8s | %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler(sys.stdout),
|
||||
logging.FileHandler(LOG_FILE, mode='a')
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Objective names (for NSGA-II, no weights - each is separate)
|
||||
# ============================================================================
|
||||
|
||||
OBJ_NAMES = [
|
||||
'rel_filtered_rms_40_vs_20',
|
||||
'rel_filtered_rms_60_vs_20',
|
||||
'mfg_90_optician_workload'
|
||||
]
|
||||
|
||||
# Weights only for reference/comparison (not used in optimization)
|
||||
OBJ_WEIGHTS = {
|
||||
'rel_filtered_rms_40_vs_20': 5.0,
|
||||
'rel_filtered_rms_60_vs_20': 5.0,
|
||||
'mfg_90_optician_workload': 1.0
|
||||
}
|
||||
|
||||
DESIGN_VAR_NAMES = [
|
||||
'lateral_inner_angle', 'lateral_outer_angle', 'lateral_outer_pivot',
|
||||
'lateral_inner_pivot', 'lateral_middle_pivot', 'lateral_closeness',
|
||||
'whiffle_min', 'whiffle_outer_to_vertical', 'whiffle_triangle_closeness',
|
||||
'blank_backface_angle', 'inner_circular_rib_dia'
|
||||
]
|
||||
|
||||
|
||||
def compute_weighted_sum(objectives: Dict[str, float]) -> float:
|
||||
"""Compute weighted sum of objectives (for reference only)."""
|
||||
total = 0.0
|
||||
for name, weight in OBJ_WEIGHTS.items():
|
||||
total += weight * objectives.get(name, 1000.0)
|
||||
return total
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Prior Data Loader
|
||||
# ============================================================================
|
||||
|
||||
def load_trials_from_v14() -> List[Dict]:
|
||||
"""Load all valid trials from V14 database."""
|
||||
if not V14_DB.exists():
|
||||
logger.warning(f"V14 database not found: {V14_DB}")
|
||||
return []
|
||||
|
||||
all_data = []
|
||||
conn = sqlite3.connect(str(V14_DB))
|
||||
|
||||
try:
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('''
|
||||
SELECT trial_id, number FROM trials
|
||||
WHERE state = 'COMPLETE'
|
||||
''')
|
||||
trials = cursor.fetchall()
|
||||
|
||||
for trial_id, trial_num in trials:
|
||||
# Get user attributes
|
||||
cursor.execute('''
|
||||
SELECT key, value_json FROM trial_user_attributes
|
||||
WHERE trial_id = ?
|
||||
''', (trial_id,))
|
||||
attrs = {row[0]: json.loads(row[1]) for row in cursor.fetchall()}
|
||||
|
||||
# Get objectives from user attributes
|
||||
obj_40 = attrs.get('rel_filtered_rms_40_vs_20')
|
||||
obj_60 = attrs.get('rel_filtered_rms_60_vs_20')
|
||||
obj_mfg = attrs.get('mfg_90_optician_workload')
|
||||
|
||||
if obj_40 is None or obj_60 is None or obj_mfg is None:
|
||||
continue
|
||||
|
||||
# Skip invalid trials
|
||||
if obj_40 > 1000 or obj_60 > 1000 or obj_mfg > 1000:
|
||||
continue
|
||||
|
||||
# Get params
|
||||
cursor.execute('''
|
||||
SELECT param_name, param_value FROM trial_params
|
||||
WHERE trial_id = ?
|
||||
''', (trial_id,))
|
||||
params = {row[0]: float(row[1]) for row in cursor.fetchall()}
|
||||
|
||||
if len(params) < len(DESIGN_VAR_NAMES):
|
||||
continue # Missing parameters
|
||||
|
||||
source = attrs.get('source', 'unknown')
|
||||
|
||||
all_data.append({
|
||||
'trial_num': trial_num,
|
||||
'params': params,
|
||||
'objectives': {
|
||||
'rel_filtered_rms_40_vs_20': obj_40,
|
||||
'rel_filtered_rms_60_vs_20': obj_60,
|
||||
'mfg_90_optician_workload': obj_mfg
|
||||
},
|
||||
'source': f'V14_trial_{trial_num}' if source != 'FEA' else f'V14_FEA_{trial_num}'
|
||||
})
|
||||
|
||||
logger.info(f"Loaded {len(all_data)} valid trials from V14")
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
return all_data
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# FEA Runner
|
||||
# ============================================================================
|
||||
|
||||
class FEARunner:
|
||||
"""Runs actual FEA simulations."""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
self.config = config
|
||||
self.nx_solver = None
|
||||
self.nx_manager = None
|
||||
self.master_model_dir = SETUP_DIR / "model"
|
||||
|
||||
def setup(self):
|
||||
"""Setup NX and solver."""
|
||||
logger.info("Setting up NX session...")
|
||||
|
||||
study_name = self.config.get('study_name', 'm1_mirror_adaptive_V15')
|
||||
|
||||
try:
|
||||
self.nx_manager, nx_was_started = ensure_nx_running(
|
||||
session_id=study_name,
|
||||
auto_start=True,
|
||||
start_timeout=120
|
||||
)
|
||||
logger.info("NX session ready" + (" (started)" if nx_was_started else " (existing)"))
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to setup NX: {e}")
|
||||
raise
|
||||
|
||||
# Initialize solver
|
||||
nx_settings = self.config.get('nx_settings', {})
|
||||
nx_install_dir = nx_settings.get('nx_install_path', 'C:\\Program Files\\Siemens\\NX2506')
|
||||
version_match = re.search(r'NX(\d+)', nx_install_dir)
|
||||
nastran_version = version_match.group(1) if version_match else "2506"
|
||||
|
||||
self.nx_solver = NXSolver(
|
||||
master_model_dir=str(self.master_model_dir),
|
||||
nx_install_dir=nx_install_dir,
|
||||
nastran_version=nastran_version,
|
||||
timeout=nx_settings.get('simulation_timeout_s', 600),
|
||||
use_iteration_folders=True,
|
||||
study_name="m1_mirror_adaptive_V15"
|
||||
)
|
||||
|
||||
def run_fea(self, params: Dict[str, float], trial_num: int) -> Optional[Dict]:
|
||||
"""Run FEA and extract objectives."""
|
||||
if self.nx_solver is None:
|
||||
self.setup()
|
||||
|
||||
logger.info(f" [FEA {trial_num}] Running simulation...")
|
||||
|
||||
expressions = {var['expression_name']: params[var['name']]
|
||||
for var in self.config['design_variables']}
|
||||
|
||||
iter_folder = self.nx_solver.create_iteration_folder(
|
||||
iterations_base_dir=ITERATIONS_DIR,
|
||||
iteration_number=trial_num,
|
||||
expression_updates=expressions
|
||||
)
|
||||
|
||||
try:
|
||||
nx_settings = self.config.get('nx_settings', {})
|
||||
sim_file = iter_folder / nx_settings.get('sim_file', 'ASSY_M1_assyfem1_sim1.sim')
|
||||
|
||||
t_start = time.time()
|
||||
|
||||
result = self.nx_solver.run_simulation(
|
||||
sim_file=sim_file,
|
||||
working_dir=iter_folder,
|
||||
expression_updates=expressions,
|
||||
solution_name=nx_settings.get('solution_name', 'Solution 1'),
|
||||
cleanup=False
|
||||
)
|
||||
|
||||
solve_time = time.time() - t_start
|
||||
|
||||
if not result['success']:
|
||||
logger.error(f" [FEA {trial_num}] Solve failed: {result.get('error')}")
|
||||
return None
|
||||
|
||||
logger.info(f" [FEA {trial_num}] Solved in {solve_time:.1f}s")
|
||||
|
||||
# Extract objectives
|
||||
op2_path = Path(result['op2_file'])
|
||||
objectives = self._extract_objectives(op2_path)
|
||||
|
||||
if objectives is None:
|
||||
return None
|
||||
|
||||
weighted_sum = compute_weighted_sum(objectives)
|
||||
|
||||
logger.info(f" [FEA {trial_num}] 40-20: {objectives['rel_filtered_rms_40_vs_20']:.2f} nm")
|
||||
logger.info(f" [FEA {trial_num}] 60-20: {objectives['rel_filtered_rms_60_vs_20']:.2f} nm")
|
||||
logger.info(f" [FEA {trial_num}] Mfg: {objectives['mfg_90_optician_workload']:.2f} nm")
|
||||
logger.info(f" [FEA {trial_num}] Weighted Sum: {weighted_sum:.2f}")
|
||||
|
||||
return {
|
||||
'trial_num': trial_num,
|
||||
'params': params,
|
||||
'objectives': objectives,
|
||||
'weighted_sum': weighted_sum,
|
||||
'source': 'FEA',
|
||||
'solve_time': solve_time
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f" [FEA {trial_num}] Error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return None
|
||||
|
||||
def _extract_objectives(self, op2_path: Path) -> Optional[Dict[str, float]]:
|
||||
"""Extract objectives using ZernikeExtractor."""
|
||||
try:
|
||||
zernike_settings = self.config.get('zernike_settings', {})
|
||||
|
||||
extractor = ZernikeExtractor(
|
||||
op2_path,
|
||||
bdf_path=None,
|
||||
displacement_unit=zernike_settings.get('displacement_unit', 'mm'),
|
||||
n_modes=zernike_settings.get('n_modes', 50),
|
||||
filter_orders=zernike_settings.get('filter_low_orders', 4)
|
||||
)
|
||||
|
||||
ref = zernike_settings.get('reference_subcase', '2')
|
||||
|
||||
rel_40 = extractor.extract_relative("3", ref)
|
||||
rel_60 = extractor.extract_relative("4", ref)
|
||||
rel_90 = extractor.extract_relative("1", ref)
|
||||
|
||||
return {
|
||||
'rel_filtered_rms_40_vs_20': rel_40['relative_filtered_rms_nm'],
|
||||
'rel_filtered_rms_60_vs_20': rel_60['relative_filtered_rms_nm'],
|
||||
'mfg_90_optician_workload': rel_90['relative_rms_filter_j1to3']
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Zernike extraction failed: {e}")
|
||||
return None
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup NX session."""
|
||||
pass
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# NSGA-II Optimizer
|
||||
# ============================================================================
|
||||
|
||||
class NSGAII_Optimizer:
|
||||
"""NSGA-II multi-objective optimizer."""
|
||||
|
||||
def __init__(self, config: Dict[str, Any], resume: bool = False):
|
||||
self.config = config
|
||||
self.resume = resume
|
||||
self.fea_runner = FEARunner(config)
|
||||
|
||||
# Load design variable bounds
|
||||
self.design_vars = {
|
||||
v['name']: {'min': v['min'], 'max': v['max']}
|
||||
for v in config['design_variables']
|
||||
if v.get('enabled', True)
|
||||
}
|
||||
|
||||
# NSGA-II settings
|
||||
opt_settings = config.get('optimization', {})
|
||||
self.population_size = opt_settings.get('population_size', 50)
|
||||
self.seed = opt_settings.get('seed', 42)
|
||||
|
||||
# Study
|
||||
self.study_name = config.get('study_name', 'm1_mirror_adaptive_V15')
|
||||
self.db_path = RESULTS_DIR / "study.db"
|
||||
|
||||
# Track FEA iteration count
|
||||
self._count_existing_iterations()
|
||||
|
||||
def _count_existing_iterations(self):
|
||||
"""Count existing iteration folders."""
|
||||
self.fea_count = 0
|
||||
if ITERATIONS_DIR.exists():
|
||||
for d in ITERATIONS_DIR.iterdir():
|
||||
if d.is_dir() and d.name.startswith('iter'):
|
||||
try:
|
||||
num = int(d.name.replace('iter', ''))
|
||||
self.fea_count = max(self.fea_count, num)
|
||||
except ValueError:
|
||||
pass
|
||||
logger.info(f"Existing FEA iterations: {self.fea_count}")
|
||||
|
||||
def create_study(self) -> optuna.Study:
|
||||
"""Create or load Optuna study with NSGA-II sampler."""
|
||||
sampler = NSGAIISampler(
|
||||
population_size=self.population_size,
|
||||
seed=self.seed
|
||||
)
|
||||
|
||||
storage = f"sqlite:///{self.db_path}"
|
||||
|
||||
if self.resume:
|
||||
try:
|
||||
study = optuna.load_study(
|
||||
study_name=self.study_name,
|
||||
storage=storage,
|
||||
sampler=sampler
|
||||
)
|
||||
logger.info(f"Resumed study with {len(study.trials)} existing trials")
|
||||
return study
|
||||
except KeyError:
|
||||
logger.info("No existing study found, creating new one")
|
||||
|
||||
# Create new multi-objective study
|
||||
study = optuna.create_study(
|
||||
study_name=self.study_name,
|
||||
storage=storage,
|
||||
sampler=sampler,
|
||||
directions=["minimize", "minimize", "minimize"], # 3 objectives
|
||||
load_if_exists=True
|
||||
)
|
||||
|
||||
# Seed from V14 if new study
|
||||
if len(study.trials) == 0:
|
||||
self._seed_from_v14(study)
|
||||
|
||||
return study
|
||||
|
||||
def _seed_from_v14(self, study: optuna.Study):
|
||||
"""Seed study from V14 data."""
|
||||
prior_data = load_trials_from_v14()
|
||||
|
||||
if not prior_data:
|
||||
logger.warning("No prior data to seed")
|
||||
return
|
||||
|
||||
logger.info(f"Seeding {len(prior_data)} trials from V14...")
|
||||
|
||||
seeded = 0
|
||||
for data in prior_data:
|
||||
try:
|
||||
# Create frozen trial
|
||||
distributions = {
|
||||
name: optuna.distributions.FloatDistribution(bounds['min'], bounds['max'])
|
||||
for name, bounds in self.design_vars.items()
|
||||
}
|
||||
|
||||
# Create trial with parameters
|
||||
trial = optuna.trial.create_trial(
|
||||
params=data['params'],
|
||||
distributions=distributions,
|
||||
values=[
|
||||
data['objectives']['rel_filtered_rms_40_vs_20'],
|
||||
data['objectives']['rel_filtered_rms_60_vs_20'],
|
||||
data['objectives']['mfg_90_optician_workload']
|
||||
],
|
||||
user_attrs={
|
||||
'source': data['source'],
|
||||
'rel_filtered_rms_40_vs_20': data['objectives']['rel_filtered_rms_40_vs_20'],
|
||||
'rel_filtered_rms_60_vs_20': data['objectives']['rel_filtered_rms_60_vs_20'],
|
||||
'mfg_90_optician_workload': data['objectives']['mfg_90_optician_workload'],
|
||||
'weighted_sum': compute_weighted_sum(data['objectives'])
|
||||
}
|
||||
)
|
||||
|
||||
study.add_trial(trial)
|
||||
seeded += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to seed trial: {e}")
|
||||
continue
|
||||
|
||||
logger.info(f"Seeded {seeded} trials from V14")
|
||||
|
||||
def objective(self, trial: optuna.Trial) -> Tuple[float, float, float]:
|
||||
"""NSGA-II objective function."""
|
||||
# Sample parameters
|
||||
params = {}
|
||||
for name, bounds in self.design_vars.items():
|
||||
params[name] = trial.suggest_float(name, bounds['min'], bounds['max'])
|
||||
|
||||
# Increment FEA counter
|
||||
self.fea_count += 1
|
||||
iter_num = self.fea_count
|
||||
|
||||
logger.info(f"Trial {trial.number} -> iter{iter_num}")
|
||||
|
||||
# Run FEA
|
||||
result = self.fea_runner.run_fea(params, iter_num)
|
||||
|
||||
if result is None:
|
||||
# Failed trial - return high values
|
||||
trial.set_user_attr('source', 'FEA_FAILED')
|
||||
trial.set_user_attr('iter_num', iter_num)
|
||||
return (1e6, 1e6, 1e6)
|
||||
|
||||
# Store metadata
|
||||
trial.set_user_attr('source', 'FEA')
|
||||
trial.set_user_attr('iter_num', iter_num)
|
||||
trial.set_user_attr('rel_filtered_rms_40_vs_20', result['objectives']['rel_filtered_rms_40_vs_20'])
|
||||
trial.set_user_attr('rel_filtered_rms_60_vs_20', result['objectives']['rel_filtered_rms_60_vs_20'])
|
||||
trial.set_user_attr('mfg_90_optician_workload', result['objectives']['mfg_90_optician_workload'])
|
||||
trial.set_user_attr('weighted_sum', result['weighted_sum'])
|
||||
trial.set_user_attr('solve_time', result['solve_time'])
|
||||
|
||||
return (
|
||||
result['objectives']['rel_filtered_rms_40_vs_20'],
|
||||
result['objectives']['rel_filtered_rms_60_vs_20'],
|
||||
result['objectives']['mfg_90_optician_workload']
|
||||
)
|
||||
|
||||
def run(self, n_trials: int):
|
||||
"""Run NSGA-II optimization."""
|
||||
study = self.create_study()
|
||||
|
||||
# Count existing FEA trials
|
||||
fea_before = sum(1 for t in study.trials if t.user_attrs.get('source') == 'FEA')
|
||||
|
||||
logger.info("=" * 70)
|
||||
logger.info("NSGA-II MULTI-OBJECTIVE OPTIMIZATION")
|
||||
logger.info("=" * 70)
|
||||
logger.info(f"Study: {self.study_name}")
|
||||
logger.info(f"Total trials in DB: {len(study.trials)}")
|
||||
logger.info(f"Existing FEA trials: {fea_before}")
|
||||
logger.info(f"New FEA trials to run: {n_trials}")
|
||||
logger.info("=" * 70)
|
||||
|
||||
try:
|
||||
study.optimize(
|
||||
self.objective,
|
||||
n_trials=n_trials,
|
||||
show_progress_bar=True,
|
||||
gc_after_trial=True
|
||||
)
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Optimization interrupted by user")
|
||||
|
||||
# Report Pareto front
|
||||
self._report_pareto(study)
|
||||
|
||||
# Archive best design
|
||||
self._archive_best_design(study)
|
||||
|
||||
return study
|
||||
|
||||
def _report_pareto(self, study: optuna.Study):
|
||||
"""Report Pareto front results."""
|
||||
pareto_trials = study.best_trials
|
||||
|
||||
logger.info("\n" + "=" * 70)
|
||||
logger.info(f"PARETO FRONT: {len(pareto_trials)} non-dominated solutions")
|
||||
logger.info("=" * 70)
|
||||
|
||||
print(f"\n{'Trial':>6} | {'40vs20':>10} | {'60vs20':>10} | {'MFG':>10} | {'WS':>10} | Source")
|
||||
print("-" * 75)
|
||||
|
||||
# Sort by weighted sum for display
|
||||
sorted_pareto = sorted(pareto_trials, key=lambda t:
|
||||
5*t.values[0] + 5*t.values[1] + 1*t.values[2]
|
||||
)
|
||||
|
||||
for t in sorted_pareto[:20]:
|
||||
source = t.user_attrs.get('source', 'unknown')[:12]
|
||||
ws = 5*t.values[0] + 5*t.values[1] + 1*t.values[2]
|
||||
print(f"{t.number:>6} | {t.values[0]:>10.2f} | {t.values[1]:>10.2f} | {t.values[2]:>10.2f} | {ws:>10.2f} | {source}")
|
||||
|
||||
# Save Pareto front to JSON
|
||||
pareto_data = []
|
||||
for t in pareto_trials:
|
||||
pareto_data.append({
|
||||
"trial_number": t.number,
|
||||
"objectives": {
|
||||
"rel_filtered_rms_40_vs_20": t.values[0],
|
||||
"rel_filtered_rms_60_vs_20": t.values[1],
|
||||
"mfg_90_optician_workload": t.values[2]
|
||||
},
|
||||
"weighted_sum": 5*t.values[0] + 5*t.values[1] + 1*t.values[2],
|
||||
"params": dict(t.params),
|
||||
"source": t.user_attrs.get('source', 'unknown'),
|
||||
"iter_num": t.user_attrs.get('iter_num')
|
||||
})
|
||||
|
||||
pareto_file = RESULTS_DIR / "pareto_front.json"
|
||||
with open(pareto_file, "w") as f:
|
||||
json.dump(pareto_data, f, indent=2)
|
||||
|
||||
logger.info(f"\nPareto front saved to: {pareto_file}")
|
||||
|
||||
# Summary stats
|
||||
if pareto_data:
|
||||
best_40 = min(pareto_data, key=lambda x: x['objectives']['rel_filtered_rms_40_vs_20'])
|
||||
best_60 = min(pareto_data, key=lambda x: x['objectives']['rel_filtered_rms_60_vs_20'])
|
||||
best_mfg = min(pareto_data, key=lambda x: x['objectives']['mfg_90_optician_workload'])
|
||||
best_ws = min(pareto_data, key=lambda x: x['weighted_sum'])
|
||||
|
||||
logger.info("\nPARETO EXTREMES:")
|
||||
logger.info(f" Best 40vs20: Trial #{best_40['trial_number']} = {best_40['objectives']['rel_filtered_rms_40_vs_20']:.2f} nm")
|
||||
logger.info(f" Best 60vs20: Trial #{best_60['trial_number']} = {best_60['objectives']['rel_filtered_rms_60_vs_20']:.2f} nm")
|
||||
logger.info(f" Best MFG: Trial #{best_mfg['trial_number']} = {best_mfg['objectives']['mfg_90_optician_workload']:.2f} nm")
|
||||
logger.info(f" Best WS: Trial #{best_ws['trial_number']} = {best_ws['weighted_sum']:.2f}")
|
||||
|
||||
def _archive_best_design(self, study: optuna.Study):
|
||||
"""Archive best design (lowest weighted sum from Pareto)."""
|
||||
try:
|
||||
tools_dir = Path(__file__).parent.parent.parent / "tools"
|
||||
sys.path.insert(0, str(tools_dir))
|
||||
from archive_best_design import archive_best_design as archive_fn
|
||||
|
||||
result = archive_fn(str(STUDY_DIR))
|
||||
if result.get("success"):
|
||||
logger.info(f"Archived best design to: {result['archive_path']}")
|
||||
else:
|
||||
logger.info(f"Archive skipped: {result.get('reason', 'unknown')}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not archive best design: {e}")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Main
|
||||
# ============================================================================
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="M1 Mirror V15 NSGA-II Optimization")
|
||||
parser.add_argument("--start", action="store_true", help="Start optimization")
|
||||
parser.add_argument("--trials", type=int, default=100, help="Number of FEA trials")
|
||||
parser.add_argument("--resume", action="store_true", help="Resume interrupted run")
|
||||
parser.add_argument("--test", action="store_true", help="Run single test trial")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.start and not args.test:
|
||||
parser.print_help()
|
||||
print("\nUse --start to begin optimization or --test for single trial")
|
||||
return
|
||||
|
||||
# Load config
|
||||
if not CONFIG_PATH.exists():
|
||||
print(f"Error: Config not found at {CONFIG_PATH}")
|
||||
sys.exit(1)
|
||||
|
||||
with open(CONFIG_PATH) as f:
|
||||
config = json.load(f)
|
||||
|
||||
# Create optimizer
|
||||
optimizer = NSGAII_Optimizer(config, resume=args.resume)
|
||||
|
||||
# Run
|
||||
n_trials = 1 if args.test else args.trials
|
||||
optimizer.run(n_trials=n_trials)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user