Files
Atomizer/studies/m1_mirror_adaptive_V14/run_optimization.py
Antoine 1bb201e0b7 feat: Add post-optimization tools and mandatory best design archiving
New Tools (tools/):
- analyze_study.py: Generate comprehensive optimization reports
- find_best_iteration.py: Find best iteration folder, optionally copy it
- archive_best_design.py: Archive best design to 3_results/best_design_archive/<timestamp>/

Protocol Updates:
- OP_02_RUN_OPTIMIZATION.md v1.1: Add mandatory archive_best_design step
  in Post-Run Actions. This MUST be done after every optimization run.

V14 Updates:
- run_optimization.py: Auto-archive best design at end of optimization
- optimization_config.json: Expand bounds for V14 continuation
  - lateral_outer_angle: min 13->11 deg (was at 4.7%)
  - lateral_inner_pivot: min 7->5 mm (was at 8.1%)
  - lateral_middle_pivot: max 23->27 mm (was at 99.4%)
  - whiffle_min: max 60->72 mm (was at 96.3%)

Usage:
  python tools/analyze_study.py m1_mirror_adaptive_V14
  python tools/find_best_iteration.py m1_mirror_adaptive_V14
  python tools/archive_best_design.py m1_mirror_adaptive_V14

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 10:28:35 -05:00

666 lines
24 KiB
Python

#!/usr/bin/env python3
"""
M1 Mirror TPE FEA Optimization V14
===================================
TPE (Tree-structured Parzen Estimator) optimization seeded from V11+V12+V13.
Uses weighted-sum objective for efficient single-objective convergence.
Key Features:
1. TPE sampler - efficient Bayesian optimization for single objectives
2. Seeds from all prior FEA trials (~150+ from V11, V12, V13)
3. Weighted-sum objective: 5*obj_40 + 5*obj_60 + 1*obj_mfg
4. Individual objectives tracked as user attributes
Usage:
python run_optimization.py --start
python run_optimization.py --start --trials 50
python run_optimization.py --start --trials 50 --resume
For 8-hour overnight run (~55 trials at 8-9 min/trial):
python run_optimization.py --start --trials 55
"""
import sys
import os
import json
import time
import argparse
import logging
import sqlite3
import shutil
import re
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
import numpy as np
# Add parent directories to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import optuna
from optuna.samplers import TPESampler
# Atomizer imports
from optimization_engine.nx_solver import NXSolver
from optimization_engine.utils import ensure_nx_running
from optimization_engine.extractors import ZernikeExtractor
# ============================================================================
# Paths
# ============================================================================
# Directory layout: every path is resolved relative to the folder that
# contains this script.
STUDY_DIR = Path(__file__).parent
SETUP_DIR = STUDY_DIR / "1_setup"
ITERATIONS_DIR = STUDY_DIR / "2_iterations"
RESULTS_DIR = STUDY_DIR / "3_results"
CONFIG_PATH = SETUP_DIR / "optimization_config.json"
# Source studies for seeding: study.db files of the sibling V11/V12/V13
# study folders (read with sqlite3 by load_fea_trials_from_db).
V11_DB = STUDY_DIR.parent / "m1_mirror_adaptive_V11" / "3_results" / "study.db"
V12_DB = STUDY_DIR.parent / "m1_mirror_adaptive_V12" / "3_results" / "study.db"
V13_DB = STUDY_DIR.parent / "m1_mirror_adaptive_V13" / "3_results" / "study.db"
# Ensure directories exist. This runs at import time; exist_ok=True makes it
# a no-op when the folders are already present.
ITERATIONS_DIR.mkdir(exist_ok=True)
RESULTS_DIR.mkdir(exist_ok=True)
# Logging: every record is written to stdout and appended (mode='a') to
# optimization.log inside the results folder.
LOG_FILE = RESULTS_DIR / "optimization.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s | %(levelname)-8s | %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler(LOG_FILE, mode='a')
]
)
# Module-level logger used by every function and class in this file.
logger = logging.getLogger(__name__)
# ============================================================================
# Objective names and weights
# ============================================================================
# Names of the three objectives. The same strings are used as dictionary keys
# for objective values and as user-attribute keys on trials.
OBJ_NAMES = [
'rel_filtered_rms_40_vs_20',
'rel_filtered_rms_60_vs_20',
'mfg_90_optician_workload'
]
# Weights for weighted-sum objective.
# NOTE(review): the original comment said "(from config)", but the values are
# hard-coded here and nothing visible in this file reads them from
# optimization_config.json - confirm they match the config.
OBJ_WEIGHTS = {
'rel_filtered_rms_40_vs_20': 5.0,
'rel_filtered_rms_60_vs_20': 5.0,
'mfg_90_optician_workload': 1.0
}
# Design-variable names.
# NOTE(review): not referenced by the code visible in this file; sampling is
# driven by config['design_variables'] instead.
DESIGN_VAR_NAMES = [
'lateral_inner_angle', 'lateral_outer_angle', 'lateral_outer_pivot',
'lateral_inner_pivot', 'lateral_middle_pivot', 'lateral_closeness',
'whiffle_min', 'whiffle_outer_to_vertical', 'whiffle_triangle_closeness',
'blank_backface_angle', 'inner_circular_rib_dia'
]
def compute_weighted_sum(
    objectives: Dict[str, float],
    weights: Optional[Dict[str, float]] = None,
    missing_penalty: float = 1000.0,
) -> float:
    """Compute weighted sum of objectives.

    Args:
        objectives: Objective values keyed by objective name.
        weights: Weight per objective name. When omitted, the module-level
            ``OBJ_WEIGHTS`` table is used (the original behavior).
        missing_penalty: Value substituted for any weighted objective that is
            absent from ``objectives``. Defaults to 1000.0, as before.

    Returns:
        The sum of ``weight * value`` over every entry of the weight table.
        Objectives that have no weight do not contribute.
    """
    active_weights = OBJ_WEIGHTS if weights is None else weights
    total = 0.0
    for name, weight in active_weights.items():
        total += weight * objectives.get(name, missing_penalty)
    return total
