## Protocol 13: Adaptive Multi-Objective Optimization - Iterative FEA + Neural Network surrogate workflow - Initial FEA sampling, NN training, NN-accelerated search - FEA validation of top NN predictions, retraining loop - adaptive_state.json tracks iteration history and best values - M1 mirror study (V11) with 103 FEA, 3000 NN trials ## Dashboard Visualization Enhancements - Added Plotly.js interactive charts (parallel coords, Pareto, convergence) - Lazy loading with React.lazy() for performance - Code splitting: plotly.js-basic-dist (~1MB vs 3.5MB) - Chart library toggle (Recharts default, Plotly on-demand) - ExpandableChart component for full-screen modal views - ConsoleOutput component for real-time log viewing ## Documentation - Protocol 13 detailed documentation - Dashboard visualization guide - Plotly components README - Updated run-optimization skill with Mode 5 (adaptive) ## Bug Fixes - Fixed TypeScript errors in dashboard components - Fixed Card component to accept ReactNode title - Removed unused imports across components 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
983 lines
36 KiB
Python
983 lines
36 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
M1 Mirror Adaptive Surrogate Optimization V11
|
|
==============================================
|
|
|
|
Adaptive optimization with neural surrogate and real FEA validation.
|
|
|
|
Key Features:
|
|
1. Cross-links V10 FEA data for initial surrogate training
|
|
2. Uses CORRECT relative filtered RMS from ZernikeExtractor
|
|
3. Actually RUNS FEA for validation (not just loads existing)
|
|
4. Tags trials as 'FEA' vs 'NN' for dashboard differentiation
|
|
5. Runs until convergence (no hard budget cap)
|
|
|
|
Workflow:
|
|
1. Load V10 FEA data -> Train initial surrogate
|
|
2. NN exploration (1000s of trials) -> Select promising candidates
|
|
3. FEA validation (5 per iteration) -> Update best
|
|
4. Retrain surrogate -> Repeat until convergence
|
|
|
|
Usage:
|
|
python run_optimization.py --start
|
|
python run_optimization.py --start --fea-batch 3 --patience 7
|
|
|
|
Dashboard:
|
|
FEA trials shown as circles (blue)
|
|
NN trials shown as crosses (orange)
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import time
|
|
import argparse
|
|
import logging
|
|
import sqlite3
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple, Optional, Any
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
# Add parent directories to path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
import optuna
|
|
from optuna.samplers import TPESampler
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
|
|
# Atomizer imports
|
|
from optimization_engine.nx_solver import NXSolver
|
|
from optimization_engine.utils import ensure_nx_running
|
|
from optimization_engine.extractors import ZernikeExtractor
|
|
|
|
# ============================================================================
|
|
# Paths
|
|
# ============================================================================
|
|
|
|
# Study directory layout: 1_setup (config + master model),
# 2_iterations (per-trial FEA working folders), 3_results (DBs, logs, checkpoints).
STUDY_DIR = Path(__file__).parent
SETUP_DIR = STUDY_DIR / "1_setup"
ITERATIONS_DIR = STUDY_DIR / "2_iterations"
RESULTS_DIR = STUDY_DIR / "3_results"
CONFIG_PATH = SETUP_DIR / "optimization_config.json"

# V10 paths (source data): the previous study supplies the seed FEA
# training set, its Optuna database, and the master NX model.
V10_DIR = STUDY_DIR.parent / "m1_mirror_zernike_optimization_V10"
V10_DB = V10_DIR / "3_results" / "study.db"
V10_ITERATIONS = V10_DIR / "2_iterations"
V10_MODEL_DIR = V10_DIR / "1_setup" / "model"

# Ensure directories exist
ITERATIONS_DIR.mkdir(exist_ok=True)
RESULTS_DIR.mkdir(exist_ok=True)

# Logging: mirror everything to stdout and append to the persistent study log.
LOG_FILE = RESULTS_DIR / "optimization.log"
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(LOG_FILE, mode='a')
    ]
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================================
|
|
# Configuration
|
|
# ============================================================================
|
|
|
|
@dataclass
class AdaptiveConfig:
    """Adaptive optimization configuration.

    Loop-control knobs plus the weights/targets that scalarize the three
    objectives into a single normalized value (see
    AdaptiveOptimizer.compute_weighted_objective).
    """
    max_iterations: int = 100               # hard cap on outer-loop iterations
    surrogate_trials_per_iter: int = 1000   # NN-evaluated Optuna trials per iteration
    fea_batch_size: int = 5                 # real FEA validations per iteration
    strategy: str = 'hybrid'                # candidate selection: 'best' | 'uncertain' | 'hybrid'
    exploration_ratio: float = 0.3          # hybrid: fraction of the batch picked by uncertainty
    convergence_threshold: float = 0.3      # NOTE(review): not referenced anywhere in this file — confirm intent
    patience: int = 5                       # iterations without improvement before stopping
    min_training_samples: int = 30          # minimum FEA samples required to train the surrogate
    retrain_epochs: int = 300               # epochs per surrogate (re)training

    # Objective weights (relative importance in the scalarized objective)
    weight_40_vs_20: float = 5.0
    weight_60_vs_20: float = 5.0
    weight_mfg: float = 1.0

    # Targets for normalization: each objective is divided by its target
    # before weighting, so the weighted value is dimensionless.
    target_40_vs_20: float = 4.0
    target_60_vs_20: float = 10.0
    target_mfg: float = 20.0
