#!/usr/bin/env python3 """ M1 Mirror Zernike Optimization with Neural Acceleration ======================================================== Optimizes telescope primary mirror support structure using Zernike wavefront error metrics. Supports hybrid FEA/Neural workflow for accelerated design space exploration. Objectives: - Minimize relative filtered RMS at 40 deg vs 20 deg - Minimize relative filtered RMS at 60 deg vs 20 deg - Minimize optician workload at 90 deg (polishing orientation) Design Variables (11 total, selectively enabled): - Whiffle tree parameters (min, outer_to_vertical, triangle_closeness) - Lateral support parameters (angles, pivots, closeness) - Mirror blank parameters (backface_angle, inner_circular_rib_dia) Usage: python run_optimization.py --run --trials 40 # FEA only python run_optimization.py --run --trials 500 --enable-nn # With neural python run_optimization.py --train-surrogate # Train NN from data python run_optimization.py --status # Check progress """ import sys import json import time import argparse from pathlib import Path from datetime import datetime from typing import Dict, Any, List, Optional, Tuple import numpy as np # Add parent directories to path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) import optuna from optuna.samplers import TPESampler, NSGAIISampler # Atomizer imports from optimization_engine.nx_solver import NXSolver # Note: We use our own Zernike implementation with displacement-based subtraction # instead of the generic ZernikeExtractor to match the original zernike_Post_Script_NX.py # ============================================================================== # Configuration # ============================================================================== STUDY_DIR = Path(__file__).parent SETUP_DIR = STUDY_DIR / "1_setup" RESULTS_DIR = STUDY_DIR / "2_results" MODEL_DIR = SETUP_DIR / "model" CONFIG_PATH = SETUP_DIR / "optimization_config.json" # Ensure directories exist 
RESULTS_DIR.mkdir(exist_ok=True) def load_config() -> Dict[str, Any]: """Load optimization configuration.""" with open(CONFIG_PATH, 'r') as f: return json.load(f) # ============================================================================== # Zernike Extraction Utilities # ============================================================================== # Displacement unit conversion: mm -> nm for WFE = 2 * Disp_Z NM_PER_MM = 1e6 # 1 mm = 1e6 nm def extract_displacements_from_op2( op2_path: Path, subcases: List[str] = ["1", "2", "3", "4"] ) -> Dict[str, Dict[str, np.ndarray]]: """ Extract raw displacement data from OP2 for all subcases. Returns dict with structure: { '1': {'node_ids': array, 'disp_z': array, 'x': array, 'y': array}, '2': {...}, ... } """ from pyNastran.op2.op2 import OP2 op2 = OP2() op2.read_op2(str(op2_path)) if not op2.displacements: raise RuntimeError("No displacement subcases found in OP2.") results = {} for key, darr in op2.displacements.items(): data = darr.data dmat = data[0] if data.ndim == 3 else (data if data.ndim == 2 else None) if dmat is None: continue ngt = darr.node_gridtype.astype(int) node_ids = ngt if ngt.ndim == 1 else ngt[:, 0] # Try to identify subcase by isubcase isub = getattr(darr, 'isubcase', None) isub_str = str(int(isub)) if isinstance(isub, int) else None if isub_str and isub_str in subcases: results[isub_str] = { 'node_ids': node_ids.astype(int), 'disp_z': dmat[:, 2].astype(np.float64), # Z displacement } return results def compute_zernike_from_wfe( X: np.ndarray, Y: np.ndarray, W_nm: np.ndarray, n_modes: int = 50 ) -> Tuple[np.ndarray, float]: """ Compute Zernike coefficients from WFE surface data. Uses least-squares fit on unit disk. 
    Returns (coefficients, R_max)
    """
    from math import factorial

    def noll_indices(j: int):
        # Map the 1-based mode index j to radial degree n and azimuthal
        # frequency m. NOTE(review): this is a custom Noll-style enumeration
        # (per-n ordering below), not necessarily the canonical Noll table —
        # it only needs to be consistent with the other helpers in this file.
        if j < 1:
            raise ValueError("Noll index j must be >= 1")
        count = 0
        n = 0
        while True:
            if n == 0:
                ms = [0]
            elif n % 2 == 0:
                ms = [0] + [m for k in range(1, n//2 + 1) for m in (-2*k, 2*k)]
            else:
                ms = [m for k in range(0, (n+1)//2) for m in (-(2*k+1), (2*k+1))]
            for m in ms:
                count += 1
                if count == j:
                    return n, m
            n += 1

    def zernike_noll(j, r, th):
        # Evaluate the (unnormalized) Zernike polynomial of index j at polar
        # coordinates (r, th): radial part R via the standard factorial
        # expansion, azimuthal part cos/sin depending on the sign of m.
        n, m = noll_indices(j)
        R = np.zeros_like(r)
        for s in range((n - abs(m)) // 2 + 1):
            c = ((-1)**s * factorial(n - s) /
                 (factorial(s) * factorial((n + abs(m)) // 2 - s) * factorial((n - abs(m)) // 2 - s)))
            R += c * r**(n - 2*s)
        if m == 0:
            return R
        return R * (np.cos(m * th) if m > 0 else np.sin(-m * th))

    # Center and normalize coordinates so the point cloud lies on a unit disk
    # of radius R_max (returned so later evaluations can reuse the same disk).
    Xc = X - np.mean(X)
    Yc = Y - np.mean(Y)
    R_max = float(np.max(np.hypot(Xc, Yc)))
    # float32 halves memory for the per-node vectors; accumulators below stay float64.
    r = np.hypot(Xc / R_max, Yc / R_max).astype(np.float32)
    th = np.arctan2(Yc, Xc).astype(np.float32)

    # Mask: only use points inside unit disk
    # NOTE(review): r is float32, so the outermost point can round slightly
    # above 1.0 and be excluded — confirm this is acceptable.
    mask = (r <= 1.0) & ~np.isnan(W_nm)
    if not np.any(mask):
        raise RuntimeError("No valid points inside unit disk.")
    idx = np.nonzero(mask)[0]

    # Build design matrix in chunks: accumulate the normal equations
    # G = Z^T Z and h = Z^T v block-by-block so the full
    # (n_points x n_modes) basis never has to be materialized at once.
    m = int(n_modes)
    G = np.zeros((m, m), dtype=np.float64)  # Z^T Z
    h = np.zeros((m,), dtype=np.float64)  # Z^T v
    v = W_nm.astype(np.float64)
    chunk_size = 100000
    for start in range(0, len(idx), chunk_size):
        sl = idx[start:start + chunk_size]
        r_b, th_b, v_b = r[sl], th[sl], v[sl]
        Zb = np.column_stack([zernike_noll(j, r_b, th_b).astype(np.float32)
                              for j in range(1, m + 1)])
        G += (Zb.T @ Zb).astype(np.float64)
        h += (Zb.T @ v_b).astype(np.float64)
    # Solve the normal equations; fall back to least-squares if G is singular
    # (e.g. degenerate geometry or too few points for n_modes).
    try:
        coeffs = np.linalg.solve(G, h)
    except np.linalg.LinAlgError:
        coeffs = np.linalg.lstsq(G, h, rcond=None)[0]
    return coeffs, R_max


def compute_rms_from_surface(
    X: np.ndarray, Y: np.ndarray, W_nm: np.ndarray,
    coeffs: np.ndarray, R_max: float,
    filter_low_orders: int = 4
) -> Tuple[float, float]:
    """
    Compute global and filtered RMS from actual WFE surface data.