# ============================================================================
# Prior Data Loader
# ============================================================================
def load_fea_trials_from_db(db_path: Path, label: str) -> List[Dict]:
    """Collect the FEA-sourced trials stored in one prior study database.

    Only trials in state ``COMPLETE`` are read. A trial is kept when:

    * its ``source`` user attribute is missing (treated as ``'FEA'``), is not
      a string, or is a string containing ``'FEA'``;
    * it has at least one parameter; and
    * every name in ``OBJ_NAMES`` is available, either inside an
      ``objectives`` user attribute or as individual user attributes.

    Args:
        db_path: Location of the SQLite study file.
        label: Short tag used in log messages and prefixed to each record's
            ``source`` field.

    Returns:
        A list of dicts with keys ``trial_num``, ``params``, ``objectives``
        and ``source``. An empty list is returned when the file is missing.
        Any error while reading is logged and whatever was collected up to
        that point is returned.
    """
    if not db_path.exists():
        logger.warning(f"{label} database not found: {db_path}")
        return []

    records = []
    connection = sqlite3.connect(str(db_path))
    try:
        cur = connection.cursor()
        cur.execute('''
SELECT trial_id, number FROM trials
WHERE state = 'COMPLETE'
''')
        # fetchall() materialises the rows first, so the cursor can be reused
        # for the per-trial queries inside the loop.
        for trial_id, trial_num in cur.fetchall():
            # User attributes are stored as JSON text.
            cur.execute('''
SELECT key, value_json FROM trial_user_attributes
WHERE trial_id = ?
''', (trial_id,))
            attrs = {}
            for row in cur.fetchall():
                attrs[row[0]] = json.loads(row[1])

            # Skip trials whose source string does not mention FEA (NN trials).
            source = attrs.get('source', 'FEA')
            is_non_fea = isinstance(source, str) and 'FEA' not in source
            if is_non_fea:
                continue

            # Parameters of this trial.
            cur.execute('''
SELECT param_name, param_value FROM trial_params
WHERE trial_id = ?
''', (trial_id,))
            params = {}
            for name, value in cur.fetchall():
                params[name] = float(value)
            if not params:
                continue

            # Objectives live either in one 'objectives' attribute or in one
            # attribute per objective name.
            if 'objectives' in attrs:
                objectives = attrs['objectives']
            else:
                objectives = {
                    obj_name: attrs[obj_name]
                    for obj_name in OBJ_NAMES
                    if obj_name in attrs
                }

            if not all(k in objectives for k in OBJ_NAMES):
                continue
            records.append({
                'trial_num': trial_num,
                'params': params,
                'objectives': objectives,
                'source': f'{label}_{source}'
            })
    except Exception as e:
        logger.error(f"Error loading {label} data: {e}")
    finally:
        connection.close()

    logger.info(f"Loaded {len(records)} FEA trials from {label}")
    return records
