Files
Atomizer/studies/m1_mirror_zernike_optimization/run_optimization.py
Antoine ec5e42d733 feat: Add M1 mirror Zernike optimization with correct RMS calculation
Major improvements to telescope mirror optimization workflow:

Assembly FEM Workflow (solve_simulation.py):
- Fixed multi-part assembly FEM update sequence
- Use ImportFromFile() for reliable expression updates
- Add DuplicateNodesCheckBuilder with MergeOccurrenceNodes=True
- Switch to Foreground solve mode for multi-subcase solutions
- Add detailed logging and diagnostics for node merge operations

Zernike RMS Calculation:
- CRITICAL FIX: Use correct surface-based RMS formula
  - Global RMS = sqrt(mean(W^2)) from actual WFE values
  - Filtered RMS = sqrt(mean(W_residual^2)) after removing low-order fit
  - This matches zernike_Post_Script_NX.py (optical standard)
- Previous WRONG formula was: sqrt(sum(coeffs^2))
- Add compute_rms_filter_j1to3() for optician workload metric

Subcase Mapping:
- Fix subcase mapping to match NX model:
  - Subcase 1 = 90 deg (polishing orientation)
  - Subcase 2 = 20 deg (reference)
  - Subcase 3 = 40 deg
  - Subcase 4 = 60 deg

New Study: M1 Mirror Zernike Optimization
- Full optimization config with 11 design variables
- 3 objectives: rel_filtered_rms_40_vs_20, rel_filtered_rms_60_vs_20, mfg_90_optician_workload
- Neural surrogate support for accelerated optimization

Documentation:
- Update ZERNIKE_INTEGRATION.md with correct RMS formula
- Update ASSEMBLY_FEM_WORKFLOW.md with expression import and node merge details
- Add reference scripts from original zernike_Post_Script_NX.py

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 16:30:15 -05:00

1378 lines
46 KiB
Python

#!/usr/bin/env python3
"""
M1 Mirror Zernike Optimization with Neural Acceleration
========================================================
Optimizes telescope primary mirror support structure using Zernike
wavefront error metrics. Supports hybrid FEA/Neural workflow for
accelerated design space exploration.
Objectives:
- Minimize relative filtered RMS at 40 deg vs 20 deg
- Minimize relative filtered RMS at 60 deg vs 20 deg
- Minimize optician workload at 90 deg (polishing orientation)
Design Variables (11 total, selectively enabled):
- Whiffle tree parameters (min, outer_to_vertical, triangle_closeness)
- Lateral support parameters (angles, pivots, closeness)
- Mirror blank parameters (backface_angle, inner_circular_rib_dia)
Usage:
python run_optimization.py --run --trials 40 # FEA only
python run_optimization.py --run --trials 500 --enable-nn # With neural
python run_optimization.py --train-surrogate # Train NN from data
python run_optimization.py --status # Check progress
"""
import sys
import json
import time
import argparse
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
import numpy as np
# Add parent directories to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import optuna
from optuna.samplers import TPESampler, NSGAIISampler
# Atomizer imports
from optimization_engine.nx_solver import NXSolver
# Note: We use our own Zernike implementation with displacement-based subtraction
# instead of the generic ZernikeExtractor to match the original zernike_Post_Script_NX.py
# ==============================================================================
# Configuration
# ==============================================================================
# Study directory layout: 1_setup holds the config JSON and NX model,
# 2_results collects the Optuna database, OP2 outputs and trained surrogates.
STUDY_DIR = Path(__file__).parent
SETUP_DIR = STUDY_DIR / "1_setup"
RESULTS_DIR = STUDY_DIR / "2_results"
MODEL_DIR = SETUP_DIR / "model"
CONFIG_PATH = SETUP_DIR / "optimization_config.json"
# Ensure directories exist (results dir is created on import so any entry
# point can write to it immediately)
RESULTS_DIR.mkdir(exist_ok=True)
def load_config() -> Dict[str, Any]:
    """Parse and return the study's optimization configuration JSON."""
    raw = CONFIG_PATH.read_text()
    return json.loads(raw)