This is the CORRECT approach matching zernike_Post_Script_NX.py: - Global RMS = sqrt(mean(W^2)) - RMS of the actual WFE values - Filtered RMS = sqrt(mean(W_residual^2)) where W_residual = W - low_order_fit The low-order modes (piston, tip, tilt, defocus) are reconstructed from coefficients and subtracted from the original WFE to get the residual. Returns (global_rms, filtered_rms) """ from math import factorial def noll_indices(j: int): if j < 1: raise ValueError("Noll index j must be >= 1") count = 0 n = 0 while True: if n == 0: ms = [0] elif n % 2 == 0: ms = [0] + [m for k in range(1, n//2 + 1) for m in (-2*k, 2*k)] else: ms = [m for k in range(0, (n+1)//2) for m in (-(2*k+1), (2*k+1))] for m in ms: count += 1 if count == j: return n, m n += 1 def zernike_noll(j, r, th): n, m = noll_indices(j) R = np.zeros_like(r) for s in range((n - abs(m)) // 2 + 1): c = ((-1)**s * factorial(n - s) / (factorial(s) * factorial((n + abs(m)) // 2 - s) * factorial((n - abs(m)) // 2 - s))) R += c * r**(n - 2*s) if m == 0: return R return R * (np.cos(m * th) if m > 0 else np.sin(-m * th)) # Center and normalize coordinates Xc = X - np.mean(X) Yc = Y - np.mean(Y) r = np.hypot(Xc / R_max, Yc / R_max) th = np.arctan2(Yc, Xc) # Build Zernike basis matrix for low-order modes n_modes = len(coeffs) Z = np.column_stack([zernike_noll(j, r, th) for j in range(1, n_modes + 1)]) # Compute residual by removing low-order modes from original WFE # W_res_filt = W_nm - Z[:, :filter_low_orders] @ coeffs[:filter_low_orders] W_res_filt = W_nm - Z[:, :filter_low_orders].dot(coeffs[:filter_low_orders]) # RMS = sqrt(mean(W^2)) - this is the CORRECT formula global_rms = float(np.sqrt((W_nm ** 2).mean())) filtered_rms = float(np.sqrt((W_res_filt ** 2).mean())) return global_rms, filtered_rms def compute_rms_filter_j1to3( X: np.ndarray, Y: np.ndarray, W_nm: np.ndarray, coeffs: np.ndarray, R_max: float ) -> float: """ Compute RMS with only J1-J3 removed (keeps defocus J4). 
    This is the "optician workload" metric for manufacturing.
    Returns filtered RMS in nm.
    """
    from math import factorial

    def noll_indices(j: int):
        # Same custom Noll-style (n, m) enumeration used by the other
        # Zernike helpers in this module — must stay in sync with them.
        if j < 1:
            raise ValueError("Noll index j must be >= 1")
        count = 0
        n = 0
        while True:
            if n == 0:
                ms = [0]
            elif n % 2 == 0:
                ms = [0] + [m for k in range(1, n//2 + 1) for m in (-2*k, 2*k)]
            else:
                ms = [m for k in range(0, (n+1)//2) for m in (-(2*k+1), (2*k+1))]
            for m in ms:
                count += 1
                if count == j:
                    return n, m
            n += 1

    def zernike_noll(j, r, th):
        # Evaluate Zernike polynomial j at polar coordinates (r, th).
        n, m = noll_indices(j)
        R = np.zeros_like(r)
        for s in range((n - abs(m)) // 2 + 1):
            c = ((-1)**s * factorial(n - s) /
                 (factorial(s) * factorial((n + abs(m)) // 2 - s) * factorial((n - abs(m)) // 2 - s)))
            R += c * r**(n - 2*s)
        if m == 0:
            return R
        return R * (np.cos(m * th) if m > 0 else np.sin(-m * th))

    # Center and normalize coordinates onto the fit disk (R_max comes from
    # the fitting step, so coefficients and basis share the same disk).
    Xc = X - np.mean(X)
    Yc = Y - np.mean(Y)
    r = np.hypot(Xc / R_max, Yc / R_max)
    th = np.arctan2(Yc, Xc)

    # Build Zernike basis matrix for first 3 modes only
    Z = np.column_stack([zernike_noll(j, r, th) for j in range(1, 4)])
    # Remove only J1-J3 (piston, tip, tilt) - keeps defocus
    W_res_filt3 = W_nm - Z.dot(coeffs[:3])
    return float(np.sqrt((W_res_filt3 ** 2).mean()))


def extract_zernike_with_relative(
    op2_path: Path,
    bdf_path: Optional[Path] = None,
    n_modes: int = 50,
    # NOTE(review): mutable default arguments below are shared across calls;
    # they are only read here, but None-sentinels would be safer.
    subcases: List[str] = ["1", "2", "3", "4"],
    subcase_labels: Dict[str, str] = {"1": "90deg", "2": "20deg", "3": "40deg", "4": "60deg"},
    reference_subcase: str = "2"
) -> Dict[str, Dict[str, Any]]:
    """
    Extract Zernike coefficients using displacement-based subtraction for relative metrics.
    This is the CORRECT approach matching the original zernike_Post_Script_NX.py:
    1. Subtract displacements node-by-node between subcases
    2. Convert relative displacement to WFE (nm)
    3. Fit Zernike to the relative WFE surface
    4.