def load_all_prior_fea_data() -> List[Dict]:
    """Gather the FEA trials of the V11, V12 and V13 studies, in that order.

    Returns:
        The concatenation of the records returned by
        ``load_fea_trials_from_db`` for each of the three study databases.
    """
    combined: List[Dict] = []
    prior_studies = ((V11_DB, "V11"), (V12_DB, "V12"), (V13_DB, "V13"))
    for db_file, tag in prior_studies:
        combined += load_fea_trials_from_db(db_file, tag)
    logger.info(f"Total prior FEA trials: {len(combined)}")
    return combined
# ============================================================================
# FEA Runner
# ============================================================================
class FEARunner:
    """Runs actual FEA simulations.

    Wraps an NX session (obtained through ``ensure_nx_running``) and an
    ``NXSolver``. Each :meth:`run_fea` call creates an iteration folder,
    solves the model and converts the resulting OP2 file into the three
    objective values listed in ``OBJ_NAMES``.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the study configuration; NX is started lazily on first use."""
        self.config = config
        self.nx_solver = None   # created by setup()
        self.nx_manager = None  # created by setup()
        self.master_model_dir = SETUP_DIR / "model"

    def setup(self):
        """Setup NX and solver.

        Raises:
            Exception: whatever ``ensure_nx_running`` raises is logged and
                re-raised unchanged.
        """
        logger.info("Setting up NX session...")
        study_name = self.config.get('study_name', 'm1_mirror_adaptive_V14')
        try:
            self.nx_manager, nx_was_started = ensure_nx_running(
                session_id=study_name,
                auto_start=True,
                start_timeout=120
            )
            logger.info("NX session ready" + (" (started)" if nx_was_started else " (existing)"))
        except Exception as e:
            logger.error(f"Failed to setup NX: {e}")
            raise

        # Initialize solver. The Nastran version is taken from the digits that
        # follow "NX" in the install path, falling back to "2506".
        nx_settings = self.config.get('nx_settings', {})
        nx_install_dir = nx_settings.get('nx_install_path', 'C:\\Program Files\\Siemens\\NX2506')
        version_match = re.search(r'NX(\d+)', nx_install_dir)
        nastran_version = version_match.group(1) if version_match else "2506"
        # NOTE(review): the study name passed to NXSolver is hard-coded while
        # the NX session above uses config['study_name'] - confirm that the
        # two are meant to be allowed to differ.
        self.nx_solver = NXSolver(
            master_model_dir=str(self.master_model_dir),
            nx_install_dir=nx_install_dir,
            nastran_version=nastran_version,
            timeout=nx_settings.get('simulation_timeout_s', 600),
            use_iteration_folders=True,
            study_name="m1_mirror_adaptive_V14"
        )

    def _build_expressions(self, params: Dict[str, float]) -> Dict[str, float]:
        """Map parameter values onto the NX expression names from the config.

        Only design variables that are present in ``params`` are included.
        The optimizer samples enabled variables only, so indexing ``params``
        for every configured variable would raise ``KeyError`` as soon as one
        variable is disabled in the config.
        """
        return {
            var['expression_name']: params[var['name']]
            for var in self.config['design_variables']
            if var['name'] in params
        }

    def run_fea(self, params: Dict[str, float], trial_num: int) -> Optional[Dict]:
        """Run FEA and extract objectives.

        Args:
            params: Design-variable values keyed by variable name.
            trial_num: Iteration number used for the iteration folder and in
                log messages.

        Returns:
            A dict with ``trial_num``, ``params``, ``objectives``,
            ``weighted_sum``, ``source`` and ``solve_time``; or ``None`` when
            the solve fails, the objectives cannot be extracted, or an
            exception is raised after the iteration folder was created.
        """
        if self.nx_solver is None:
            self.setup()
        logger.info(f" [FEA {trial_num}] Running simulation...")
        expressions = self._build_expressions(params)
        iter_folder = self.nx_solver.create_iteration_folder(
            iterations_base_dir=ITERATIONS_DIR,
            iteration_number=trial_num,
            expression_updates=expressions
        )
        try:
            nx_settings = self.config.get('nx_settings', {})
            sim_file = iter_folder / nx_settings.get('sim_file', 'ASSY_M1_assyfem1_sim1.sim')
            t_start = time.time()
            result = self.nx_solver.run_simulation(
                sim_file=sim_file,
                working_dir=iter_folder,
                expression_updates=expressions,
                solution_name=nx_settings.get('solution_name', 'Solution 1'),
                cleanup=False
            )
            solve_time = time.time() - t_start
            if not result['success']:
                logger.error(f" [FEA {trial_num}] Solve failed: {result.get('error')}")
                return None
            logger.info(f" [FEA {trial_num}] Solved in {solve_time:.1f}s")

            # Extract objectives
            op2_path = Path(result['op2_file'])
            objectives = self._extract_objectives(op2_path)
            if objectives is None:
                return None
            weighted_sum = compute_weighted_sum(objectives)
            logger.info(f" [FEA {trial_num}] 40-20: {objectives['rel_filtered_rms_40_vs_20']:.2f} nm")
            logger.info(f" [FEA {trial_num}] 60-20: {objectives['rel_filtered_rms_60_vs_20']:.2f} nm")
            logger.info(f" [FEA {trial_num}] Mfg: {objectives['mfg_90_optician_workload']:.2f} nm")
            logger.info(f" [FEA {trial_num}] Weighted Sum: {weighted_sum:.2f}")
            return {
                'trial_num': trial_num,
                'params': params,
                'objectives': objectives,
                'weighted_sum': weighted_sum,
                'source': 'FEA',
                'solve_time': solve_time
            }
        except Exception as e:
            # logger.exception attaches the traceback to the log record, so it
            # reaches the log file as well as the console.
            logger.exception(f" [FEA {trial_num}] Error: {e}")
            return None

    def _extract_objectives(self, op2_path: Path) -> Optional[Dict[str, float]]:
        """Extract objectives using ZernikeExtractor.

        Subcases "3", "4" and "1" are each evaluated relative to the reference
        subcase from ``zernike_settings`` (default ``'2'``).

        Returns:
            The three objective values keyed by the names in ``OBJ_NAMES``,
            or ``None`` when extraction raises (the error is logged).
        """
        try:
            zernike_settings = self.config.get('zernike_settings', {})
            extractor = ZernikeExtractor(
                op2_path,
                bdf_path=None,
                displacement_unit=zernike_settings.get('displacement_unit', 'mm'),
                n_modes=zernike_settings.get('n_modes', 50),
                filter_orders=zernike_settings.get('filter_low_orders', 4)
            )
            ref = zernike_settings.get('reference_subcase', '2')
            rel_40 = extractor.extract_relative("3", ref)
            rel_60 = extractor.extract_relative("4", ref)
            rel_90 = extractor.extract_relative("1", ref)
            return {
                'rel_filtered_rms_40_vs_20': rel_40['relative_filtered_rms_nm'],
                'rel_filtered_rms_60_vs_20': rel_60['relative_filtered_rms_nm'],
                'mfg_90_optician_workload': rel_90['relative_rms_filter_j1to3']
            }
        except Exception as e:
            logger.error(f"Zernike extraction failed: {e}")
            return None

    def cleanup(self):
        """Cleanup NX session.

        Does nothing when no session manager was created. NX is closed only
        when the manager reports that closing is allowed.
        """
        if self.nx_manager:
            if self.nx_manager.can_close_nx():
                self.nx_manager.close_nx_if_allowed()
            self.nx_manager.cleanup()