|
|
|
|
|
|
@dataclass
class AdaptiveState:
    """Tracks optimization state.

    Serialized to adaptive_state.json after every iteration (see
    AdaptiveOptimizer._save_state).
    """
    iteration: int = 0           # current outer-loop iteration (incremented at loop start)
    total_fea_count: int = 0     # cumulative FEA evaluations, including V10 seed data
    total_nn_count: int = 0      # cumulative surrogate (NN) evaluations

    # Best FEA-validated values seen so far (lower is better; inf until first result)
    best_40_vs_20: float = float('inf')
    best_60_vs_20: float = float('inf')
    best_mfg: float = float('inf')
    best_weighted: float = float('inf')     # scalarized weighted objective at the best point
    best_params: Dict = field(default_factory=dict)

    no_improvement_count: int = 0           # consecutive iterations without improvement (patience counter)
    history: List[Dict] = field(default_factory=list)  # one summary dict per iteration
|
|
|
|
|
|
# ============================================================================
|
|
# Neural Surrogate
|
|
# ============================================================================
|
|
|
|
class ZernikeSurrogateModel(nn.Module):
    """Multi-output surrogate for 3 objectives.

    A fully-connected MLP: each hidden stage is
    Linear -> BatchNorm1d -> ReLU -> Dropout, followed by a final Linear
    projection to the objective vector.
    """

    def __init__(self, input_dim: int = 11, output_dim: int = 3,
                 hidden_dims: List[int] = None, dropout: float = 0.1):
        super().__init__()

        # Default architecture when the caller does not specify one.
        widths = [128, 256, 256, 128, 64] if hidden_dims is None else hidden_dims

        stages = []
        in_features = input_dim
        for width in widths:
            stages.append(nn.Linear(in_features, width))
            stages.append(nn.BatchNorm1d(width))
            stages.append(nn.ReLU())
            stages.append(nn.Dropout(dropout))
            in_features = width
        stages.append(nn.Linear(in_features, output_dim))

        self.network = nn.Sequential(*stages)
        self._init_weights()

    def _init_weights(self):
        """Kaiming-normal init for Linear weights; biases set to zero."""
        for module in self.modules():
            if not isinstance(module, nn.Linear):
                continue
            nn.init.kaiming_normal_(module.weight, mode='fan_in', nonlinearity='relu')
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        """Map a (batch, input_dim) tensor to (batch, output_dim) predictions."""
        return self.network(x)