Compute RMS from that fit Returns dict with structure: { '1': { 'coefficients': [50 values], # Absolute 'global_rms_nm': float, 'filtered_rms_nm': float, 'relative_coefficients': None, # Reference has no relative 'relative_global_rms_nm': None, 'relative_filtered_rms_nm': None, }, '2': { 'coefficients': [50 values], # Absolute 'relative_coefficients': [50 values], # Relative to reference 'relative_global_rms_nm': float, 'relative_filtered_rms_nm': float, }, ... } """ from pyNastran.op2.op2 import OP2 from pyNastran.bdf.bdf import BDF # Read geometry for node coordinates if bdf_path is None: # Try to find matching BDF/DAT file folder = op2_path.parent base = op2_path.stem for ext in [".dat", ".bdf"]: candidate = folder / (base + ext) if candidate.exists(): bdf_path = candidate break if bdf_path is None: # Search for any BDF/DAT in folder for f in folder.iterdir(): if f.suffix.lower() in [".dat", ".bdf"]: bdf_path = f break if bdf_path is None or not bdf_path.exists(): raise FileNotFoundError(f"No BDF/DAT file found for geometry. 
OP2: {op2_path}") print(f"[ZERNIKE] Reading geometry from: {bdf_path.name}") bdf = BDF() bdf.read_bdf(str(bdf_path)) node_geo = {int(nid): node.get_position() for nid, node in bdf.nodes.items()} # Read displacements from OP2 print(f"[ZERNIKE] Reading displacements from: {op2_path.name}") op2 = OP2() op2.read_op2(str(op2_path)) if not op2.displacements: raise RuntimeError("No displacement subcases found in OP2.") # Extract displacement data for each subcase disp_data = {} for key, darr in op2.displacements.items(): data = darr.data dmat = data[0] if data.ndim == 3 else (data if data.ndim == 2 else None) if dmat is None: continue ngt = darr.node_gridtype.astype(int) node_ids = ngt if ngt.ndim == 1 else ngt[:, 0] isub = getattr(darr, 'isubcase', None) isub_str = str(int(isub)) if isinstance(isub, int) else None if isub_str and isub_str in subcases: # Build dataframe-like structure with coordinates X = np.array([node_geo.get(int(nid), (np.nan, np.nan, np.nan))[0] for nid in node_ids]) Y = np.array([node_geo.get(int(nid), (np.nan, np.nan, np.nan))[1] for nid in node_ids]) disp_z = dmat[:, 2].astype(np.float64) disp_data[isub_str] = { 'node_ids': node_ids.astype(int), 'X': X, 'Y': Y, 'disp_z': disp_z, } print(f"[ZERNIKE] Extracted subcase {isub_str} ({subcase_labels.get(isub_str, '?')}): {len(node_ids)} nodes") if reference_subcase not in disp_data: raise ValueError(f"Reference subcase {reference_subcase} not found in OP2") # Reference data ref = disp_data[reference_subcase] ref_node_set = set(ref['node_ids']) results = {} for subcase in subcases: if subcase not in disp_data: print(f"[WARN] Subcase {subcase} not found in OP2") results[subcase] = None continue data = disp_data[subcase] X = data['X'] Y = data['Y'] disp_z = data['disp_z'] node_ids = data['node_ids'] # Mask out nodes with invalid geometry valid_mask = ~np.isnan(X) & ~np.isnan(Y) X = X[valid_mask] Y = Y[valid_mask] disp_z = disp_z[valid_mask] node_ids = node_ids[valid_mask] # Absolute WFE = 2 * Disp_Z * scale 
wfe_abs_nm = 2.0 * disp_z * NM_PER_MM # Fit Zernike to absolute surface coeffs_abs, R_max = compute_zernike_from_wfe(X, Y, wfe_abs_nm, n_modes) # Use CORRECT RMS calculation: sqrt(mean(W^2)) from actual surface global_rms_abs, filtered_rms_abs = compute_rms_from_surface( X, Y, wfe_abs_nm, coeffs_abs, R_max, filter_low_orders=4 ) result = { 'coefficients': coeffs_abs.tolist(), 'global_rms_nm': global_rms_abs, 'filtered_rms_nm': filtered_rms_abs, 'relative_coefficients': None, 'relative_global_rms_nm': None, 'relative_filtered_rms_nm': None, # Store surface data for optician workload calculation 'X': X, 'Y': Y, 'wfe_nm': wfe_abs_nm, 'R_max': R_max, } # Compute relative (vs reference) if not the reference subcase if subcase != reference_subcase: # Find common nodes common_nodes = [] ref_disp_map = dict(zip(ref['node_ids'], ref['disp_z'])) rel_disp_z = [] rel_X = [] rel_Y = [] for i, nid in enumerate(node_ids): if nid in ref_disp_map: # SUBTRACT DISPLACEMENT NODE-BY-NODE (the correct approach!) 
rel_dz = disp_z[i] - ref_disp_map[nid] rel_disp_z.append(rel_dz) rel_X.append(X[i]) rel_Y.append(Y[i]) rel_disp_z = np.array(rel_disp_z) rel_X = np.array(rel_X) rel_Y = np.array(rel_Y) # Convert relative displacement to WFE rel_wfe_nm = 2.0 * rel_disp_z * NM_PER_MM # Fit Zernike to relative surface coeffs_rel, R_max_rel = compute_zernike_from_wfe(rel_X, rel_Y, rel_wfe_nm, n_modes) # Use CORRECT RMS calculation: sqrt(mean(W^2)) from actual surface global_rms_rel, filtered_rms_rel = compute_rms_from_surface( rel_X, rel_Y, rel_wfe_nm, coeffs_rel, R_max_rel, filter_low_orders=4 ) result['relative_coefficients'] = coeffs_rel.tolist() result['relative_global_rms_nm'] = global_rms_rel result['relative_filtered_rms_nm'] = filtered_rms_rel # Store relative surface data for optician workload result['rel_X'] = rel_X result['rel_Y'] = rel_Y result['rel_wfe_nm'] = rel_wfe_nm result['rel_R_max'] = R_max_rel label = subcase_labels.get(subcase, subcase) ref_label = subcase_labels.get(reference_subcase, reference_subcase) print(f"[ZERNIKE] {label} vs {ref_label}: Global RMS = {global_rms_rel:.2f} nm, Filtered RMS = {filtered_rms_rel:.2f} nm") results[subcase] = result return results def compute_objectives_from_zernike( zernike_data: Dict[str, Dict[str, Any]], config: Dict[str, Any] ) -> Dict[str, float]: """ Compute optimization objectives from Zernike extraction results. Uses the RELATIVE metrics (displacement-based subtraction) for the main objectives. Returns dict with objective values. 
""" objectives = {} zernike_settings = config.get('zernike_settings', {}) reference_subcase = zernike_settings.get('reference_subcase', '2') polishing_subcase = zernike_settings.get('polishing_subcase', '1') # Map subcase IDs based on config: 1=90deg, 2=20deg(ref), 3=40deg, 4=60deg # Find subcases by their labels subcase_labels = zernike_settings.get('subcase_labels', {}) label_to_subcase = {v: k for k, v in subcase_labels.items()} subcase_40 = label_to_subcase.get('40deg', '3') subcase_60 = label_to_subcase.get('60deg', '4') subcase_90 = label_to_subcase.get('90deg', '1') # Objective 1: Relative filtered RMS at 40 deg vs 20 deg (reference) data_40 = zernike_data.get(subcase_40) if data_40 and data_40.get('relative_filtered_rms_nm') is not None: objectives['rel_filtered_rms_40_vs_20'] = float(data_40['relative_filtered_rms_nm']) else: objectives['rel_filtered_rms_40_vs_20'] = float('inf') # Objective 2: Relative filtered RMS at 60 deg vs 20 deg (reference) data_60 = zernike_data.get(subcase_60) if data_60 and data_60.get('relative_filtered_rms_nm') is not None: objectives['rel_filtered_rms_60_vs_20'] = float(data_60['relative_filtered_rms_nm']) else: objectives['rel_filtered_rms_60_vs_20'] = float('inf') # Objective 3: Optician workload at 90 deg (polishing subcase) # This is the relative (90 deg - 20 deg) with filter J1-J3 removed (keep defocus) # CORRECT: Use surface-based RMS calculation, not coefficient sum data_90 = zernike_data.get(subcase_90) if (data_90 and data_90.get('relative_coefficients') is not None and data_90.get('rel_X') is not None): rel_coeffs_90 = np.array(data_90['relative_coefficients']) rel_X = data_90['rel_X'] rel_Y = data_90['rel_Y'] rel_wfe = data_90['rel_wfe_nm'] rel_R_max = data_90['rel_R_max'] # Use CORRECT surface-based RMS with J1-J3 filter (keeps defocus) rms_filter_j1to3 = compute_rms_filter_j1to3( rel_X, rel_Y, rel_wfe, rel_coeffs_90, rel_R_max ) objectives['mfg_90_optician_workload'] = float(rms_filter_j1to3) else: 
objectives['mfg_90_optician_workload'] = float('inf') return objectives def compute_weighted_objective( objectives: Dict[str, float], config: Dict[str, Any] ) -> float: """ Compute weighted sum objective from individual metrics. """ obj_configs = {o['name']: o for o in config.get('objectives', [])} total = 0.0 total_weight = 0.0 for name, value in objectives.items(): if name in obj_configs: obj_config = obj_configs[name] weight = obj_config.get('weight', 1.0) target = obj_config.get('target', 1.0) # Normalize by target if target and target > 0: normalized = value / target else: normalized = value total += weight * normalized total_weight += weight if total_weight > 0: return total / total_weight return total # ============================================================================== # FEA Objective Function # ============================================================================== def fea_objective( trial: optuna.Trial, config: Dict[str, Any], nx_solver: NXSolver ) -> Tuple[float, Dict[str, Any]]: """ FEA-based objective function. 1. Suggest design variables 2. Update NX model 3. Run Nastran solve 4. Extract Zernike coefficients 5. 
    Compute objectives

    Returns (weighted_objective, trial_data)
    """
    # Get enabled design variables; disabled ones are pinned to their
    # baseline (or the midpoint of their range if no baseline is given).
    design_vars = {}
    for var in config['design_variables']:
        if var.get('enabled', False):
            value = trial.suggest_float(
                var['name'], var['min'], var['max']
            )
            design_vars[var['name']] = value
        else:
            # Use baseline for disabled variables
            design_vars[var['name']] = var.get('baseline', (var['min'] + var['max']) / 2)

    print(f"\n[Trial {trial.number}] Design variables:")
    for name, value in design_vars.items():
        print(f" {name}: {value:.4f}")

    # Prepare expression updates (map to expression names)
    t_start = time.time()
    print(f"[Trial {trial.number}] Running NX simulation...")
    try:
        expressions = {
            var['expression_name']: design_vars[var['name']]
            for var in config['design_variables']
        }
        # Get file paths
        nx_settings = config.get('nx_settings', {})
        model_dir = Path(nx_settings.get('model_dir', str(MODEL_DIR)))
        sim_file = model_dir / nx_settings.get('sim_file', 'ASSY_M1_assyfem1_sim1.sim')
        solution_name = nx_settings.get('solution_name', 'Solution 1')
        # Run simulation via NXSolver (handles expressions + solve in one call)
        result = nx_solver.run_simulation(
            sim_file=sim_file,
            working_dir=model_dir,
            expression_updates=expressions,
            solution_name=solution_name,
            cleanup=False  # Keep OP2 for Zernike extraction
        )
        if not result['success']:
            error_msg = result.get('error', 'Unknown error')
            print(f"[Trial {trial.number}] ERROR in solve: {error_msg}")
            raise optuna.TrialPruned(f"Solve failed: {error_msg}")
        op2_path = Path(result['op2_file'])
    except optuna.TrialPruned:
        # Re-raise pruning untouched so the generic handler below does not
        # wrap it a second time.
        raise
    except Exception as e:
        print(f"[Trial {trial.number}] ERROR in simulation: {e}")
        raise optuna.TrialPruned(f"Simulation failed: {e}")

    solve_time = time.time() - t_start
    print(f"[Trial {trial.number}] Solve completed in {solve_time:.1f}s")

    # Extract Zernike coefficients using displacement-based subtraction
    print(f"[Trial {trial.number}] Extracting Zernike coefficients (displacement-based)...")
    try:
        zernike_settings = config.get('zernike_settings', {})
        subcases = zernike_settings.get('subcases', ['1', '2', '3', '4'])
        # CORRECT mapping: 1=90deg (polishing), 2=20deg (reference), 3=40deg, 4=60deg
        subcase_labels = zernike_settings.get('subcase_labels', {"1": "90deg", "2": "20deg", "3": "40deg", "4": "60deg"})
        reference_subcase = zernike_settings.get('reference_subcase', '2')
        n_modes = zernike_settings.get('n_modes', 50)
        zernike_data = extract_zernike_with_relative(
            op2_path,
            bdf_path=None,  # Will auto-detect
            n_modes=n_modes,
            subcases=subcases,
            subcase_labels=subcase_labels,
            reference_subcase=reference_subcase
        )
    except Exception as e:
        print(f"[Trial {trial.number}] ERROR extracting Zernike: {e}")
        import traceback
        traceback.print_exc()
        raise optuna.TrialPruned(f"Zernike extraction failed: {e}")

    # Compute objectives
    objectives = compute_objectives_from_zernike(zernike_data, config)
    weighted_obj = compute_weighted_objective(objectives, config)
    print(f"[Trial {trial.number}] Objectives:")
    for name, value in objectives.items():
        print(f" {name}: {value:.2f} nm")
    print(f"[Trial {trial.number}] Weighted objective: {weighted_obj:.4f}")

    # Store trial data
    trial_data = {
        'design_vars': design_vars,
        'objectives': objectives,
        'weighted_objective': weighted_obj,
        'solve_time': solve_time,
        'zernike_coefficients': {
            subcase: data.get('coefficients') if data else None
            for subcase, data in zernike_data.items()
        },
        'zernike_relative_coefficients': {
            subcase: data.get('relative_coefficients') if data else None
            for subcase, data in zernike_data.items()
        },
        'source': 'FEA'
    }
    # Store in trial user attributes
    trial.set_user_attr('objectives', objectives)
    trial.set_user_attr('solve_time', solve_time)
    trial.set_user_attr('source', 'FEA')
    # Store Zernike data for neural training (absolute coefficients).
    # NOTE(review): keys here are subcase IDs ('1'..'4' by default), while
    # train_zernike_surrogate filters on 'zernike_coeffs_20' — verify the
    # config's subcase naming lines these two up.
    for subcase, data in zernike_data.items():
        if data and 'coefficients' in data:
            trial.set_user_attr(f'zernike_coeffs_{subcase}', data['coefficients'])
        # Also store relative coefficients
        if data and data.get('relative_coefficients'):
            trial.set_user_attr(f'zernike_rel_coeffs_{subcase}', data['relative_coefficients'])

    return weighted_obj, trial_data


# ==============================================================================
# Neural Surrogate
# ==============================================================================

class ZernikeSurrogate:
    """
    Neural surrogate for Zernike coefficient prediction.
    Predicts 50 Zernike coefficients for each of 4 subcases (200 outputs)
    from design variables input.
    """

    def __init__(self, model_path: Optional[Path] = None):
        # All model/normalization state stays None until load() succeeds.
        self.model = None
        self.model_path = model_path
        self.design_var_names = []
        self.design_mean = None
        self.design_std = None
        self.output_mean = None
        self.output_std = None
        self.subcases = ['20', '40', '60', '90']
        self.n_modes = 50
        if model_path and model_path.exists():
            self.load(model_path)

    def load(self, model_path: Path) -> None:
        """Load trained model from checkpoint."""
        import torch
        checkpoint = torch.load(model_path, map_location='cpu')
        self.design_var_names = checkpoint.get('design_var_names', [])
        self.design_mean = torch.tensor(checkpoint['normalization']['design_mean'])
        self.design_std = torch.tensor(checkpoint['normalization']['design_std'])
        self.output_mean = torch.tensor(checkpoint['normalization']['output_mean'])
        self.output_std = torch.tensor(checkpoint['normalization']['output_std'])
        self.subcases = checkpoint.get('subcases', ['20', '40', '60', '90'])
        self.n_modes = checkpoint.get('n_modes', 50)
        # Build model architecture, then restore weights and switch to eval mode.
        config = checkpoint['config']
        self.model = self._build_model(config)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model.eval()
        print(f"[Surrogate] Loaded model from {model_path}")
        print(f"[Surrogate] Design vars: {self.design_var_names}")
        print(f"[Surrogate] Outputs: {len(self.subcases)} subcases x {self.n_modes} coefficients")

    def _build_model(self, config: Dict[str, Any]):
        """Build MLP model for Zernike prediction."""
        import torch
        import torch.nn as nn
        input_dim = len(self.design_var_names)
        output_dim = len(self.subcases) * self.n_modes
        hidden_dim = config.get('hidden_channels', 128)
        num_layers = config.get('num_layers', 4)
        # Plain MLP: Linear+ReLU stack with a linear output head. Must match
        # the architecture built in train_zernike_surrogate.
        layers = []
        layers.append(nn.Linear(input_dim, hidden_dim))
        layers.append(nn.ReLU())
        for _ in range(num_layers - 1):
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_dim, output_dim))
        return nn.Sequential(*layers)

    def predict(self, design_vars: Dict[str, float]) -> Dict[str, np.ndarray]:
        """
        Predict Zernike coefficients for all subcases.
        Returns dict mapping subcase to coefficient array.
        """
        import torch
        if self.model is None:
            raise RuntimeError("Model not loaded")
        # Build input tensor; variables missing from design_vars default to 0.0
        input_values = [design_vars.get(name, 0.0) for name in self.design_var_names]
        x = torch.tensor(input_values, dtype=torch.float32)
        # Normalize with the training-time statistics
        x_norm = (x - self.design_mean) / self.design_std
        # Predict
        with torch.no_grad():
            y_norm = self.model(x_norm.unsqueeze(0))
        # Denormalize
        y = y_norm * self.output_std + self.output_mean
        y = y.squeeze(0).numpy()
        # Split the flat output into per-subcase coefficient blocks
        results = {}
        for i, subcase in enumerate(self.subcases):
            start = i * self.n_modes
            end = start + self.n_modes
            results[subcase] = y[start:end]
        return results

    def predict_objectives(
        self,
        design_vars: Dict[str, float],
        config: Dict[str, Any]
    ) -> Dict[str, float]:
        """Predict objectives from design variables."""
        coeffs = self.predict(design_vars)
        # Convert to format expected by compute_objectives_from_zernike.
        # NOTE(review): only 'coefficients' is populated here, but
        # compute_objectives_from_zernike reads the 'relative_*' keys —
        # as written, surrogate-predicted objectives come out as inf; verify.
        zernike_data = {
            subcase: {'coefficients': coeffs[subcase].tolist()}
            for subcase in self.subcases
        }
        return compute_objectives_from_zernike(zernike_data, config)


def neural_objective(
    trial: optuna.Trial,
    config: Dict[str, Any],
    surrogate: ZernikeSurrogate
) -> float:
    """Neural surrogate objective function."""
    # Suggest design variables (same scheme as fea_objective: enabled vars
    # are sampled, disabled vars are pinned to baseline/midpoint)
    design_vars = {}
    for var in config['design_variables']:
        if var.get('enabled', False):
            value = trial.suggest_float(var['name'], var['min'], var['max'])
            design_vars[var['name']] = value
        else:
            design_vars[var['name']] = var.get('baseline', (var['min'] + var['max']) / 2)

    # Predict with surrogate
    t_start = time.time()
    objectives = surrogate.predict_objectives(design_vars, config)
    inference_time = (time.time() - t_start) * 1000
    weighted_obj = compute_weighted_objective(objectives, config)

    # Store trial attributes
    trial.set_user_attr('objectives', objectives)
    trial.set_user_attr('inference_time_ms', inference_time)
    trial.set_user_attr('source', 'Neural')
    trial.set_user_attr('neural_predicted', True)
    return weighted_obj


# ==============================================================================
# Training
# ==============================================================================

def train_zernike_surrogate(config: Dict[str, Any]) -> Path:
    """
    Train neural surrogate on FEA trial data.
    Extracts Zernike coefficients from completed trials and trains an MLP
    to predict them from design variables.

    Returns the path of the saved best checkpoint.
    """
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, TensorDataset

    print("\n" + "="*60)
    print("Training Zernike Surrogate Model")
    print("="*60)

    # Load study
    db_path = RESULTS_DIR / "study.db"
    if not db_path.exists():
        raise FileNotFoundError(f"No study database found at {db_path}")
    storage = optuna.storages.RDBStorage(f'sqlite:///{db_path}')
    study = optuna.load_study(
        study_name=config['study_name'],
        storage=storage
    )

    # Get FEA trials with Zernike data.
    # NOTE(review): this filter requires the 'zernike_coeffs_20' attribute,
    # but fea_objective stores keys by subcase ID ('1'..'4' by default) —
    # verify the config's subcase naming before relying on this.
    fea_trials = [
        t for t in study.trials
        if t.state == optuna.trial.TrialState.COMPLETE
        and t.user_attrs.get('source') == 'FEA'
        and t.user_attrs.get('zernike_coeffs_20') is not None
    ]
    print(f"Found {len(fea_trials)} FEA trials with Zernike data")
    if len(fea_trials) < 10:
        raise ValueError(f"Need at least 10 FEA trials for training, have {len(fea_trials)}")

    # Get design variable names (enabled ones)
    design_var_names = [
        var['name'] for var in config['design_variables']
        if var.get('enabled', False)
    ]
    subcases = config.get('zernike_settings', {}).get('subcases', ['20', '40', '60', '90'])
    n_modes = config.get('zernike_settings', {}).get('n_modes', 50)

    # Build training data: X = design vars, Y = concatenated per-subcase
    # Zernike coefficient vectors (missing subcases fall back to zeros).
    X = []
    Y = []
    for trial in fea_trials:
        # Design variables
        x = [trial.params.get(name, 0.0) for name in design_var_names]
        X.append(x)
        # Zernike coefficients for all subcases
        y = []
        for subcase in subcases:
            coeffs = trial.user_attrs.get(f'zernike_coeffs_{subcase}', [0.0] * n_modes)
            y.extend(coeffs[:n_modes])
        Y.append(y)
    X = np.array(X, dtype=np.float32)
    Y = np.array(Y, dtype=np.float32)
    print(f"Training data shape: X={X.shape}, Y={Y.shape}")

    # Normalize (epsilon guards against zero-variance columns)
    X_mean = X.mean(axis=0)
    X_std = X.std(axis=0) + 1e-8
    Y_mean = Y.mean(axis=0)
    Y_std = Y.std(axis=0) + 1e-8
    X_norm = (X - X_mean) / X_std
    Y_norm = (Y - Y_mean) / Y_std

    # Train/val split (80/20).
    # NOTE(review): np.random.permutation is unseeded here, so the split is
    # not reproducible between runs.
    n_train = int(0.8 * len(X))
    indices = np.random.permutation(len(X))
    train_idx = indices[:n_train]
    val_idx = indices[n_train:]
    X_train = torch.tensor(X_norm[train_idx])
    Y_train = torch.tensor(Y_norm[train_idx])
    X_val = torch.tensor(X_norm[val_idx])
    Y_val = torch.tensor(Y_norm[val_idx])

    # Build model (same MLP shape as ZernikeSurrogate._build_model)
    train_config = config.get('surrogate_settings', {}).get('training_config', {})
    hidden_dim = train_config.get('hidden_channels', 128)
    num_layers = train_config.get('num_layers', 4)
    input_dim = len(design_var_names)
    output_dim = len(subcases) * n_modes
    layers = []
    layers.append(nn.Linear(input_dim, hidden_dim))
    layers.append(nn.ReLU())
    for _ in range(num_layers - 1):
        layers.append(nn.Linear(hidden_dim, hidden_dim))
        layers.append(nn.ReLU())
    layers.append(nn.Linear(hidden_dim, output_dim))
    model = nn.Sequential(*layers)

    # Training setup
    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=train_config.get('learning_rate', 0.001)
    )
    criterion = nn.MSELoss()
    epochs = train_config.get('epochs', 200)
    batch_size = train_config.get('batch_size', 8)
    train_dataset = TensorDataset(X_train, Y_train)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    best_val_loss = float('inf')
    best_state = None

    print(f"\nTraining for {epochs} epochs...")
    for epoch in range(epochs):
        # Train
        model.train()
        train_loss = 0.0
        for xb, yb in train_loader:
            optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss /= len(train_loader)

        # Validate; keep a copy of the best-so-far weights
        model.eval()
        with torch.no_grad():
            val_pred = model(X_val)
            val_loss = criterion(val_pred, Y_val).item()
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_state = model.state_dict().copy()
        if (epoch + 1) % 20 == 0:
            print(f" Epoch {epoch+1}/{epochs}: train_loss={train_loss:.6f}, val_loss={val_loss:.6f}")

    print(f"\nBest validation loss: {best_val_loss:.6f}")

    # Save model (best-epoch weights plus everything needed to rebuild and
    # re-normalize at inference time)
    model.load_state_dict(best_state)
    model_dir = RESULTS_DIR / "zernike_surrogate"
    model_dir.mkdir(exist_ok=True)
    model_path = model_dir / "checkpoint_best.pt"
    checkpoint = {
        'model_state_dict': best_state,
        'config': train_config,
        'design_var_names': design_var_names,
        'subcases': subcases,
        'n_modes': n_modes,
        'normalization': {
            'design_mean': X_mean.tolist(),
            'design_std': X_std.tolist(),
            'output_mean': Y_mean.tolist(),
            'output_std': Y_std.tolist(),
        },
        'best_val_loss': best_val_loss,
        'n_training_samples': len(fea_trials),
    }
    torch.save(checkpoint, model_path)
    print(f"\nModel saved to: {model_path}")
    return model_path


# ==============================================================================
# Main Optimization
# ==============================================================================

def run_optimization(
    n_trials: int = 100,
    enable_nn: bool = False,
    resume: bool = True
):
    """
    Run the optimization study.
    Args:
        n_trials: Number of trials to run
        enable_nn: Whether to use neural surrogate
        resume: Whether to resume existing study
    """
    config = load_config()
    study_name = config['study_name']

    print("\n" + "="*70)
    print(f"M1 MIRROR ZERNIKE OPTIMIZATION")
    print("="*70)
    print(f"Study: {study_name}")
    print(f"Trials: {n_trials}")
    print(f"Neural: {'Enabled' if enable_nn else 'Disabled'}")
    print("="*70)

    # Print enabled design variables
    enabled_vars = [v for v in config['design_variables'] if v.get('enabled')]
    print(f"\nEnabled design variables ({len(enabled_vars)}):")
    for var in enabled_vars:
        print(f" {var['name']}: [{var['min']}, {var['max']}] {var.get('units', '')}")

    # Print objectives
    print(f"\nObjectives:")
    for obj in config['objectives']:
        print(f" {obj['name']}: weight={obj.get('weight', 1)}, target={obj.get('target', 'N/A')} {obj.get('units', '')}")

    # Setup storage: SQLite RDB with heartbeat + automatic retry of trials
    # whose worker died.
    db_path = RESULTS_DIR / "study.db"
    storage = optuna.storages.RDBStorage(
        f'sqlite:///{db_path}',
        heartbeat_interval=60,
        failed_trial_callback=optuna.storages.RetryFailedTrialCallback(max_retry=3)
    )

    # Create or load study
    sampler = TPESampler(
        seed=config['optimization_settings'].get('seed', 42),
        n_startup_trials=config['optimization_settings'].get('n_startup_trials', 15),
        multivariate=config['optimization_settings'].get('tpe_multivariate', True)
    )
    if resume:
        try:
            study = optuna.load_study(
                study_name=study_name,
                storage=storage,
                sampler=sampler
            )
            print(f"\nResumed study with {len(study.trials)} existing trials")
        except KeyError:
            # optuna raises KeyError when the named study does not exist yet
            study = optuna.create_study(
                study_name=study_name,
                storage=storage,
                sampler=sampler,
                direction='minimize'
            )
            print(f"\nCreated new study: {study_name}")
    else:
        study = optuna.create_study(
            study_name=study_name,
            storage=storage,
            sampler=sampler,
            direction='minimize'
        )
        print(f"\nCreated new study: {study_name}")

    # Initialize NX solver (lazy - only when needed for FEA)
    nx_solver = None

    def get_nx_solver():
        # Build the NXSolver on first use and cache it in the closure, so
        # neural-only runs never touch the NX installation.
        nonlocal nx_solver
        if nx_solver is None:
            import re
            nx_settings = config.get('nx_settings', {})
            # Use model_dir from config if specified, otherwise default to MODEL_DIR
            model_dir = Path(nx_settings.get('model_dir', str(MODEL_DIR)))
            sim_file = model_dir / nx_settings.get('sim_file', 'ASSY_M1_assyfem1_sim1.sim')
            if not sim_file.exists():
                raise FileNotFoundError(
                    f"Simulation file not found: {sim_file}\n"
                    f"Please check model_dir in config or copy files to: {MODEL_DIR}"
                )
            # Get NX installation path and extract version
            nx_install_path = nx_settings.get('nx_install_path', 'C:\\Program Files\\Siemens\\NX2506')
            # Extract version from path (e.g., "NX2506" -> "2506")
            version_match = re.search(r'NX(\d+)', nx_install_path)
            nastran_version = version_match.group(1) if version_match else "2506"
            nx_solver = NXSolver(
                nx_install_dir=Path(nx_install_path),
                nastran_version=nastran_version,
                timeout=nx_settings.get('op2_timeout_s', 1800),
                study_name=config['study_name']
            )
        return nx_solver

    # Load neural surrogate if enabled; fall back to FEA when no checkpoint exists
    surrogate = None
    if enable_nn:
        surrogate_path = RESULTS_DIR / "zernike_surrogate" / "checkpoint_best.pt"
        if surrogate_path.exists():
            surrogate = ZernikeSurrogate(surrogate_path)
            print(f"\nLoaded neural surrogate from {surrogate_path}")
        else:
            print(f"\n[WARN] Neural surrogate not found at {surrogate_path}")
            print("[WARN] Running FEA trials. Train surrogate with --train-surrogate")
            enable_nn = False

    # Run trials using the ask/tell interface so pruned and failed trials can
    # be reported with an explicit state.
    print(f"\nStarting optimization ({n_trials} trials)...")
    for i in range(n_trials):  # i itself is unused; the loop just counts trials
        trial = study.ask()
        try:
            if enable_nn and surrogate is not None:
                # Neural trial
                value = neural_objective(trial, config, surrogate)
                study.tell(trial, value)
                objectives = trial.user_attrs.get('objectives', {})
                print(f"[Neural Trial {trial.number}] weighted_obj={value:.4f} "
                      f"(40vs20={objectives.get('rel_filtered_rms_40_vs_20', 0):.1f}nm, "
                      f"60vs20={objectives.get('rel_filtered_rms_60_vs_20', 0):.1f}nm)")
            else:
                # FEA trial
                solver = get_nx_solver()
                value, trial_data = fea_objective(trial, config, solver)
                study.tell(trial, value)
        except optuna.TrialPruned as e:
            study.tell(trial, state=optuna.trial.TrialState.PRUNED)
            print(f"[Trial {trial.number}] Pruned: {e}")
        except Exception as e:
            study.tell(trial, state=optuna.trial.TrialState.FAIL)
            print(f"[Trial {trial.number}] Failed: {e}")
            import traceback
            traceback.print_exc()

    # Summary
    print("\n" + "="*70)
    print("OPTIMIZATION COMPLETE")
    print("="*70)
    completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
    print(f"Completed trials: {len(completed)}")
    if completed:
        best = study.best_trial
        print(f"\nBest trial: #{best.number}")
        print(f"Best weighted objective: {best.value:.4f}")
        print("\nBest parameters:")
        for name, value in best.params.items():
            print(f" {name}: {value:.4f}")
        objectives = best.user_attrs.get('objectives', {})
        if objectives:
            print("\nBest objectives:")
            for name, value in objectives.items():
                print(f" {name}: {value:.2f} nm")


def show_status():
    """Show current optimization status."""
    config = load_config()
    study_name = config['study_name']
    db_path = RESULTS_DIR / "study.db"
    if not db_path.exists():
        print("No study found. 
def show_status():
    """Show current optimization status (trial counts and best result)."""
    config = load_config()
    study_name = config['study_name']

    db_path = RESULTS_DIR / "study.db"
    if not db_path.exists():
        print("No study found. Run optimization first.")
        return

    storage = optuna.storages.RDBStorage(f'sqlite:///{db_path}')
    study = optuna.load_study(study_name=study_name, storage=storage)

    # Split completed trials by evaluation source ('FEA' vs 'Neural' user attr)
    completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
    fea_trials = [t for t in completed if t.user_attrs.get('source') == 'FEA']
    nn_trials = [t for t in completed if t.user_attrs.get('source') == 'Neural']

    print("\n" + "="*60)
    print(f"STUDY STATUS: {study_name}")
    print("="*60)
    print(f"Total trials: {len(study.trials)}")
    print(f"Completed: {len(completed)}")
    print(f" FEA trials: {len(fea_trials)}")
    print(f" Neural trials: {len(nn_trials)}")

    if completed:
        best = study.best_trial
        print(f"\nBest trial: #{best.number} ({best.user_attrs.get('source', 'Unknown')})")
        print(f"Best value: {best.value:.4f}")

        objectives = best.user_attrs.get('objectives', {})
        if objectives:
            print("\nBest objectives:")
            for name, value in objectives.items():
                print(f" {name}: {value:.2f} nm")


# ==============================================================================
# CLI
# ==============================================================================

def main():
    """Parse CLI arguments and dispatch to the requested action."""
    parser = argparse.ArgumentParser(
        description="M1 Mirror Zernike Optimization with Neural Acceleration"
    )
    parser.add_argument('--run', action='store_true',
                        help='Run optimization')
    parser.add_argument('--trials', type=int, default=100,
                        help='Number of trials to run')
    parser.add_argument('--enable-nn', action='store_true',
                        help='Enable neural surrogate')
    # BUG FIX: 'store_true' with default=True made --resume a no-op (there was
    # no way to disable resuming). BooleanOptionalAction keeps --resume valid
    # and adds --no-resume to start a fresh study.
    parser.add_argument('--resume', action=argparse.BooleanOptionalAction, default=True,
                        help='Resume existing study (use --no-resume to start fresh)')
    parser.add_argument('--train-surrogate', action='store_true',
                        help='Train neural surrogate from FEA data')
    parser.add_argument('--status', action='store_true',
                        help='Show optimization status')

    args = parser.parse_args()

    if args.train_surrogate:
        config = load_config()
        train_zernike_surrogate(config)
    elif args.run:
        run_optimization(
            n_trials=args.trials,
            enable_nn=args.enable_nn,
            resume=args.resume
        )
    elif args.status:
        show_status()
    else:
        parser.print_help()


if __name__ == '__main__':
    main()