# ============================================================================
# TPE Optimizer
# ============================================================================
class TPEOptimizer:
    """TPE-based FEA optimizer with weighted-sum objective.

    The study is stored in ``study.db`` inside the results folder. A fresh
    study is seeded with the prior FEA trials before new trials are sampled.
    """

    # Iteration folders are expected to be named "iter<number>".
    _ITER_DIR_PATTERN = re.compile(r"iter(\d+)")

    def __init__(self, config: Dict[str, Any]):
        """Create the FEA runner, load the prior trials and open the storage."""
        self.config = config
        self.fea_runner = FEARunner(config)
        # Load prior data for seeding
        self.prior_data = load_all_prior_fea_data()
        # Database
        self.db_path = RESULTS_DIR / "study.db"
        self.storage = optuna.storages.RDBStorage(f'sqlite:///{self.db_path}')
        # State
        self.trial_count = 0
        self.best_value = float('inf')
        self.best_trial = None

    @staticmethod
    def _parse_iteration_number(name: str) -> Optional[int]:
        """Return the number in an ``iter<number>`` name, or None if it does not match."""
        match = TPEOptimizer._ITER_DIR_PATTERN.fullmatch(name)
        return int(match.group(1)) if match else None

    @staticmethod
    def _format_metric(value: Any) -> str:
        """Format a numeric metric with two decimals; anything else becomes 'N/A'."""
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return f"{value:.2f}"
        return "N/A"

    def _get_next_trial_number(self) -> int:
        """Get the next trial number based on existing iterations.

        Entries matching ``iter*`` whose suffix is not purely numeric (for
        example a backup folder) are ignored instead of raising ValueError.
        """
        numbers = [
            number
            for number in (
                self._parse_iteration_number(entry.name)
                for entry in ITERATIONS_DIR.glob("iter*")
            )
            if number is not None
        ]
        return max(numbers) + 1 if numbers else 1

    def seed_from_prior(self, study: "optuna.Study"):
        """Seed the study with prior FEA trials.

        Every prior record is added as a finished trial whose value is its
        weighted sum. Records that cannot be added (for instance because
        their parameters do not fit the configured distributions) are logged
        and skipped.
        """
        if not self.prior_data:
            logger.warning("No prior data to seed from")
            return
        logger.info(f"Seeding study with {len(self.prior_data)} prior FEA trials...")

        # The search space is the same for every seeded trial: build it once.
        distributions = {
            var['name']: optuna.distributions.FloatDistribution(var['min'], var['max'])
            for var in self.config['design_variables']
            if var.get('enabled', False)
        }

        for i, d in enumerate(self.prior_data):
            try:
                # Compute weighted sum for the prior trial
                weighted_sum = compute_weighted_sum(d['objectives'])
                # Create frozen trial
                frozen_trial = optuna.trial.create_trial(
                    params=d['params'],
                    distributions=dict(distributions),
                    values=[weighted_sum],  # Single objective for TPE
                    user_attrs={
                        'source': d.get('source', 'prior_FEA'),
                        'rel_filtered_rms_40_vs_20': d['objectives']['rel_filtered_rms_40_vs_20'],
                        'rel_filtered_rms_60_vs_20': d['objectives']['rel_filtered_rms_60_vs_20'],
                        'mfg_90_optician_workload': d['objectives']['mfg_90_optician_workload'],
                        'weighted_sum': weighted_sum,
                    }
                )
                study.add_trial(frozen_trial)
                # Track best
                if weighted_sum < self.best_value:
                    self.best_value = weighted_sum
                    self.best_trial = d
            except Exception as e:
                logger.warning(f"Failed to seed trial {i}: {e}")

        logger.info(f"Seeded {len(study.trials)} trials")
        if self.best_trial:
            logger.info(f"Best prior: weighted_sum={self.best_value:.2f}")
            logger.info(f" 40-20: {self.best_trial['objectives']['rel_filtered_rms_40_vs_20']:.2f} nm")
            logger.info(f" 60-20: {self.best_trial['objectives']['rel_filtered_rms_60_vs_20']:.2f} nm")
            logger.info(f" Mfg: {self.best_trial['objectives']['mfg_90_optician_workload']:.2f} nm")

    def run(self, n_trials: int = 50, resume: bool = False):
        """Run TPE optimization.

        Args:
            n_trials: Number of new FEA trials to run.
            resume: Load the existing study from the database instead of
                creating a new one.
        """
        logger.info("\n" + "=" * 70)
        logger.info("M1 MIRROR TPE FEA OPTIMIZATION V14")
        logger.info("=" * 70)
        logger.info(f"Prior FEA trials: {len(self.prior_data)}")
        logger.info(f"New trials to run: {n_trials}")
        logger.info(f"Objectives: {OBJ_NAMES}")
        logger.info(f"Weights: {OBJ_WEIGHTS}")
        start_time = time.time()

        # Create or load study with TPE sampler
        tpe_settings = self.config.get('tpe_settings', {})
        sampler = TPESampler(
            n_startup_trials=tpe_settings.get('n_startup_trials', 10),
            n_ei_candidates=tpe_settings.get('n_ei_candidates', 24),
            multivariate=tpe_settings.get('multivariate', True),
            constant_liar=tpe_settings.get('constant_liar', True),
            seed=tpe_settings.get('seed', 42)
        )
        study = optuna.create_study(
            study_name="v14_tpe",
            storage=self.storage,
            direction='minimize',  # Single objective - minimize weighted sum
            sampler=sampler,
            load_if_exists=resume
        )

        # Seed with prior data if starting fresh
        if not resume or len(study.trials) == 0:
            self.seed_from_prior(study)
        else:
            # Resumed study: start from its recorded best value so that the
            # first new trial is not always reported as a new best.
            try:
                self.best_value = study.best_value
            except ValueError:
                pass  # no completed trial in the stored study yet

        self.trial_count = self._get_next_trial_number()
        logger.info(f"Starting from trial {self.trial_count}")

        # Run optimization
        def objective(trial: "optuna.Trial") -> float:
            # Sample parameters
            params = {}
            for var in self.config['design_variables']:
                if var.get('enabled', False):
                    params[var['name']] = trial.suggest_float(var['name'], var['min'], var['max'])
            # Run FEA
            result = self.fea_runner.run_fea(params, self.trial_count)
            self.trial_count += 1
            if result is None:
                # Return worst-case value for failed trials
                return 10000.0
            # Store objectives as user attributes
            trial.set_user_attr('source', 'FEA')
            trial.set_user_attr('rel_filtered_rms_40_vs_20', result['objectives']['rel_filtered_rms_40_vs_20'])
            trial.set_user_attr('rel_filtered_rms_60_vs_20', result['objectives']['rel_filtered_rms_60_vs_20'])
            trial.set_user_attr('mfg_90_optician_workload', result['objectives']['mfg_90_optician_workload'])
            trial.set_user_attr('weighted_sum', result['weighted_sum'])
            trial.set_user_attr('solve_time', result.get('solve_time', 0))
            # Track best
            if result['weighted_sum'] < self.best_value:
                self.best_value = result['weighted_sum']
                logger.info(f" [NEW BEST] Weighted Sum: {self.best_value:.2f}")
            return result['weighted_sum']

        # Run
        try:
            study.optimize(
                objective,
                n_trials=n_trials,
                show_progress_bar=True,
                gc_after_trial=True
            )
        except KeyboardInterrupt:
            logger.info("\nOptimization interrupted by user")
        finally:
            self.fea_runner.cleanup()

        # Print results
        elapsed = time.time() - start_time
        self._print_results(study, elapsed)

    def _print_results(self, study: "optuna.Study", elapsed: float):
        """Print optimization results, save them as JSON and archive the best design.

        When the study has no completed trial, a warning is logged and nothing
        is saved or archived.
        """
        logger.info("\n" + "=" * 70)
        logger.info("OPTIMIZATION COMPLETE")
        logger.info("=" * 70)
        logger.info(f"Time: {elapsed/60:.1f} min ({elapsed/3600:.2f} hours)")
        logger.info(f"Total trials: {len(study.trials)}")

        # Get best trial. Optuna raises ValueError when none has completed.
        try:
            best_trial = study.best_trial
        except ValueError:
            logger.warning("No completed trials - nothing to report or archive")
            return

        best_attrs = best_trial.user_attrs
        logger.info(f"\nBest Trial: #{best_trial.number}")
        logger.info(f" Weighted Sum: {best_trial.value:.2f}")
        # A failed trial carries no objective attributes; _format_metric then
        # prints N/A instead of raising on the float format code.
        logger.info(f" 40-20: {self._format_metric(best_attrs.get('rel_filtered_rms_40_vs_20'))} nm")
        logger.info(f" 60-20: {self._format_metric(best_attrs.get('rel_filtered_rms_60_vs_20'))} nm")
        logger.info(f" Mfg: {self._format_metric(best_attrs.get('mfg_90_optician_workload'))} nm")
        logger.info("\nBest Parameters:")
        for name, value in best_trial.params.items():
            logger.info(f" {name}: {value:.4f}")

        # Find top 10 trials (failed trials carry the 10000 sentinel value)
        sorted_trials = sorted(
            [t for t in study.trials if t.value is not None and t.value < 10000],
            key=lambda t: t.value
        )[:10]
        logger.info("\nTop 10 Trials:")
        logger.info("-" * 90)
        logger.info(f"{'#':>4} {'WeightedSum':>12} {'40-20 (nm)':>12} {'60-20 (nm)':>12} {'Mfg (nm)':>12} {'Source':>10}")
        logger.info("-" * 90)
        for trial in sorted_trials:
            source = trial.user_attrs.get('source', 'unknown')[:10]
            logger.info(
                f"{trial.number:>4} "
                f"{trial.value:>12.2f} "
                f"{trial.user_attrs.get('rel_filtered_rms_40_vs_20', 0):>12.2f} "
                f"{trial.user_attrs.get('rel_filtered_rms_60_vs_20', 0):>12.2f} "
                f"{trial.user_attrs.get('mfg_90_optician_workload', 0):>12.2f} "
                f"{source:>10}"
            )

        # Save results
        results = {
            'summary': {
                'total_trials': len(study.trials),
                'best_weighted_sum': best_trial.value,
                'elapsed_hours': elapsed / 3600
            },
            'best_trial': {
                'number': best_trial.number,
                'params': best_trial.params,
                'objectives': {
                    'rel_filtered_rms_40_vs_20': best_attrs.get('rel_filtered_rms_40_vs_20'),
                    'rel_filtered_rms_60_vs_20': best_attrs.get('rel_filtered_rms_60_vs_20'),
                    'mfg_90_optician_workload': best_attrs.get('mfg_90_optician_workload'),
                },
                'weighted_sum': best_trial.value
            },
            'top_10': [
                {
                    'trial': t.number,
                    'weighted_sum': t.value,
                    'params': t.params,
                    'objectives': {
                        'rel_filtered_rms_40_vs_20': t.user_attrs.get('rel_filtered_rms_40_vs_20'),
                        'rel_filtered_rms_60_vs_20': t.user_attrs.get('rel_filtered_rms_60_vs_20'),
                        'mfg_90_optician_workload': t.user_attrs.get('mfg_90_optician_workload'),
                    }
                }
                for t in sorted_trials
            ]
        }
        with open(RESULTS_DIR / 'final_results.json', 'w') as f:
            json.dump(results, f, indent=2)
        logger.info(f"\nResults saved to {RESULTS_DIR / 'final_results.json'}")

        # Archive best design
        self._archive_best_design()

    def _archive_best_design(self):
        """Archive the best design iteration folder.

        Best effort: any failure while importing or running the archive tool
        is logged and does not propagate.
        """
        try:
            # Import archive tool from the repository-level tools folder
            tools_dir = Path(__file__).parent.parent.parent / "tools"
            if str(tools_dir) not in sys.path:
                # avoid stacking a duplicate entry on every call
                sys.path.insert(0, str(tools_dir))
            from archive_best_design import archive_best_design
            logger.info("\n" + "-" * 70)
            logger.info("ARCHIVING BEST DESIGN")
            logger.info("-" * 70)
            result = archive_best_design(str(Path(__file__).parent))
            if result.get('success'):
                logger.info(f"[OK] Best design archived to: {result['archive_path']}")
                logger.info(f" Trial #{result['trial_number']}, WS={result['weighted_sum']:.2f}")
            else:
                logger.warning(f"[WARN] Archive skipped: {result.get('reason', 'Unknown')}")
        except Exception as e:
            logger.error(f"[ERROR] Failed to archive best design: {e}")