|
|
|
|
|
|
class ZernikeSurrogate:
    """Surrogate wrapper with training and MC Dropout inference.

    Wraps ZernikeSurrogateModel with z-score input/output normalization,
    a full AdamW training loop, deterministic prediction, Monte-Carlo
    Dropout uncertainty estimation, and checkpoint save/load.
    """

    # Fixed design-variable order used to build the model input vector.
    DESIGN_VAR_NAMES = [
        'lateral_inner_angle', 'lateral_outer_angle', 'lateral_outer_pivot',
        'lateral_inner_pivot', 'lateral_middle_pivot', 'lateral_closeness',
        'whiffle_min', 'whiffle_outer_to_vertical', 'whiffle_triangle_closeness',
        'blank_backface_angle', 'inner_circular_rib_dia'
    ]

    # Fixed objective order used to build the model output vector.
    OBJ_NAMES = ['rel_filtered_rms_40_vs_20', 'rel_filtered_rms_60_vs_20', 'mfg_90_optician_workload']

    def __init__(self, device: str = None):
        # Prefer CUDA when available unless the caller pins a device string.
        self.device = torch.device(device or ('cuda' if torch.cuda.is_available() else 'cpu'))
        self.model = None        # ZernikeSurrogateModel; built by train_from_data()/load()
        self.design_mean = None  # per-feature input normalization statistics
        self.design_std = None
        self.obj_mean = None     # per-objective output normalization statistics
        self.obj_std = None

    def train_from_data(self, fea_data: List[Dict], epochs: int = 300,
                        lr: float = 1e-3, batch_size: int = 16):
        """Train surrogate from FEA data.

        Builds a fresh model each call (re-training does not warm-start),
        fits it with AdamW + cosine LR annealing + gradient clipping, and
        logs a training-set R^2 per objective as a fit diagnostic.

        Args:
            fea_data: dicts with 'params' and 'objectives' keyed by
                DESIGN_VAR_NAMES / OBJ_NAMES respectively.
            epochs: number of training epochs.
            lr: initial learning rate (annealed over `epochs`).
            batch_size: minibatch size; trailing batches of <2 samples
                are skipped because BatchNorm1d needs at least 2.

        Raises:
            ValueError: if fewer than 10 samples are provided.
        """
        if len(fea_data) < 10:
            raise ValueError(f"Need at least 10 samples, got {len(fea_data)}")

        logger.info(f"Training surrogate on {len(fea_data)} FEA samples...")

        # Prepare data
        X = np.array([[d['params'][name] for name in self.DESIGN_VAR_NAMES]
                      for d in fea_data])
        Y = np.array([[d['objectives'][name] for name in self.OBJ_NAMES]
                      for d in fea_data])

        # Normalize (z-score); epsilon guards zero-variance columns
        self.design_mean = X.mean(axis=0)
        self.design_std = X.std(axis=0) + 1e-8
        self.obj_mean = Y.mean(axis=0)
        self.obj_std = Y.std(axis=0) + 1e-8

        X_norm = (X - self.design_mean) / self.design_std
        Y_norm = (Y - self.obj_mean) / self.obj_std

        # Create model (fresh weights on every retrain)
        self.model = ZernikeSurrogateModel(
            input_dim=len(self.DESIGN_VAR_NAMES),
            output_dim=len(self.OBJ_NAMES)
        ).to(self.device)

        # Small dataset: keep everything resident on the training device
        X_tensor = torch.tensor(X_norm, dtype=torch.float32, device=self.device)
        Y_tensor = torch.tensor(Y_norm, dtype=torch.float32, device=self.device)

        optimizer = torch.optim.AdamW(self.model.parameters(), lr=lr, weight_decay=1e-4)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

        self.model.train()
        # NOTE(review): best_loss is tracked but never consumed (no early
        # stopping or best-checkpoint restore) — dead bookkeeping.
        best_loss = float('inf')

        for epoch in range(epochs):
            # Reshuffle sample order every epoch
            indices = torch.randperm(len(X_tensor))
            epoch_loss = 0.0
            n_batches = 0

            for i in range(0, len(indices), batch_size):
                batch_idx = indices[i:i+batch_size]
                if len(batch_idx) < 2:
                    continue  # BatchNorm1d cannot normalize a single sample

                optimizer.zero_grad()
                pred = self.model(X_tensor[batch_idx])
                loss = nn.functional.mse_loss(pred, Y_tensor[batch_idx])
                loss.backward()
                # Clip gradients to stabilize training on the tiny dataset
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()

                epoch_loss += loss.item()
                n_batches += 1

            scheduler.step()
            avg_loss = epoch_loss / max(n_batches, 1)
            if avg_loss < best_loss:
                best_loss = avg_loss

            if (epoch + 1) % 100 == 0:
                logger.info(f" Epoch {epoch+1}/{epochs}: Loss = {avg_loss:.6f}")

        self.model.eval()

        # Compute R^2 on the TRAINING set (fit diagnostic only, logged)
        with torch.no_grad():
            pred = self.model(X_tensor).cpu().numpy()
            pred_denorm = pred * self.obj_std + self.obj_mean

        for i, name in enumerate(self.OBJ_NAMES):
            ss_res = np.sum((Y[:, i] - pred_denorm[:, i])**2)
            ss_tot = np.sum((Y[:, i] - Y[:, i].mean())**2)
            r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0
            logger.info(f" {name} R^2: {r2:.4f}")

    def predict(self, params: Dict[str, float]) -> Dict[str, float]:
        """Predict objectives from design parameters.

        Args:
            params: mapping that must contain every DESIGN_VAR_NAMES key.

        Returns:
            Objective name -> predicted (denormalized) value.
        """
        self.model.eval()

        x = np.array([params[name] for name in self.DESIGN_VAR_NAMES])
        x_norm = (x - self.design_mean) / self.design_std
        x_tensor = torch.tensor(x_norm, dtype=torch.float32, device=self.device)

        with torch.no_grad():
            pred_norm = self.model(x_tensor.unsqueeze(0)).cpu().numpy().flatten()

        # Denormalize back to physical objective units
        pred = pred_norm * self.obj_std + self.obj_mean
        return {name: float(pred[i]) for i, name in enumerate(self.OBJ_NAMES)}

    def predict_with_uncertainty(self, params: Dict[str, float],
                                 n_samples: int = 30) -> Tuple[Dict[str, float], float]:
        """Predict with MC Dropout uncertainty.

        Runs n_samples stochastic forward passes with ONLY the Dropout
        layers in train mode (BatchNorm stays in eval mode so running
        statistics are used and not updated).

        Args:
            params: mapping containing every DESIGN_VAR_NAMES key.
            n_samples: number of stochastic forward passes.

        Returns:
            Tuple of (mean prediction dict, scalar uncertainty); the
            uncertainty is the sum of the per-objective standard
            deviations across the MC samples.
        """
        x = np.array([params[name] for name in self.DESIGN_VAR_NAMES])
        x_norm = (x - self.design_mean) / self.design_std
        x_tensor = torch.tensor(x_norm, dtype=torch.float32, device=self.device)

        # Enable dropout only
        self.model.eval()
        for module in self.model.modules():
            if isinstance(module, nn.Dropout):
                module.train()

        predictions = []
        with torch.no_grad():
            for _ in range(n_samples):
                pred_norm = self.model(x_tensor.unsqueeze(0)).cpu().numpy().flatten()
                pred = pred_norm * self.obj_std + self.obj_mean
                predictions.append(pred)

        # Restore full eval mode (switches the Dropout layers back off)
        self.model.eval()

        predictions = np.array(predictions)
        mean = predictions.mean(axis=0)
        std = predictions.std(axis=0)

        mean_pred = {name: float(mean[i]) for i, name in enumerate(self.OBJ_NAMES)}
        uncertainty = float(np.sum(std))

        return mean_pred, uncertainty

    def save(self, path: Path):
        """Save model weights and normalization statistics to *path*."""
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'design_mean': self.design_mean.tolist(),
            'design_std': self.design_std.tolist(),
            'obj_mean': self.obj_mean.tolist(),
            'obj_std': self.obj_std.tolist()
        }, path)

    def load(self, path: Path):
        """Load model weights and normalization statistics from *path*.

        NOTE(review): weights_only=False deserializes arbitrary pickled
        objects — only load checkpoints written by this study.
        """
        checkpoint = torch.load(path, map_location=self.device, weights_only=False)

        self.model = ZernikeSurrogateModel(
            input_dim=len(self.DESIGN_VAR_NAMES),
            output_dim=len(self.OBJ_NAMES)
        ).to(self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model.eval()

        self.design_mean = np.array(checkpoint['design_mean'])
        self.design_std = np.array(checkpoint['design_std'])
        self.obj_mean = np.array(checkpoint['obj_mean'])
        self.obj_std = np.array(checkpoint['obj_std'])
|
|
|
|
|
|
# ============================================================================
|
|
# V10 Data Loader
|
|
# ============================================================================
|
|
|
|
def load_v10_fea_data(db_path: Optional[Path] = None) -> List[Dict]:
    """Load completed FEA trials from a V10-style Optuna SQLite database.

    Reads the raw Optuna schema directly (tables: trials, trial_params,
    trial_user_attributes) instead of going through the Optuna API, so
    it works on a database file without constructing a Study object.

    Args:
        db_path: Path to the study database. Defaults to the V10 study
            database (V10_DB) for backward compatibility.

    Returns:
        List of dicts with keys 'trial_num', 'params', 'objectives',
        'source'. Empty list when the database is missing; trials whose
        objectives do not cover every ZernikeSurrogate.OBJ_NAMES entry
        are skipped. On a read/parse error the partial result collected
        so far is returned (error is logged).
    """
    if db_path is None:
        db_path = V10_DB
    if not db_path.exists():
        logger.error(f"V10 database not found: {db_path}")
        return []

    conn = sqlite3.connect(str(db_path))
    fea_data = []

    try:
        cursor = conn.cursor()

        # Only completed trials carry usable parameter/objective data.
        cursor.execute('''
            SELECT trial_id, number FROM trials
            WHERE state = 'COMPLETE'
        ''')
        trials = cursor.fetchall()

        for trial_id, trial_num in trials:
            # Design parameters for this trial.
            cursor.execute('''
                SELECT param_name, param_value FROM trial_params
                WHERE trial_id = ?
            ''', (trial_id,))
            params = {name: float(value) for name, value in cursor.fetchall()}

            if not params:
                continue

            # Objectives are stored as a JSON blob in a user attribute.
            cursor.execute('''
                SELECT key, value_json FROM trial_user_attributes
                WHERE trial_id = ? AND key = 'objectives'
            ''', (trial_id,))
            result = cursor.fetchone()

            if result:
                objectives = json.loads(result[1])

                # Keep only trials reporting every objective we train on.
                if all(k in objectives for k in ZernikeSurrogate.OBJ_NAMES):
                    fea_data.append({
                        'trial_num': trial_num,
                        'params': params,
                        'objectives': objectives,
                        'source': 'V10_FEA'
                    })

    except Exception as e:
        # Best-effort loader: log and fall through with whatever was read.
        logger.error(f"Error loading V10 data: {e}")
    finally:
        conn.close()

    logger.info(f"Loaded {len(fea_data)} FEA trials from V10")
    return fea_data
|
|
|
|
|
|
# ============================================================================
|
|
# FEA Runner
|
|
# ============================================================================
|
|
|
|
class FEARunner:
    """Runs actual FEA simulations.

    Owns the NX session and NXSolver, copies the master model from the
    V10 study on first setup, and turns a parameter dict into a solved
    simulation plus extracted Zernike objectives.
    """

    def __init__(self, config: Dict[str, Any]):
        # config: study config dict; expects 'design_variables' and
        # 'nx_settings' / 'zernike_settings' sub-dicts (all optional but
        # 'design_variables' is required by run_fea).
        self.config = config
        self.nx_solver = None         # NXSolver; created lazily by setup()
        self.nx_manager = None        # NX session manager from ensure_nx_running()
        self.master_model_dir = None  # local copy of the V10 master model

    def setup(self):
        """Setup NX and copy master model.

        The model copy is skipped when the local folder already exists.
        Raises whatever ensure_nx_running raises if the NX session cannot
        be established.
        """
        logger.info("Setting up NX session and model...")

        # Copy master model from V10 if not exists
        local_model_dir = SETUP_DIR / "model"
        if not local_model_dir.exists():
            logger.info(f"Copying master model from V10...")
            shutil.copytree(V10_MODEL_DIR, local_model_dir)

        self.master_model_dir = local_model_dir

        # Setup NX
        study_name = self.config.get('study_name', 'm1_adaptive_V11')

        try:
            self.nx_manager, nx_was_started = ensure_nx_running(
                session_id=study_name,
                auto_start=True,
                start_timeout=120
            )
            logger.info("NX session ready" + (" (started)" if nx_was_started else " (existing)"))
        except Exception as e:
            logger.error(f"Failed to setup NX: {e}")
            raise

        # Initialize solver; derive the Nastran version from the NX install path
        import re
        nx_settings = self.config.get('nx_settings', {})
        nx_install_dir = nx_settings.get('nx_install_path', 'C:\\Program Files\\Siemens\\NX2506')
        version_match = re.search(r'NX(\d+)', nx_install_dir)
        nastran_version = version_match.group(1) if version_match else "2506"

        # NOTE(review): study_name here is hard-coded to
        # "m1_mirror_adaptive_V11" while the NX session above uses the
        # configurable 'm1_adaptive_V11' default — confirm intentional.
        self.nx_solver = NXSolver(
            master_model_dir=str(self.master_model_dir),
            nx_install_dir=nx_install_dir,
            nastran_version=nastran_version,
            timeout=nx_settings.get('simulation_timeout_s', 600),
            use_iteration_folders=True,
            study_name="m1_mirror_adaptive_V11"
        )

    def run_fea(self, params: Dict[str, float], trial_num: int) -> Optional[Dict]:
        """Run FEA and extract objectives.

        Args:
            params: design-variable values keyed by variable name; every
                name listed in config['design_variables'] must be present.
            trial_num: sequential trial number (names the iteration folder).

        Returns:
            Result dict with 'trial_num', 'params', 'objectives',
            'source' ('FEA') and 'solve_time' seconds, or None on any
            failure (solve error or extraction error).
        """
        if self.nx_solver is None:
            self.setup()  # lazy one-time setup on first call

        logger.info(f" [FEA {trial_num}] Running simulation...")

        # Map configured variable names to their NX expression names
        expressions = {var['expression_name']: params[var['name']]
                      for var in self.config['design_variables']}

        iter_folder = self.nx_solver.create_iteration_folder(
            iterations_base_dir=ITERATIONS_DIR,
            iteration_number=trial_num,
            expression_updates=expressions
        )

        try:
            nx_settings = self.config.get('nx_settings', {})
            sim_file = iter_folder / nx_settings.get('sim_file', 'ASSY_M1_assyfem1_sim1.sim')

            t_start = time.time()

            result = self.nx_solver.run_simulation(
                sim_file=sim_file,
                working_dir=iter_folder,
                expression_updates=expressions,
                solution_name=nx_settings.get('solution_name', 'Solution 1'),
                cleanup=False
            )

            solve_time = time.time() - t_start

            if not result['success']:
                logger.error(f" [FEA {trial_num}] Solve failed: {result.get('error')}")
                return None

            logger.info(f" [FEA {trial_num}] Solved in {solve_time:.1f}s")

            # Extract objectives from the solver's OP2 output
            op2_path = Path(result['op2_file'])
            objectives = self._extract_objectives(op2_path)

            if objectives is None:
                return None

            logger.info(f" [FEA {trial_num}] 40-20: {objectives['rel_filtered_rms_40_vs_20']:.2f} nm")
            logger.info(f" [FEA {trial_num}] 60-20: {objectives['rel_filtered_rms_60_vs_20']:.2f} nm")
            logger.info(f" [FEA {trial_num}] Mfg: {objectives['mfg_90_optician_workload']:.2f} nm")

            return {
                'trial_num': trial_num,
                'params': params,
                'objectives': objectives,
                'source': 'FEA',
                'solve_time': solve_time
            }

        except Exception as e:
            logger.error(f" [FEA {trial_num}] Error: {e}")
            import traceback
            traceback.print_exc()
            return None

    def _extract_objectives(self, op2_path: Path) -> Optional[Dict[str, float]]:
        """Extract objectives using ZernikeExtractor.

        Computes the filtered-RMS objectives relative to the configured
        reference subcase (default '2'). Subcases '3', '4', '1' are
        mapped to the 40-vs-20, 60-vs-20, and manufacturing objectives
        respectively — per variable naming; confirm against the sim setup.

        Returns:
            Objective dict, or None when extraction fails.
        """
        try:
            zernike_settings = self.config.get('zernike_settings', {})

            extractor = ZernikeExtractor(
                op2_path,
                bdf_path=None,
                displacement_unit=zernike_settings.get('displacement_unit', 'mm'),
                n_modes=zernike_settings.get('n_modes', 50),
                filter_orders=zernike_settings.get('filter_low_orders', 4)
            )

            ref = zernike_settings.get('reference_subcase', '2')

            rel_40 = extractor.extract_relative("3", ref)
            rel_60 = extractor.extract_relative("4", ref)
            rel_90 = extractor.extract_relative("1", ref)

            return {
                'rel_filtered_rms_40_vs_20': rel_40['relative_filtered_rms_nm'],
                'rel_filtered_rms_60_vs_20': rel_60['relative_filtered_rms_nm'],
                'mfg_90_optician_workload': rel_90['relative_rms_filter_j1to3']
            }

        except Exception as e:
            logger.error(f"Zernike extraction failed: {e}")
            return None

    def cleanup(self):
        """Cleanup NX session (close NX only when this session may do so)."""
        if self.nx_manager:
            if self.nx_manager.can_close_nx():
                self.nx_manager.close_nx_if_allowed()
            self.nx_manager.cleanup()
|
|
|
|
|
|
# ============================================================================
|
|
# Adaptive Optimizer
|
|
# ============================================================================
|
|
|
|
class AdaptiveOptimizer:
    """Main adaptive optimization loop.

    Per iteration: (1) explore the surrogate with an Optuna TPE study,
    (2) select a small candidate batch, (3) validate candidates with
    real FEA, (4) update the best-so-far state, (5) retrain the
    surrogate on the enlarged FEA dataset. Stops after `patience`
    iterations without improvement or at `max_iterations`.
    """

    def __init__(self, config: AdaptiveConfig, study_config: Dict[str, Any]):
        # config: adaptive-loop settings; study_config: parsed JSON study
        # config (design variables, NX settings, ...).
        self.config = config
        self.study_config = study_config
        self.state = AdaptiveState()

        self.surrogate = ZernikeSurrogate()
        self.fea_runner = FEARunner(study_config)

        # Load V10 data: seeds the surrogate training set AND the
        # cumulative FEA counter.
        self.fea_data = load_v10_fea_data()
        self.state.total_fea_count = len(self.fea_data)

        # Study database: single Optuna storage shared by the per-iteration
        # NN studies and the 'v11_fea' dashboard study.
        self.db_path = RESULTS_DIR / "study.db"
        self.storage = optuna.storages.RDBStorage(f'sqlite:///{self.db_path}')

    def compute_weighted_objective(self, objectives: Dict[str, float]) -> float:
        """Compute normalized weighted objective (lower is better).

        Each objective is divided by its target before weighting, so the
        result is dimensionless and ~1.0 when every objective sits at
        its target.
        """
        w1, w2, w3 = self.config.weight_40_vs_20, self.config.weight_60_vs_20, self.config.weight_mfg
        t1, t2, t3 = self.config.target_40_vs_20, self.config.target_60_vs_20, self.config.target_mfg

        weighted = (w1 * objectives['rel_filtered_rms_40_vs_20'] / t1 +
                    w2 * objectives['rel_filtered_rms_60_vs_20'] / t2 +
                    w3 * objectives['mfg_90_optician_workload'] / t3) / (w1 + w2 + w3)

        return weighted

    def run(self):
        """Run adaptive optimization end-to-end (blocking).

        Aborts early (with an error log) when the V10 seed data is
        smaller than min_training_samples.
        """
        logger.info("\n" + "=" * 70)
        logger.info("M1 MIRROR ADAPTIVE SURROGATE OPTIMIZATION V11")
        logger.info("=" * 70)
        logger.info(f"V10 FEA data: {len(self.fea_data)} trials")
        logger.info(f"Strategy: {self.config.strategy}")
        logger.info(f"NN trials/iter: {self.config.surrogate_trials_per_iter}")
        logger.info(f"FEA batch/iter: {self.config.fea_batch_size}")
        logger.info(f"Patience: {self.config.patience}")

        start_time = time.time()

        # Initialize best from V10 data
        if self.fea_data:
            for d in self.fea_data:
                weighted = self.compute_weighted_objective(d['objectives'])
                if weighted < self.state.best_weighted:
                    self.state.best_weighted = weighted
                    self.state.best_40_vs_20 = d['objectives']['rel_filtered_rms_40_vs_20']
                    self.state.best_60_vs_20 = d['objectives']['rel_filtered_rms_60_vs_20']
                    self.state.best_mfg = d['objectives']['mfg_90_optician_workload']
                    self.state.best_params = d['params']

            logger.info(f"\nBest from V10 data:")
            logger.info(f" 40-20: {self.state.best_40_vs_20:.2f} nm")
            logger.info(f" 60-20: {self.state.best_60_vs_20:.2f} nm")
            logger.info(f" Mfg: {self.state.best_mfg:.2f} nm")

        # Store V10 FEA data in Optuna for dashboard (only if not already done)
        self._store_v10_fea_in_optuna()

        # Train initial surrogate
        if len(self.fea_data) < self.config.min_training_samples:
            logger.error(f"Not enough V10 data ({len(self.fea_data)}), need {self.config.min_training_samples}")
            return

        self.surrogate.train_from_data(self.fea_data, epochs=self.config.retrain_epochs)
        self.surrogate.save(RESULTS_DIR / "surrogate_initial.pt")

        # Main loop
        while self.state.iteration < self.config.max_iterations:
            self.state.iteration += 1

            logger.info(f"\n{'=' * 70}")
            logger.info(f"ITERATION {self.state.iteration}")
            logger.info(f"FEA: {self.state.total_fea_count}, NN: {self.state.total_nn_count}")
            logger.info(f"{'=' * 70}")

            # 1. Surrogate exploration
            candidates = self._surrogate_exploration()

            # 2. Select candidates
            selected = self._select_candidates(candidates)

            # 3. FEA validation
            new_results = self._fea_validation(selected)

            if new_results:
                # 4. Update state
                improved = self._update_state(new_results)

                # 5. Retrain (from scratch, on the full enlarged dataset)
                self.surrogate.train_from_data(self.fea_data, epochs=self.config.retrain_epochs)
                self.surrogate.save(RESULTS_DIR / f"surrogate_iter{self.state.iteration}.pt")

                if not improved:
                    self.state.no_improvement_count += 1
                else:
                    self.state.no_improvement_count = 0
            else:
                # Every FEA run failed this iteration: counts against patience
                self.state.no_improvement_count += 1

            # Persist state after every iteration so a crash loses little
            self._save_state()

            # Check convergence
            if self.state.no_improvement_count >= self.config.patience:
                logger.info(f"\nCONVERGED after {self.config.patience} iterations without improvement")
                break

        # Final
        elapsed = time.time() - start_time
        self._print_final_results(elapsed)
        self.fea_runner.cleanup()

    def _surrogate_exploration(self) -> List[Dict]:
        """Fast NN exploration.

        Runs a fresh per-iteration Optuna TPE study whose objective is
        the surrogate's weighted prediction; every evaluated point is
        also collected with its MC-Dropout uncertainty for selection.

        Returns:
            List of candidate dicts with keys 'params', 'predicted',
            'weighted', 'uncertainty'.
        """
        logger.info(f"\nNN exploration ({self.config.surrogate_trials_per_iter} trials)...")

        candidates = []

        def objective(trial: optuna.Trial) -> float:
            params = {}
            for var in self.study_config['design_variables']:
                if var.get('enabled', False):
                    params[var['name']] = trial.suggest_float(var['name'], var['min'], var['max'])

            # NOTE(review): the surrogate indexes every DESIGN_VAR_NAMES
            # entry — if any design variable is disabled in the config
            # this raises KeyError; confirm all 11 are enabled.
            pred, uncertainty = self.surrogate.predict_with_uncertainty(params)
            weighted = self.compute_weighted_objective(pred)

            candidates.append({
                'params': params,
                'predicted': pred,
                'weighted': weighted,
                'uncertainty': uncertainty
            })

            # Tag as NN trial
            trial.set_user_attr('source', 'NN')
            trial.set_user_attr('predicted_40_vs_20', pred['rel_filtered_rms_40_vs_20'])
            trial.set_user_attr('predicted_60_vs_20', pred['rel_filtered_rms_60_vs_20'])
            trial.set_user_attr('predicted_mfg', pred['mfg_90_optician_workload'])

            return weighted

        # One study per iteration; the seed varies per iteration so the
        # sampler does not retrace the same trajectory.
        study = optuna.create_study(
            study_name=f"v11_iter{self.state.iteration}_nn",
            storage=self.storage,
            direction='minimize',
            sampler=TPESampler(n_startup_trials=50, seed=42 + self.state.iteration),
            load_if_exists=True
        )

        study.optimize(objective, n_trials=self.config.surrogate_trials_per_iter,
                      show_progress_bar=True)

        # NOTE(review): assumes all requested NN trials completed.
        self.state.total_nn_count += self.config.surrogate_trials_per_iter

        logger.info(f" Best predicted: {study.best_value:.4f}")

        return candidates

    def _select_candidates(self, candidates: List[Dict]) -> List[Dict]:
        """Select candidates for FEA.

        'best' takes the lowest predicted weighted objectives;
        'uncertain' takes the highest MC-Dropout uncertainty; 'hybrid'
        mixes both per exploration_ratio (a candidate topping both
        rankings can appear twice).
        """
        n = self.config.fea_batch_size

        if self.config.strategy == 'best':
            selected = sorted(candidates, key=lambda c: c['weighted'])[:n]
        elif self.config.strategy == 'uncertain':
            selected = sorted(candidates, key=lambda c: -c['uncertainty'])[:n]
        else: # hybrid
            n_best = int(n * (1 - self.config.exploration_ratio))
            n_uncertain = n - n_best
            sorted_best = sorted(candidates, key=lambda c: c['weighted'])
            sorted_uncertain = sorted(candidates, key=lambda c: -c['uncertainty'])
            selected = sorted_best[:n_best] + sorted_uncertain[:n_uncertain]

        logger.info(f"\nSelected {len(selected)} candidates for FEA")
        return selected

    def _store_v10_fea_in_optuna(self):
        """Store V10 FEA data in Optuna so it appears in dashboard.

        Idempotent: skipped when the 'v11_fea' study already holds at
        least as many trials as the loaded V10 dataset. Any failure is
        logged but never aborts the optimization.
        """
        try:
            # Check if already done
            try:
                existing_study = optuna.load_study(
                    study_name="v11_fea",
                    storage=self.storage
                )
                if len(existing_study.trials) >= len(self.fea_data):
                    logger.info(f"V10 FEA data already in Optuna ({len(existing_study.trials)} trials)")
                    return
            except KeyError:
                pass # Study doesn't exist yet

            logger.info(f"Storing {len(self.fea_data)} V10 FEA trials in Optuna...")

            fea_study = optuna.create_study(
                study_name="v11_fea",
                storage=self.storage,
                direction='minimize',
                load_if_exists=True
            )

            for d in self.fea_data:
                # Ask-and-tell: create a trial, then report the stored result
                trial = fea_study.ask()

                # Set parameters
                for var in self.study_config['design_variables']:
                    if var.get('enabled', False) and var['name'] in d['params']:
                        trial.suggest_float(var['name'], var['min'], var['max'])
                        # NOTE(review): assigning into trial.params likely
                        # does not persist to storage (Optuna exposes params
                        # as a derived view); the true V10 values are carried
                        # in the 'design_vars' user attribute below — confirm
                        # which one the dashboard reads.
                        trial.params[var['name']] = d['params'][var['name']]

                weighted = self.compute_weighted_objective(d['objectives'])

                # Set user attributes - tag as V10_FEA to distinguish
                trial.set_user_attr('source', 'V10_FEA')
                trial.set_user_attr('design_vars', d['params'])
                trial.set_user_attr('rel_filtered_rms_40_vs_20', d['objectives']['rel_filtered_rms_40_vs_20'])
                trial.set_user_attr('rel_filtered_rms_60_vs_20', d['objectives']['rel_filtered_rms_60_vs_20'])
                trial.set_user_attr('mfg_90_optician_workload', d['objectives']['mfg_90_optician_workload'])

                fea_study.tell(trial, weighted)

            logger.info(f"Stored {len(self.fea_data)} V10 FEA trials in Optuna")

        except Exception as e:
            logger.warning(f"Failed to store V10 FEA data in Optuna: {e}")
            import traceback
            traceback.print_exc()

    def _store_fea_trial_in_optuna(self, result: Dict):
        """Store FEA trial in Optuna database so it appears in dashboard.

        Mirrors one validated FEA result into the shared 'v11_fea' study
        via ask-and-tell; failures are logged and swallowed so a
        dashboard issue never loses an FEA result.
        """
        try:
            # Get or create FEA study
            fea_study = optuna.create_study(
                study_name="v11_fea",
                storage=self.storage,
                direction='minimize',
                load_if_exists=True
            )

            # Create a trial with the FEA results
            trial = fea_study.ask()

            # Set parameters (design variables)
            for var in self.study_config['design_variables']:
                if var.get('enabled', False) and var['name'] in result['params']:
                    trial.suggest_float(var['name'], var['min'], var['max'])
                    # Override with actual value
                    # NOTE(review): same caveat as in _store_v10_fea_in_optuna —
                    # this assignment may not persist; 'design_vars' below
                    # carries the authoritative values.
                    trial.params[var['name']] = result['params'][var['name']]

            # Compute weighted objective
            weighted = self.compute_weighted_objective(result['objectives'])

            # Set user attributes
            trial.set_user_attr('source', 'FEA')
            trial.set_user_attr('design_vars', result['params'])
            trial.set_user_attr('rel_filtered_rms_40_vs_20', result['objectives']['rel_filtered_rms_40_vs_20'])
            trial.set_user_attr('rel_filtered_rms_60_vs_20', result['objectives']['rel_filtered_rms_60_vs_20'])
            trial.set_user_attr('mfg_90_optician_workload', result['objectives']['mfg_90_optician_workload'])
            trial.set_user_attr('solve_time', result.get('solve_time', 0))

            # Tell the study the result
            fea_study.tell(trial, weighted)

            logger.info(f" [FEA {result['trial_num']}] Stored in Optuna (weighted={weighted:.4f})")

        except Exception as e:
            logger.warning(f"Failed to store FEA trial in Optuna: {e}")

    def _fea_validation(self, candidates: List[Dict]) -> List[Dict]:
        """Run real FEA on candidates.

        Trial numbers continue from the cumulative FEA count (including
        the V10 seeds). Failed runs are dropped; successful ones are
        mirrored into the Optuna 'v11_fea' study for the dashboard.
        """
        logger.info("\nRunning FEA validation...")

        new_results = []
        for i, cand in enumerate(candidates):
            trial_num = self.state.total_fea_count + i + 1
            result = self.fea_runner.run_fea(cand['params'], trial_num)
            if result:
                new_results.append(result)
                # Store in Optuna for dashboard visibility
                self._store_fea_trial_in_optuna(result)

        return new_results

    def _update_state(self, new_results: List[Dict]) -> bool:
        """Update state with new FEA results.

        Appends the results to the training set, refreshes the
        best-so-far record, and logs one history entry.

        Returns:
            True only when the weighted objective improved by more than
            0.01 (normalized units). Smaller gains still update the
            best_* fields but count as "no improvement" for patience.
        """
        self.fea_data.extend(new_results)
        self.state.total_fea_count = len(self.fea_data)

        improved = False

        for result in new_results:
            weighted = self.compute_weighted_objective(result['objectives'])

            if weighted < self.state.best_weighted:
                improvement = self.state.best_weighted - weighted
                self.state.best_weighted = weighted
                self.state.best_40_vs_20 = result['objectives']['rel_filtered_rms_40_vs_20']
                self.state.best_60_vs_20 = result['objectives']['rel_filtered_rms_60_vs_20']
                self.state.best_mfg = result['objectives']['mfg_90_optician_workload']
                self.state.best_params = result['params']

                if improvement > 0.01: # Normalized threshold
                    improved = True
                    logger.info(f"\n *** NEW BEST! ***")
                    logger.info(f" 40-20: {self.state.best_40_vs_20:.2f} nm")
                    logger.info(f" 60-20: {self.state.best_60_vs_20:.2f} nm")
                    logger.info(f" Mfg: {self.state.best_mfg:.2f} nm")

        self.state.history.append({
            'iteration': self.state.iteration,
            'fea_count': self.state.total_fea_count,
            'nn_count': self.state.total_nn_count,
            'best_40_vs_20': self.state.best_40_vs_20,
            'best_60_vs_20': self.state.best_60_vs_20,
            'best_mfg': self.state.best_mfg,
            'best_weighted': self.state.best_weighted,
            'improved': improved
        })

        return improved

    def _save_state(self):
        """Save state to JSON (adaptive_state.json; overwritten each call)."""
        with open(RESULTS_DIR / "adaptive_state.json", 'w') as f:
            json.dump({
                'iteration': self.state.iteration,
                'total_fea_count': self.state.total_fea_count,
                'total_nn_count': self.state.total_nn_count,
                'best_40_vs_20': self.state.best_40_vs_20,
                'best_60_vs_20': self.state.best_60_vs_20,
                'best_mfg': self.state.best_mfg,
                'best_weighted': self.state.best_weighted,
                'best_params': self.state.best_params,
                'history': self.state.history
            }, f, indent=2)

    def _print_final_results(self, elapsed: float):
        """Print final results and write them to final_results.json.

        Args:
            elapsed: total wall-clock optimization time in seconds.
        """
        logger.info("\n" + "=" * 70)
        logger.info("OPTIMIZATION COMPLETE")
        logger.info("=" * 70)
        logger.info(f"Time: {elapsed/60:.1f} min")
        logger.info(f"Iterations: {self.state.iteration}")
        logger.info(f"FEA evaluations: {self.state.total_fea_count}")
        logger.info(f"NN evaluations: {self.state.total_nn_count}")
        logger.info(f"\nBest FEA-validated:")
        logger.info(f" 40-20: {self.state.best_40_vs_20:.2f} nm")
        logger.info(f" 60-20: {self.state.best_60_vs_20:.2f} nm")
        logger.info(f" Mfg: {self.state.best_mfg:.2f} nm")
        logger.info(f"\nBest params:")
        for name, value in sorted(self.state.best_params.items()):
            logger.info(f" {name}: {value:.4f}")

        # Save final results
        with open(RESULTS_DIR / 'final_results.json', 'w') as f:
            json.dump({
                'summary': {
                    'iterations': self.state.iteration,
                    'fea_count': self.state.total_fea_count,
                    'nn_count': self.state.total_nn_count,
                    'best_40_vs_20': self.state.best_40_vs_20,
                    'best_60_vs_20': self.state.best_60_vs_20,
                    'best_mfg': self.state.best_mfg
                },
                'best_params': self.state.best_params,
                'history': self.state.history
            }, f, indent=2)
|
|
|
|
|
|
# ============================================================================
|
|
# Main
|
|
# ============================================================================
|
|
|
|
def main():
    """CLI entry point: parse arguments, then launch the adaptive optimizer.

    Without --start, prints a usage summary and exits.
    """
    parser = argparse.ArgumentParser(description='M1 Mirror Adaptive V11')
    parser.add_argument('--start', action='store_true', help='Start optimization')
    parser.add_argument('--fea-batch', type=int, default=5, help='FEA per iteration')
    parser.add_argument('--nn-trials', type=int, default=1000, help='NN trials per iteration')
    parser.add_argument('--patience', type=int, default=5, help='Iterations without improvement')
    parser.add_argument('--strategy', type=str, default='hybrid',
                        choices=['best', 'uncertain', 'hybrid'])
    args = parser.parse_args()

    # Guard clause: no --start means "show help", not "run".
    if not args.start:
        usage_lines = (
            "M1 Mirror Adaptive Surrogate Optimization V11",
            "=" * 50,
            "\nUsage:",
            " python run_optimization.py --start",
            " python run_optimization.py --start --fea-batch 3 --patience 7",
            "\nThis will:",
            " 1. Load 90 FEA trials from V10",
            " 2. Train neural surrogate",
            " 3. Run adaptive optimization with FEA validation",
            " 4. Tag trials as FEA/NN for dashboard",
        )
        for line in usage_lines:
            print(line)
        return

    # Study configuration comes from the JSON file in 1_setup.
    with open(CONFIG_PATH, 'r') as f:
        study_config = json.load(f)

    adaptive_config = AdaptiveConfig(
        surrogate_trials_per_iter=args.nn_trials,
        fea_batch_size=args.fea_batch,
        patience=args.patience,
        strategy=args.strategy
    )

    AdaptiveOptimizer(adaptive_config, study_config).run()


if __name__ == "__main__":
    main()
|