# ==============================================================================
# Zernike Extraction Utilities
# ==============================================================================
# Displacement unit conversion: mm -> nm, used when converting Z displacement
# to wavefront error (WFE = 2 * Disp_Z, see extract_zernike_with_relative).
NM_PER_MM = 1e6  # 1 mm = 1e6 nm
def extract_displacements_from_op2(
    op2_path: Path,
    subcases: Optional[List[str]] = None
) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Extract raw Z-displacement data from an OP2 result file.

    Args:
        op2_path: Path to the Nastran OP2 file.
        subcases: Subcase IDs (as strings) to keep. Defaults to
            ["1", "2", "3", "4"].

    Returns:
        Dict keyed by subcase ID string:
        {
            '1': {'node_ids': int array, 'disp_z': float array},
            '2': {...},
            ...
        }

    Raises:
        RuntimeError: If the OP2 contains no displacement results.
    """
    from pyNastran.op2.op2 import OP2

    # Avoid a mutable default argument; bind the default per call instead.
    if subcases is None:
        subcases = ["1", "2", "3", "4"]

    op2 = OP2()
    op2.read_op2(str(op2_path))
    if not op2.displacements:
        raise RuntimeError("No displacement subcases found in OP2.")

    results: Dict[str, Dict[str, np.ndarray]] = {}
    for key, darr in op2.displacements.items():
        data = darr.data
        # Displacement arrays may be (ntimes, nnodes, 6) or (nnodes, 6);
        # anything else is skipped.
        dmat = data[0] if data.ndim == 3 else (data if data.ndim == 2 else None)
        if dmat is None:
            continue
        ngt = darr.node_gridtype.astype(int)
        node_ids = ngt if ngt.ndim == 1 else ngt[:, 0]
        # Identify the subcase via the isubcase attribute when available.
        isub = getattr(darr, 'isubcase', None)
        isub_str = str(int(isub)) if isinstance(isub, int) else None
        if isub_str and isub_str in subcases:
            results[isub_str] = {
                'node_ids': node_ids.astype(int),
                'disp_z': dmat[:, 2].astype(np.float64),  # Z displacement only
            }
    return results
def compute_zernike_from_wfe(
    X: np.ndarray,
    Y: np.ndarray,
    W_nm: np.ndarray,
    n_modes: int = 50
) -> Tuple[np.ndarray, float]:
    """
    Least-squares fit of Zernike (Noll-indexed) coefficients to a WFE surface.

    Coordinates are centered on their mean and scaled onto the unit disk;
    the normal equations are accumulated chunk by chunk so the full design
    matrix is never materialized for very large node counts.

    Returns:
        (coefficients, R_max) where R_max is the normalization radius in
        the same units as X/Y.
    """
    from math import factorial

    def noll_to_nm(j: int):
        # Walk radial orders n, enumerating azimuthal orders m in Noll
        # ordering, until the running count reaches j.
        if j < 1:
            raise ValueError("Noll index j must be >= 1")
        count, n = 0, 0
        while True:
            if n == 0:
                m_list = [0]
            elif n % 2 == 0:
                m_list = [0] + [m for k in range(1, n // 2 + 1) for m in (-2 * k, 2 * k)]
            else:
                m_list = [m for k in range(0, (n + 1) // 2) for m in (-(2 * k + 1), 2 * k + 1)]
            for m in m_list:
                count += 1
                if count == j:
                    return n, m
            n += 1

    def zernike_mode(j, rho, theta):
        # Unnormalized Zernike polynomial for Noll index j.
        n, m = noll_to_nm(j)
        radial = np.zeros_like(rho)
        for s in range((n - abs(m)) // 2 + 1):
            c = ((-1) ** s * factorial(n - s) /
                 (factorial(s) *
                  factorial((n + abs(m)) // 2 - s) *
                  factorial((n - abs(m)) // 2 - s)))
            radial += c * rho ** (n - 2 * s)
        if m == 0:
            return radial
        return radial * (np.cos(m * theta) if m > 0 else np.sin(-m * theta))

    # Map coordinates onto the unit disk.
    Xc = X - np.mean(X)
    Yc = Y - np.mean(Y)
    R_max = float(np.max(np.hypot(Xc, Yc)))
    rho = np.hypot(Xc / R_max, Yc / R_max).astype(np.float32)
    theta = np.arctan2(Yc, Xc).astype(np.float32)

    # Keep only finite samples inside the unit disk.
    inside = (rho <= 1.0) & ~np.isnan(W_nm)
    if not np.any(inside):
        raise RuntimeError("No valid points inside unit disk.")
    sel = np.nonzero(inside)[0]

    # Accumulate the normal equations (Z^T Z) x = Z^T v in chunks.
    n_coef = int(n_modes)
    gram = np.zeros((n_coef, n_coef), dtype=np.float64)
    rhs = np.zeros((n_coef,), dtype=np.float64)
    values = W_nm.astype(np.float64)
    chunk = 100000
    for lo in range(0, len(sel), chunk):
        pick = sel[lo:lo + chunk]
        rho_c, theta_c, val_c = rho[pick], theta[pick], values[pick]
        basis = np.column_stack([
            zernike_mode(j, rho_c, theta_c).astype(np.float32)
            for j in range(1, n_coef + 1)
        ])
        gram += (basis.T @ basis).astype(np.float64)
        rhs += (basis.T @ val_c).astype(np.float64)

    try:
        coeffs = np.linalg.solve(gram, rhs)
    except np.linalg.LinAlgError:
        # Singular Gram matrix: fall back to a least-squares solve.
        coeffs = np.linalg.lstsq(gram, rhs, rcond=None)[0]
    return coeffs, R_max
def compute_rms_from_surface(
    X: np.ndarray,
    Y: np.ndarray,
    W_nm: np.ndarray,
    coeffs: np.ndarray,
    R_max: float,
    filter_low_orders: int = 4
) -> Tuple[float, float]:
    """
    Compute global and filtered RMS directly from the WFE surface samples.

    Matches the optical-standard approach of zernike_Post_Script_NX.py:
    - global RMS   = sqrt(mean(W^2)) over the raw WFE values
    - filtered RMS = sqrt(mean(residual^2)), where the residual is the WFE
      minus the reconstruction of the first `filter_low_orders` Zernike
      modes (piston/tip/tilt/defocus by default).

    Args:
        X, Y: Node coordinates (same units as used for the fit).
        W_nm: WFE values in nm at each node.
        coeffs: Zernike coefficients from compute_zernike_from_wfe.
        R_max: Normalization radius used during the fit.
        filter_low_orders: Number of leading Noll modes to subtract.

    Returns:
        (global_rms, filtered_rms) in nm.
    """
    from math import factorial

    def noll_to_nm(j: int):
        # Noll index -> (radial order n, azimuthal order m).
        if j < 1:
            raise ValueError("Noll index j must be >= 1")
        count, n = 0, 0
        while True:
            if n == 0:
                m_list = [0]
            elif n % 2 == 0:
                m_list = [0] + [m for k in range(1, n // 2 + 1) for m in (-2 * k, 2 * k)]
            else:
                m_list = [m for k in range(0, (n + 1) // 2) for m in (-(2 * k + 1), 2 * k + 1)]
            for m in m_list:
                count += 1
                if count == j:
                    return n, m
            n += 1

    def zernike_mode(j, rho, theta):
        # Unnormalized Zernike polynomial evaluated at (rho, theta).
        n, m = noll_to_nm(j)
        radial = np.zeros_like(rho)
        for s in range((n - abs(m)) // 2 + 1):
            c = ((-1) ** s * factorial(n - s) /
                 (factorial(s) *
                  factorial((n + abs(m)) // 2 - s) *
                  factorial((n - abs(m)) // 2 - s)))
            radial += c * rho ** (n - 2 * s)
        if m == 0:
            return radial
        return radial * (np.cos(m * theta) if m > 0 else np.sin(-m * theta))

    # Reproduce the fit's centering and disk normalization.
    Xc = X - np.mean(X)
    Yc = Y - np.mean(Y)
    rho = np.hypot(Xc / R_max, Yc / R_max)
    theta = np.arctan2(Yc, Xc)

    # Evaluate the Zernike basis at every sample point.
    mode_count = len(coeffs)
    basis = np.column_stack([zernike_mode(j, rho, theta) for j in range(1, mode_count + 1)])

    # Residual after subtracting the low-order reconstruction.
    residual = W_nm - basis[:, :filter_low_orders].dot(coeffs[:filter_low_orders])

    # Surface-based RMS = sqrt(mean(W^2)) — the correct optical formula.
    global_rms = float(np.sqrt((W_nm ** 2).mean()))
    filtered_rms = float(np.sqrt((residual ** 2).mean()))
    return global_rms, filtered_rms
def compute_rms_filter_j1to3(
    X: np.ndarray,
    Y: np.ndarray,
    W_nm: np.ndarray,
    coeffs: np.ndarray,
    R_max: float
) -> float:
    """
    RMS of the WFE surface with only J1-J3 (piston, tip, tilt) removed.

    Defocus (J4) is deliberately kept in — this is the "optician workload"
    metric used for manufacturing at the polishing orientation.

    Returns:
        Filtered RMS in nm.
    """
    from math import factorial

    def noll_to_nm(j: int):
        # Noll index -> (radial order n, azimuthal order m).
        if j < 1:
            raise ValueError("Noll index j must be >= 1")
        count, n = 0, 0
        while True:
            if n == 0:
                m_list = [0]
            elif n % 2 == 0:
                m_list = [0] + [m for k in range(1, n // 2 + 1) for m in (-2 * k, 2 * k)]
            else:
                m_list = [m for k in range(0, (n + 1) // 2) for m in (-(2 * k + 1), 2 * k + 1)]
            for m in m_list:
                count += 1
                if count == j:
                    return n, m
            n += 1

    def zernike_mode(j, rho, theta):
        # Unnormalized Zernike polynomial evaluated at (rho, theta).
        n, m = noll_to_nm(j)
        radial = np.zeros_like(rho)
        for s in range((n - abs(m)) // 2 + 1):
            c = ((-1) ** s * factorial(n - s) /
                 (factorial(s) *
                  factorial((n + abs(m)) // 2 - s) *
                  factorial((n - abs(m)) // 2 - s)))
            radial += c * rho ** (n - 2 * s)
        if m == 0:
            return radial
        return radial * (np.cos(m * theta) if m > 0 else np.sin(-m * theta))

    # Reproduce the fit's centering and disk normalization.
    Xc = X - np.mean(X)
    Yc = Y - np.mean(Y)
    rho = np.hypot(Xc / R_max, Yc / R_max)
    theta = np.arctan2(Yc, Xc)

    # Basis for the first three Noll modes only (J1, J2, J3).
    low_order = np.column_stack([zernike_mode(j, rho, theta) for j in range(1, 4)])

    # Subtract piston/tip/tilt; defocus stays in the residual.
    residual = W_nm - low_order.dot(coeffs[:3])
    return float(np.sqrt((residual ** 2).mean()))
def extract_zernike_with_relative(
    op2_path: Path,
    bdf_path: Optional[Path] = None,
    n_modes: int = 50,
    subcases: Optional[List[str]] = None,
    subcase_labels: Optional[Dict[str, str]] = None,
    reference_subcase: str = "1"
) -> Dict[str, Dict[str, Any]]:
    """
    Extract Zernike coefficients using displacement-based subtraction for relative metrics.

    This is the CORRECT approach matching the original zernike_Post_Script_NX.py:
    1. Subtract displacements node-by-node between subcases
    2. Convert relative displacement to WFE (nm)
    3. Fit Zernike to the relative WFE surface
    4. Compute RMS from that fit

    Args:
        op2_path: Nastran OP2 result file.
        bdf_path: Geometry (BDF/DAT) file; auto-detected next to the OP2
            when None.
        n_modes: Number of Zernike (Noll) modes to fit.
        subcases: Subcase IDs to process. Defaults to ["1", "2", "3", "4"].
        subcase_labels: Display labels per subcase ID. Defaults to
            {"1": "20deg", "2": "40deg", "3": "60deg", "4": "90deg"}.
        reference_subcase: Subcase used as baseline for relative metrics.

    Returns dict with structure:
    {
        '1': {
            'coefficients': [...],            # Absolute
            'global_rms_nm': float,
            'filtered_rms_nm': float,
            'relative_coefficients': None,    # Reference has no relative
            'relative_global_rms_nm': None,
            'relative_filtered_rms_nm': None,
        },
        '2': {
            'coefficients': [...],            # Absolute
            'relative_coefficients': [...],   # Relative to reference
            'relative_global_rms_nm': float,
            'relative_filtered_rms_nm': float,
        },
        ...
    }
    Subcases missing from the OP2 map to None.

    Raises:
        FileNotFoundError: If no BDF/DAT geometry file can be located.
        RuntimeError: If the OP2 contains no displacement results.
        ValueError: If the reference subcase is missing from the OP2.
    """
    from pyNastran.op2.op2 import OP2
    from pyNastran.bdf.bdf import BDF

    # Avoid mutable default arguments (shared across calls); bind them here.
    if subcases is None:
        subcases = ["1", "2", "3", "4"]
    if subcase_labels is None:
        subcase_labels = {"1": "20deg", "2": "40deg", "3": "60deg", "4": "90deg"}

    # Read geometry for node coordinates
    if bdf_path is None:
        # Try to find matching BDF/DAT file
        folder = op2_path.parent
        base = op2_path.stem
        for ext in [".dat", ".bdf"]:
            candidate = folder / (base + ext)
            if candidate.exists():
                bdf_path = candidate
                break
        if bdf_path is None:
            # Search for any BDF/DAT in folder
            for f in folder.iterdir():
                if f.suffix.lower() in [".dat", ".bdf"]:
                    bdf_path = f
                    break
    if bdf_path is None or not bdf_path.exists():
        raise FileNotFoundError(f"No BDF/DAT file found for geometry. OP2: {op2_path}")
    print(f"[ZERNIKE] Reading geometry from: {bdf_path.name}")
    bdf = BDF()
    bdf.read_bdf(str(bdf_path))
    node_geo = {int(nid): node.get_position() for nid, node in bdf.nodes.items()}

    # Read displacements from OP2
    print(f"[ZERNIKE] Reading displacements from: {op2_path.name}")
    op2 = OP2()
    op2.read_op2(str(op2_path))
    if not op2.displacements:
        raise RuntimeError("No displacement subcases found in OP2.")

    # Extract displacement data for each requested subcase
    disp_data = {}
    for key, darr in op2.displacements.items():
        data = darr.data
        dmat = data[0] if data.ndim == 3 else (data if data.ndim == 2 else None)
        if dmat is None:
            continue
        ngt = darr.node_gridtype.astype(int)
        node_ids = ngt if ngt.ndim == 1 else ngt[:, 0]
        isub = getattr(darr, 'isubcase', None)
        isub_str = str(int(isub)) if isinstance(isub, int) else None
        if isub_str and isub_str in subcases:
            # Attach X/Y coordinates from the BDF geometry (NaN when a node
            # has no geometry entry, masked out later).
            X = np.array([node_geo.get(int(nid), (np.nan, np.nan, np.nan))[0] for nid in node_ids])
            Y = np.array([node_geo.get(int(nid), (np.nan, np.nan, np.nan))[1] for nid in node_ids])
            disp_z = dmat[:, 2].astype(np.float64)
            disp_data[isub_str] = {
                'node_ids': node_ids.astype(int),
                'X': X,
                'Y': Y,
                'disp_z': disp_z,
            }
            print(f"[ZERNIKE] Extracted subcase {isub_str} ({subcase_labels.get(isub_str, '?')}): {len(node_ids)} nodes")
    if reference_subcase not in disp_data:
        raise ValueError(f"Reference subcase {reference_subcase} not found in OP2")

    # Reference data for the node-by-node subtraction
    ref = disp_data[reference_subcase]
    results = {}
    for subcase in subcases:
        if subcase not in disp_data:
            print(f"[WARN] Subcase {subcase} not found in OP2")
            results[subcase] = None
            continue
        data = disp_data[subcase]
        X = data['X']
        Y = data['Y']
        disp_z = data['disp_z']
        node_ids = data['node_ids']
        # Mask out nodes with invalid geometry
        valid_mask = ~np.isnan(X) & ~np.isnan(Y)
        X = X[valid_mask]
        Y = Y[valid_mask]
        disp_z = disp_z[valid_mask]
        node_ids = node_ids[valid_mask]
        # Absolute WFE = 2 * Disp_Z * scale
        wfe_abs_nm = 2.0 * disp_z * NM_PER_MM
        # Fit Zernike to absolute surface
        coeffs_abs, R_max = compute_zernike_from_wfe(X, Y, wfe_abs_nm, n_modes)
        # Use CORRECT RMS calculation: sqrt(mean(W^2)) from actual surface
        global_rms_abs, filtered_rms_abs = compute_rms_from_surface(
            X, Y, wfe_abs_nm, coeffs_abs, R_max, filter_low_orders=4
        )
        result = {
            'coefficients': coeffs_abs.tolist(),
            'global_rms_nm': global_rms_abs,
            'filtered_rms_nm': filtered_rms_abs,
            'relative_coefficients': None,
            'relative_global_rms_nm': None,
            'relative_filtered_rms_nm': None,
            # Store surface data for optician workload calculation
            'X': X,
            'Y': Y,
            'wfe_nm': wfe_abs_nm,
            'R_max': R_max,
        }
        # Compute relative (vs reference) if not the reference subcase
        if subcase != reference_subcase:
            # Find common nodes via the reference displacement map
            ref_disp_map = dict(zip(ref['node_ids'], ref['disp_z']))
            rel_disp_z = []
            rel_X = []
            rel_Y = []
            for i, nid in enumerate(node_ids):
                if nid in ref_disp_map:
                    # SUBTRACT DISPLACEMENT NODE-BY-NODE (the correct approach!)
                    rel_dz = disp_z[i] - ref_disp_map[nid]
                    rel_disp_z.append(rel_dz)
                    rel_X.append(X[i])
                    rel_Y.append(Y[i])
            rel_disp_z = np.array(rel_disp_z)
            rel_X = np.array(rel_X)
            rel_Y = np.array(rel_Y)
            # Convert relative displacement to WFE
            rel_wfe_nm = 2.0 * rel_disp_z * NM_PER_MM
            # Fit Zernike to relative surface
            coeffs_rel, R_max_rel = compute_zernike_from_wfe(rel_X, rel_Y, rel_wfe_nm, n_modes)
            # Use CORRECT RMS calculation: sqrt(mean(W^2)) from actual surface
            global_rms_rel, filtered_rms_rel = compute_rms_from_surface(
                rel_X, rel_Y, rel_wfe_nm, coeffs_rel, R_max_rel, filter_low_orders=4
            )
            result['relative_coefficients'] = coeffs_rel.tolist()
            result['relative_global_rms_nm'] = global_rms_rel
            result['relative_filtered_rms_nm'] = filtered_rms_rel
            # Store relative surface data for optician workload
            result['rel_X'] = rel_X
            result['rel_Y'] = rel_Y
            result['rel_wfe_nm'] = rel_wfe_nm
            result['rel_R_max'] = R_max_rel
            label = subcase_labels.get(subcase, subcase)
            ref_label = subcase_labels.get(reference_subcase, reference_subcase)
            print(f"[ZERNIKE] {label} vs {ref_label}: Global RMS = {global_rms_rel:.2f} nm, Filtered RMS = {filtered_rms_rel:.2f} nm")
        results[subcase] = result
    return results
def compute_objectives_from_zernike(
    zernike_data: Dict[str, Dict[str, Any]],
    config: Dict[str, Any]
) -> Dict[str, float]:
    """
    Compute optimization objectives from Zernike extraction results.

    Uses the RELATIVE metrics (displacement-based subtraction) for the main
    objectives. Subcase IDs are resolved from the configured labels (e.g.
    '40deg' -> '3'); any missing or incomplete subcase yields float('inf')
    so a failed extraction can never look like a good design.

    Args:
        zernike_data: Output of extract_zernike_with_relative
            (subcase ID -> metrics dict, or None for missing subcases).
        config: Optimization config; reads config['zernike_settings'].

    Returns:
        Dict with 'rel_filtered_rms_40_vs_20', 'rel_filtered_rms_60_vs_20'
        and 'mfg_90_optician_workload' (nm, or inf when unavailable).
    """
    objectives = {}
    zernike_settings = config.get('zernike_settings', {})
    # Resolve subcase IDs from their labels — the NX model's subcase
    # numbering may differ from the default 1..4 ordering.
    subcase_labels = zernike_settings.get('subcase_labels', {})
    label_to_subcase = {v: k for k, v in subcase_labels.items()}
    subcase_40 = label_to_subcase.get('40deg', '3')
    subcase_60 = label_to_subcase.get('60deg', '4')
    subcase_90 = label_to_subcase.get('90deg', '1')
    # Objective 1: Relative filtered RMS at 40 deg vs 20 deg (reference)
    data_40 = zernike_data.get(subcase_40)
    if data_40 and data_40.get('relative_filtered_rms_nm') is not None:
        objectives['rel_filtered_rms_40_vs_20'] = float(data_40['relative_filtered_rms_nm'])
    else:
        objectives['rel_filtered_rms_40_vs_20'] = float('inf')
    # Objective 2: Relative filtered RMS at 60 deg vs 20 deg (reference)
    data_60 = zernike_data.get(subcase_60)
    if data_60 and data_60.get('relative_filtered_rms_nm') is not None:
        objectives['rel_filtered_rms_60_vs_20'] = float(data_60['relative_filtered_rms_nm'])
    else:
        objectives['rel_filtered_rms_60_vs_20'] = float('inf')
    # Objective 3: Optician workload at 90 deg (polishing subcase).
    # Relative (90 deg - reference) surface with J1-J3 removed (keeps
    # defocus), computed from the surface itself rather than from a
    # coefficient sum.
    data_90 = zernike_data.get(subcase_90)
    if (data_90 and data_90.get('relative_coefficients') is not None
            and data_90.get('rel_X') is not None):
        rel_coeffs_90 = np.array(data_90['relative_coefficients'])
        rms_filter_j1to3 = compute_rms_filter_j1to3(
            data_90['rel_X'], data_90['rel_Y'], data_90['rel_wfe_nm'],
            rel_coeffs_90, data_90['rel_R_max']
        )
        objectives['mfg_90_optician_workload'] = float(rms_filter_j1to3)
    else:
        objectives['mfg_90_optician_workload'] = float('inf')
    return objectives
def compute_weighted_objective(
    objectives: Dict[str, float],
    config: Dict[str, Any]
) -> float:
    """
    Collapse individual objective metrics into one weighted score.

    Each metric present in config['objectives'] is normalized by its
    target (when the target is positive) and weighted; the result is the
    weighted average. Metrics with no config entry are ignored; if nothing
    matches, 0.0 is returned.
    """
    configured = {spec['name']: spec for spec in config.get('objectives', [])}
    weighted_sum = 0.0
    weight_sum = 0.0
    for name, value in objectives.items():
        spec = configured.get(name)
        if spec is None:
            continue
        weight = spec.get('weight', 1.0)
        target = spec.get('target', 1.0)
        # Normalize by target so differently-scaled metrics are comparable.
        scaled = value / target if target and target > 0 else value
        weighted_sum += weight * scaled
        weight_sum += weight
    # Weighted average; falls back to the (zero) raw sum if no weights.
    return weighted_sum / weight_sum if weight_sum > 0 else weighted_sum
# ==============================================================================
# FEA Objective Function
# ==============================================================================
def fea_objective(
    trial: optuna.Trial,
    config: Dict[str, Any],
    nx_solver: NXSolver
) -> Tuple[float, Dict[str, Any]]:
    """
    FEA-based objective function.

    Pipeline per trial:
    1. Suggest design variables
    2. Update NX model
    3. Run Nastran solve
    4. Extract Zernike coefficients
    5. Compute objectives

    Args:
        trial: Optuna trial used to suggest design-variable values.
        config: Parsed optimization config (design variables, NX and
            Zernike settings, objective weights).
        nx_solver: Solver wrapper whose run_simulation() applies the
            expression updates and runs the solve.

    Returns:
        (weighted_objective, trial_data)

    Raises:
        optuna.TrialPruned: When the solve or the Zernike extraction fails.
    """
    # Get enabled design variables; disabled ones are pinned at their
    # baseline (or range midpoint when no baseline is configured).
    design_vars = {}
    for var in config['design_variables']:
        if var.get('enabled', False):
            value = trial.suggest_float(
                var['name'],
                var['min'],
                var['max']
            )
            design_vars[var['name']] = value
        else:
            # Use baseline for disabled variables
            design_vars[var['name']] = var.get('baseline', (var['min'] + var['max']) / 2)
    print(f"\n[Trial {trial.number}] Design variables:")
    for name, value in design_vars.items():
        print(f" {name}: {value:.4f}")
    # Prepare expression updates (map design-variable names to the NX
    # expression names); all variables are pushed so disabled ones stay
    # fixed at baseline in the model.
    t_start = time.time()
    print(f"[Trial {trial.number}] Running NX simulation...")
    try:
        expressions = {
            var['expression_name']: design_vars[var['name']]
            for var in config['design_variables']
        }
        # Get file paths from config, falling back to the study defaults
        nx_settings = config.get('nx_settings', {})
        model_dir = Path(nx_settings.get('model_dir', str(MODEL_DIR)))
        sim_file = model_dir / nx_settings.get('sim_file', 'ASSY_M1_assyfem1_sim1.sim')
        solution_name = nx_settings.get('solution_name', 'Solution 1')
        # Run simulation via NXSolver (handles expressions + solve in one call)
        result = nx_solver.run_simulation(
            sim_file=sim_file,
            working_dir=model_dir,
            expression_updates=expressions,
            solution_name=solution_name,
            cleanup=False  # Keep OP2 for Zernike extraction
        )
        if not result['success']:
            error_msg = result.get('error', 'Unknown error')
            print(f"[Trial {trial.number}] ERROR in solve: {error_msg}")
            raise optuna.TrialPruned(f"Solve failed: {error_msg}")
        op2_path = Path(result['op2_file'])
    except optuna.TrialPruned:
        # Re-raise our own pruning untouched so it is not re-wrapped below.
        raise
    except Exception as e:
        print(f"[Trial {trial.number}] ERROR in simulation: {e}")
        raise optuna.TrialPruned(f"Simulation failed: {e}")
    solve_time = time.time() - t_start
    print(f"[Trial {trial.number}] Solve completed in {solve_time:.1f}s")
    # Extract Zernike coefficients using displacement-based subtraction
    print(f"[Trial {trial.number}] Extracting Zernike coefficients (displacement-based)...")
    try:
        zernike_settings = config.get('zernike_settings', {})
        subcases = zernike_settings.get('subcases', ['1', '2', '3', '4'])
        subcase_labels = zernike_settings.get('subcase_labels', {"1": "20deg", "2": "40deg", "3": "60deg", "4": "90deg"})
        reference_subcase = zernike_settings.get('reference_subcase', '1')
        n_modes = zernike_settings.get('n_modes', 50)
        zernike_data = extract_zernike_with_relative(
            op2_path,
            bdf_path=None,  # Will auto-detect next to the OP2
            n_modes=n_modes,
            subcases=subcases,
            subcase_labels=subcase_labels,
            reference_subcase=reference_subcase
        )
    except Exception as e:
        print(f"[Trial {trial.number}] ERROR extracting Zernike: {e}")
        import traceback
        traceback.print_exc()
        raise optuna.TrialPruned(f"Zernike extraction failed: {e}")
    # Compute objectives and the single weighted scalar Optuna minimizes
    objectives = compute_objectives_from_zernike(zernike_data, config)
    weighted_obj = compute_weighted_objective(objectives, config)
    print(f"[Trial {trial.number}] Objectives:")
    for name, value in objectives.items():
        print(f" {name}: {value:.2f} nm")
    print(f"[Trial {trial.number}] Weighted objective: {weighted_obj:.4f}")
    # Store trial data (returned to the caller alongside the objective)
    trial_data = {
        'design_vars': design_vars,
        'objectives': objectives,
        'weighted_objective': weighted_obj,
        'solve_time': solve_time,
        'zernike_coefficients': {
            subcase: data.get('coefficients') if data else None
            for subcase, data in zernike_data.items()
        },
        'zernike_relative_coefficients': {
            subcase: data.get('relative_coefficients') if data else None
            for subcase, data in zernike_data.items()
        },
        'source': 'FEA'
    }
    # Store in trial user attributes (persisted in the Optuna DB)
    trial.set_user_attr('objectives', objectives)
    trial.set_user_attr('solve_time', solve_time)
    trial.set_user_attr('source', 'FEA')
    # Store Zernike data for neural training (absolute coefficients)
    for subcase, data in zernike_data.items():
        if data and 'coefficients' in data:
            trial.set_user_attr(f'zernike_coeffs_{subcase}', data['coefficients'])
        # Also store relative coefficients
        if data and data.get('relative_coefficients'):
            trial.set_user_attr(f'zernike_rel_coeffs_{subcase}', data['relative_coefficients'])
    return weighted_obj, trial_data
# ==============================================================================
# Neural Surrogate
# ==============================================================================
class ZernikeSurrogate:
    """
    Neural surrogate for Zernike coefficient prediction.

    Predicts 50 Zernike coefficients for each of 4 subcases (200 outputs)
    from design variables input.
    """

    def __init__(self, model_path: Optional[Path] = None):
        """Create the surrogate; loads weights immediately if model_path exists."""
        self.model = None  # torch nn.Sequential, built lazily in load()
        self.model_path = model_path
        self.design_var_names: List[str] = []  # input feature order used at training time
        # Normalization statistics (torch tensors once load() has run)
        self.design_mean = None
        self.design_std = None
        self.output_mean = None
        self.output_std = None
        # NOTE(review): these default subcase keys ('20'...'90') differ from
        # the FEA extraction keys ('1'..'4'); the checkpoint's own 'subcases'
        # entry overrides them on load — confirm both sides agree.
        self.subcases = ['20', '40', '60', '90']
        self.n_modes = 50
        if model_path and model_path.exists():
            self.load(model_path)

    def load(self, model_path: Path):
        """Load trained model from checkpoint."""
        import torch
        # NOTE(review): torch.load without weights_only=True unpickles
        # arbitrary objects — only load checkpoints from trusted sources.
        checkpoint = torch.load(model_path, map_location='cpu')
        self.design_var_names = checkpoint.get('design_var_names', [])
        self.design_mean = torch.tensor(checkpoint['normalization']['design_mean'])
        self.design_std = torch.tensor(checkpoint['normalization']['design_std'])
        self.output_mean = torch.tensor(checkpoint['normalization']['output_mean'])
        self.output_std = torch.tensor(checkpoint['normalization']['output_std'])
        self.subcases = checkpoint.get('subcases', ['20', '40', '60', '90'])
        self.n_modes = checkpoint.get('n_modes', 50)
        # Build model architecture, then restore the trained weights
        config = checkpoint['config']
        self.model = self._build_model(config)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model.eval()
        print(f"[Surrogate] Loaded model from {model_path}")
        print(f"[Surrogate] Design vars: {self.design_var_names}")
        print(f"[Surrogate] Outputs: {len(self.subcases)} subcases x {self.n_modes} coefficients")

    def _build_model(self, config: Dict[str, Any]):
        """Build MLP model for Zernike prediction.

        Architecture: Linear(input, hidden) + ReLU repeated num_layers
        times, then a final Linear to the flattened output of size
        len(subcases) * n_modes.
        """
        import torch
        import torch.nn as nn
        input_dim = len(self.design_var_names)
        output_dim = len(self.subcases) * self.n_modes
        hidden_dim = config.get('hidden_channels', 128)
        num_layers = config.get('num_layers', 4)
        layers = []
        layers.append(nn.Linear(input_dim, hidden_dim))
        layers.append(nn.ReLU())
        for _ in range(num_layers - 1):
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_dim, output_dim))
        return nn.Sequential(*layers)

    def predict(self, design_vars: Dict[str, float]) -> Dict[str, np.ndarray]:
        """
        Predict Zernike coefficients for all subcases.

        Design variables missing from the input default to 0.0.
        Returns dict mapping subcase to coefficient array.

        Raises:
            RuntimeError: If no model has been loaded yet.
        """
        import torch
        if self.model is None:
            raise RuntimeError("Model not loaded")
        # Build input tensor in the training feature order
        input_values = [design_vars.get(name, 0.0) for name in self.design_var_names]
        x = torch.tensor(input_values, dtype=torch.float32)
        # Normalize with the training statistics
        x_norm = (x - self.design_mean) / self.design_std
        # Predict
        with torch.no_grad():
            y_norm = self.model(x_norm.unsqueeze(0))
        # Denormalize back to coefficient units
        y = y_norm * self.output_std + self.output_mean
        y = y.squeeze(0).numpy()
        # Split the flat output into one coefficient block per subcase
        results = {}
        for i, subcase in enumerate(self.subcases):
            start = i * self.n_modes
            end = start + self.n_modes
            results[subcase] = y[start:end]
        return results

    def predict_objectives(
        self,
        design_vars: Dict[str, float],
        config: Dict[str, Any]
    ) -> Dict[str, float]:
        """Predict objectives from design variables."""
        coeffs = self.predict(design_vars)
        # Convert to format expected by compute_objectives_from_zernike
        # NOTE(review): only 'coefficients' is populated here, but
        # compute_objectives_from_zernike reads 'relative_filtered_rms_nm'
        # and the 'rel_*' surface entries, so every objective may come back
        # as inf on this path — verify the intended surrogate objective flow.
        zernike_data = {
            subcase: {'coefficients': coeffs[subcase].tolist()}
            for subcase in self.subcases
        }
        return compute_objectives_from_zernike(zernike_data, config)
def neural_objective(
    trial: optuna.Trial,
    config: Dict[str, Any],
    surrogate: ZernikeSurrogate
) -> float:
    """Objective function evaluated through the neural surrogate."""
    # Suggest enabled design variables; pin disabled ones at baseline
    # (or the range midpoint when no baseline is configured).
    design_vars: Dict[str, float] = {}
    for var in config['design_variables']:
        name = var['name']
        if var.get('enabled', False):
            design_vars[name] = trial.suggest_float(name, var['min'], var['max'])
        else:
            design_vars[name] = var.get('baseline', (var['min'] + var['max']) / 2)
    # Predict objectives with the surrogate and time the inference
    start = time.time()
    objectives = surrogate.predict_objectives(design_vars, config)
    elapsed_ms = (time.time() - start) * 1000
    weighted = compute_weighted_objective(objectives, config)
    # Persist trial metadata so surrogate trials are distinguishable from FEA
    trial.set_user_attr('objectives', objectives)
    trial.set_user_attr('inference_time_ms', elapsed_ms)
    trial.set_user_attr('source', 'Neural')
    trial.set_user_attr('neural_predicted', True)
    return weighted
# ==============================================================================
# Training
# ==============================================================================
def train_zernike_surrogate(config: Dict[str, Any]) -> Path:
    """
    Train neural surrogate on FEA trial data.

    Extracts Zernike coefficients from completed trials and trains
    an MLP to predict them from design variables.

    Args:
        config: Parsed optimization config (study name, design variables,
            zernike and surrogate training settings).

    Returns:
        Path to the saved checkpoint (checkpoint_best.pt).

    Raises:
        FileNotFoundError: If no study database exists yet.
        ValueError: If fewer than 10 FEA trials with Zernike data exist.
    """
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, TensorDataset

    print("\n" + "="*60)
    print("Training Zernike Surrogate Model")
    print("="*60)

    # Load study
    db_path = RESULTS_DIR / "study.db"
    if not db_path.exists():
        raise FileNotFoundError(f"No study database found at {db_path}")
    storage = optuna.storages.RDBStorage(f'sqlite:///{db_path}')
    study = optuna.load_study(
        study_name=config['study_name'],
        storage=storage
    )

    # Get FEA trials with Zernike data
    fea_trials = [
        t for t in study.trials
        if t.state == optuna.trial.TrialState.COMPLETE
        and t.user_attrs.get('source') == 'FEA'
        and t.user_attrs.get('zernike_coeffs_20') is not None
    ]
    print(f"Found {len(fea_trials)} FEA trials with Zernike data")
    if len(fea_trials) < 10:
        raise ValueError(f"Need at least 10 FEA trials for training, have {len(fea_trials)}")

    # Get design variable names (enabled ones)
    design_var_names = [
        var['name'] for var in config['design_variables']
        if var.get('enabled', False)
    ]
    subcases = config.get('zernike_settings', {}).get('subcases', ['20', '40', '60', '90'])
    n_modes = config.get('zernike_settings', {}).get('n_modes', 50)

    # Build training data: X = design variables, Y = concatenated
    # Zernike coefficients across all subcases
    X = []
    Y = []
    for trial in fea_trials:
        x = [trial.params.get(name, 0.0) for name in design_var_names]
        X.append(x)
        y = []
        for subcase in subcases:
            coeffs = trial.user_attrs.get(f'zernike_coeffs_{subcase}', [0.0] * n_modes)
            y.extend(coeffs[:n_modes])
        Y.append(y)
    X = np.array(X, dtype=np.float32)
    Y = np.array(Y, dtype=np.float32)
    print(f"Training data shape: X={X.shape}, Y={Y.shape}")

    # Normalize (the epsilon keeps constant columns from dividing by zero)
    X_mean = X.mean(axis=0)
    X_std = X.std(axis=0) + 1e-8
    Y_mean = Y.mean(axis=0)
    Y_std = Y.std(axis=0) + 1e-8
    X_norm = (X - X_mean) / X_std
    Y_norm = (Y - Y_mean) / Y_std

    # Train/val split (random permutation, 80/20)
    n_train = int(0.8 * len(X))
    indices = np.random.permutation(len(X))
    train_idx = indices[:n_train]
    val_idx = indices[n_train:]
    X_train = torch.tensor(X_norm[train_idx])
    Y_train = torch.tensor(Y_norm[train_idx])
    X_val = torch.tensor(X_norm[val_idx])
    Y_val = torch.tensor(Y_norm[val_idx])

    # Build model (same MLP shape as ZernikeSurrogate._build_model so the
    # saved checkpoint can be reloaded by the surrogate class)
    train_config = config.get('surrogate_settings', {}).get('training_config', {})
    hidden_dim = train_config.get('hidden_channels', 128)
    num_layers = train_config.get('num_layers', 4)
    input_dim = len(design_var_names)
    output_dim = len(subcases) * n_modes
    layers = []
    layers.append(nn.Linear(input_dim, hidden_dim))
    layers.append(nn.ReLU())
    for _ in range(num_layers - 1):
        layers.append(nn.Linear(hidden_dim, hidden_dim))
        layers.append(nn.ReLU())
    layers.append(nn.Linear(hidden_dim, output_dim))
    model = nn.Sequential(*layers)

    # Training setup
    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=train_config.get('learning_rate', 0.001)
    )
    criterion = nn.MSELoss()
    epochs = train_config.get('epochs', 200)
    batch_size = train_config.get('batch_size', 8)
    train_dataset = TensorDataset(X_train, Y_train)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    best_val_loss = float('inf')
    best_state = None

    print(f"\nTraining for {epochs} epochs...")
    for epoch in range(epochs):
        # Train
        model.train()
        train_loss = 0.0
        for xb, yb in train_loader:
            optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss /= len(train_loader)
        # Validate
        model.eval()
        with torch.no_grad():
            val_pred = model(X_val)
            val_loss = criterion(val_pred, Y_val).item()
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # BUGFIX: state_dict().copy() is a shallow copy — its tensors
            # alias the live model and keep changing as training continues,
            # so the "best" snapshot silently became the final-epoch
            # weights. Clone every tensor to freeze the best-epoch state.
            best_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
        if (epoch + 1) % 20 == 0:
            print(f" Epoch {epoch+1}/{epochs}: train_loss={train_loss:.6f}, val_loss={val_loss:.6f}")
    print(f"\nBest validation loss: {best_val_loss:.6f}")

    # Save model; fall back to the current weights if no epoch ever ran
    # (e.g. epochs == 0 in the config).
    if best_state is None:
        best_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
    model.load_state_dict(best_state)
    model_dir = RESULTS_DIR / "zernike_surrogate"
    model_dir.mkdir(exist_ok=True)
    model_path = model_dir / "checkpoint_best.pt"
    checkpoint = {
        'model_state_dict': best_state,
        'config': train_config,
        'design_var_names': design_var_names,
        'subcases': subcases,
        'n_modes': n_modes,
        'normalization': {
            'design_mean': X_mean.tolist(),
            'design_std': X_std.tolist(),
            'output_mean': Y_mean.tolist(),
            'output_std': Y_std.tolist(),
        },
        'best_val_loss': best_val_loss,
        'n_training_samples': len(fea_trials),
    }
    torch.save(checkpoint, model_path)
    print(f"\nModel saved to: {model_path}")
    return model_path
# ==============================================================================
# Main Optimization
# ==============================================================================
def run_optimization(
    n_trials: int = 100,
    enable_nn: bool = False,
    resume: bool = True
) -> None:
    """
    Run the optimization study.

    Drives an explicit ask/tell Optuna loop: each trial is evaluated either
    by the neural surrogate (when enabled and a trained checkpoint exists)
    or by a full NX/Nastran FEA solve, then reported back to the study.
    Progress and a final best-trial summary are printed to stdout.

    Args:
        n_trials: Number of trials to run
        enable_nn: Whether to use neural surrogate
        resume: Whether to resume existing study
    """
    config = load_config()
    study_name = config['study_name']
    # Banner with the run configuration.
    print("\n" + "="*70)
    print(f"M1 MIRROR ZERNIKE OPTIMIZATION")
    print("="*70)
    print(f"Study: {study_name}")
    print(f"Trials: {n_trials}")
    print(f"Neural: {'Enabled' if enable_nn else 'Disabled'}")
    print("="*70)
    # Print enabled design variables
    enabled_vars = [v for v in config['design_variables'] if v.get('enabled')]
    print(f"\nEnabled design variables ({len(enabled_vars)}):")
    for var in enabled_vars:
        print(f"  {var['name']}: [{var['min']}, {var['max']}] {var.get('units', '')}")
    # Print objectives
    print(f"\nObjectives:")
    for obj in config['objectives']:
        print(f"  {obj['name']}: weight={obj.get('weight', 1)}, target={obj.get('target', 'N/A')} {obj.get('units', '')}")
    # Setup storage: SQLite-backed, with a heartbeat plus automatic retry so
    # trials that die mid-solve (e.g. an NX crash) are retried up to 3 times.
    db_path = RESULTS_DIR / "study.db"
    storage = optuna.storages.RDBStorage(
        f'sqlite:///{db_path}',
        heartbeat_interval=60,
        failed_trial_callback=optuna.storages.RetryFailedTrialCallback(max_retry=3)
    )
    # Create or load study
    sampler = TPESampler(
        seed=config['optimization_settings'].get('seed', 42),
        n_startup_trials=config['optimization_settings'].get('n_startup_trials', 15),
        multivariate=config['optimization_settings'].get('tpe_multivariate', True)
    )
    if resume:
        try:
            study = optuna.load_study(
                study_name=study_name,
                storage=storage,
                sampler=sampler
            )
            print(f"\nResumed study with {len(study.trials)} existing trials")
        except KeyError:
            # No study with this name in the DB yet -- fall back to creating one.
            study = optuna.create_study(
                study_name=study_name,
                storage=storage,
                sampler=sampler,
                direction='minimize'
            )
            print(f"\nCreated new study: {study_name}")
    else:
        study = optuna.create_study(
            study_name=study_name,
            storage=storage,
            sampler=sampler,
            direction='minimize'
        )
        print(f"\nCreated new study: {study_name}")
    # Initialize NX solver (lazy - only when needed for FEA)
    nx_solver = None
    def get_nx_solver():
        # Lazily construct (and cache) the NXSolver so neural-only runs never
        # touch the NX installation or the simulation files on disk.
        nonlocal nx_solver
        if nx_solver is None:
            import re
            nx_settings = config.get('nx_settings', {})
            # Use model_dir from config if specified, otherwise default to MODEL_DIR
            model_dir = Path(nx_settings.get('model_dir', str(MODEL_DIR)))
            sim_file = model_dir / nx_settings.get('sim_file', 'ASSY_M1_assyfem1_sim1.sim')
            if not sim_file.exists():
                raise FileNotFoundError(
                    f"Simulation file not found: {sim_file}\n"
                    f"Please check model_dir in config or copy files to: {MODEL_DIR}"
                )
            # Get NX installation path and extract version
            nx_install_path = nx_settings.get('nx_install_path', 'C:\\Program Files\\Siemens\\NX2506')
            # Extract version from path (e.g., "NX2506" -> "2506")
            version_match = re.search(r'NX(\d+)', nx_install_path)
            nastran_version = version_match.group(1) if version_match else "2506"
            nx_solver = NXSolver(
                nx_install_dir=Path(nx_install_path),
                nastran_version=nastran_version,
                timeout=nx_settings.get('op2_timeout_s', 1800),
                study_name=config['study_name']
            )
        return nx_solver
    # Load neural surrogate if enabled
    surrogate = None
    if enable_nn:
        surrogate_path = RESULTS_DIR / "zernike_surrogate" / "checkpoint_best.pt"
        if surrogate_path.exists():
            surrogate = ZernikeSurrogate(surrogate_path)
            print(f"\nLoaded neural surrogate from {surrogate_path}")
        else:
            # Degrade gracefully to FEA-only when no checkpoint has been trained.
            print(f"\n[WARN] Neural surrogate not found at {surrogate_path}")
            print("[WARN] Running FEA trials. Train surrogate with --train-surrogate")
            enable_nn = False
    # Run trials.  Ask/tell (rather than study.optimize) so pruned/failed
    # trials can be reported explicitly and the loop survives exceptions.
    print(f"\nStarting optimization ({n_trials} trials)...")
    for i in range(n_trials):
        trial = study.ask()
        try:
            if enable_nn and surrogate is not None:
                # Neural trial
                value = neural_objective(trial, config, surrogate)
                study.tell(trial, value)
                objectives = trial.user_attrs.get('objectives', {})
                print(f"[Neural Trial {trial.number}] weighted_obj={value:.4f} "
                      f"(40vs20={objectives.get('rel_filtered_rms_40_vs_20', 0):.1f}nm, "
                      f"60vs20={objectives.get('rel_filtered_rms_60_vs_20', 0):.1f}nm)")
            else:
                # FEA trial
                solver = get_nx_solver()
                # NOTE(review): trial_data is unused here; presumably
                # fea_objective records its own per-trial details -- confirm.
                value, trial_data = fea_objective(trial, config, solver)
                study.tell(trial, value)
        except optuna.TrialPruned as e:
            study.tell(trial, state=optuna.trial.TrialState.PRUNED)
            print(f"[Trial {trial.number}] Pruned: {e}")
        except Exception as e:
            # Mark the trial failed but keep the optimization loop alive.
            study.tell(trial, state=optuna.trial.TrialState.FAIL)
            print(f"[Trial {trial.number}] Failed: {e}")
            import traceback
            traceback.print_exc()
    # Summary
    print("\n" + "="*70)
    print("OPTIMIZATION COMPLETE")
    print("="*70)
    completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
    print(f"Completed trials: {len(completed)}")
    if completed:
        best = study.best_trial
        print(f"\nBest trial: #{best.number}")
        print(f"Best weighted objective: {best.value:.4f}")
        print("\nBest parameters:")
        for name, value in best.params.items():
            print(f"  {name}: {value:.4f}")
        objectives = best.user_attrs.get('objectives', {})
        if objectives:
            print("\nBest objectives:")
            for name, value in objectives.items():
                print(f"  {name}: {value:.2f} nm")
def show_status():
    """Print a summary of the current optimization study.

    Loads the Optuna study from the SQLite database under RESULTS_DIR and
    reports trial counts (split by FEA vs. Neural source) plus the best
    completed trial so far. Prints a short notice and returns early when
    no study database exists yet.
    """
    cfg = load_config()
    study_name = cfg['study_name']
    db_path = RESULTS_DIR / "study.db"
    if not db_path.exists():
        print("No study found. Run optimization first.")
        return

    study = optuna.load_study(
        study_name=study_name,
        storage=optuna.storages.RDBStorage(f'sqlite:///{db_path}'),
    )

    # Partition finished trials by how their objective was evaluated.
    completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
    fea_trials = [t for t in completed if t.user_attrs.get('source') == 'FEA']
    nn_trials = [t for t in completed if t.user_attrs.get('source') == 'Neural']

    header = "=" * 60
    print("\n" + header)
    print(f"STUDY STATUS: {study_name}")
    print(header)
    print(f"Total trials: {len(study.trials)}")
    print(f"Completed: {len(completed)}")
    print(f"  FEA trials: {len(fea_trials)}")
    print(f"  Neural trials: {len(nn_trials)}")

    if not completed:
        return

    best = study.best_trial
    print(f"\nBest trial: #{best.number} ({best.user_attrs.get('source', 'Unknown')})")
    print(f"Best value: {best.value:.4f}")
    objectives = best.user_attrs.get('objectives', {})
    if objectives:
        print("\nBest objectives:")
        for name, value in objectives.items():
            print(f"  {name}: {value:.2f} nm")
# ==============================================================================
# CLI
# ==============================================================================
def main():
    """Command-line entry point.

    Mutually exclusive modes, checked in priority order:
      --train-surrogate  train the neural surrogate from stored FEA trials
      --run              run the optimization loop (see --trials, --enable-nn)
      --status           print a summary of the existing study
    With no mode flag, prints usage help.
    """
    parser = argparse.ArgumentParser(
        description="M1 Mirror Zernike Optimization with Neural Acceleration"
    )
    parser.add_argument('--run', action='store_true',
                        help='Run optimization')
    parser.add_argument('--trials', type=int, default=100,
                        help='Number of trials to run')
    parser.add_argument('--enable-nn', action='store_true',
                        help='Enable neural surrogate')
    # FIX: the previous declaration (action='store_true', default=True) made
    # the flag always True with no way to turn it off from the CLI.
    # BooleanOptionalAction keeps --resume working and adds --no-resume.
    parser.add_argument('--resume', action=argparse.BooleanOptionalAction,
                        default=True,
                        help='Resume existing study (disable with --no-resume)')
    parser.add_argument('--train-surrogate', action='store_true',
                        help='Train neural surrogate from FEA data')
    parser.add_argument('--status', action='store_true',
                        help='Show optimization status')
    args = parser.parse_args()

    if args.train_surrogate:
        config = load_config()
        train_zernike_surrogate(config)
    elif args.run:
        run_optimization(
            n_trials=args.trials,
            enable_nn=args.enable_nn,
            resume=args.resume
        )
    elif args.status:
        show_status()
    else:
        parser.print_help()
# Script entry point: dispatch to the CLI when executed directly.
if __name__ == '__main__':
    main()