# ============================================================================
# Main
# ============================================================================
def main(argv: Optional[List[str]] = None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the original behavior; passing a
            list allows the function to be driven programmatically.

    Without ``--start`` only the usage text is printed. With ``--start`` the
    configuration is loaded from ``CONFIG_PATH`` and the optimization runs.
    """
    parser = argparse.ArgumentParser(description='M1 Mirror TPE V14')
    parser.add_argument('--start', action='store_true', help='Start optimization')
    parser.add_argument('--trials', type=int, default=50, help='Number of new FEA trials')
    parser.add_argument('--resume', action='store_true', help='Resume from existing study')
    args = parser.parse_args(argv)

    if not args.start:
        print("M1 Mirror TPE FEA Optimization V14")
        print("=" * 50)
        print("\nUsage:")
        print(" python run_optimization.py --start")
        print(" python run_optimization.py --start --trials 55")
        print(" python run_optimization.py --start --trials 55 --resume")
        print("\nFor 8-hour overnight run (~55 trials at 8-9 min/trial):")
        print(" python run_optimization.py --start --trials 55")
        print("\nThis will:")
        print(" 1. Load FEA trials from V11, V12, V13 databases")
        print(" 2. Seed TPE with all prior FEA data")
        print(" 3. Run TPE optimization with weighted-sum objective")
        print(" 4. Weights: 5*obj_40 + 5*obj_60 + 1*obj_mfg")
        return

    with open(CONFIG_PATH, 'r') as f:
        config = json.load(f)
    optimizer = TPEOptimizer(config)
    optimizer.run(n_trials=args.trials, resume=args.resume)


if __name__ == "__main__":
    main()