From 99be370faddb37e83db8f3601a97fdd125881493 Mon Sep 17 00:00:00 2001 From: Antoine Date: Thu, 29 Jan 2026 16:02:07 +0000 Subject: [PATCH] feat(extractors): add Zernike trajectory analysis for mode-specific optimization --- .../extractors/extract_zernike_trajectory.py | 599 ++++++++++++++++++ 1 file changed, 599 insertions(+) create mode 100644 optimization_engine/extractors/extract_zernike_trajectory.py diff --git a/optimization_engine/extractors/extract_zernike_trajectory.py b/optimization_engine/extractors/extract_zernike_trajectory.py new file mode 100644 index 00000000..87d7e549 --- /dev/null +++ b/optimization_engine/extractors/extract_zernike_trajectory.py @@ -0,0 +1,599 @@ +""" +Zernike Trajectory Extractor for Elevation-Dependent Deformation Analysis +========================================================================== + +This extractor analyzes how Zernike coefficients evolve across multiple elevation +angles, enabling: +1. Mode-specific optimization objectives (coma, astigmatism, trefoil separately) +2. Integrated RMS across the full operating range (not just discrete angles) +3. Sensitivity analysis (which modes respond to axial vs lateral loads) + +PHYSICS BASIS: +-------------- +At elevation angle θ from horizontal, the gravity vector decomposes into: +- Axial component (along optical axis): ∝ sin(θ) +- Lateral component (perpendicular): ∝ cos(θ) + +For a linear elastic structure, the deformation is linear in these load components. +Therefore, each Zernike coefficient follows: + + c_j(θ) = a_j·(sinθ - sin θ_ref) + b_j·(cosθ - cos θ_ref) + +where: +- θ_ref = reference angle (typically 20° for polishing/measurement) +- a_j = sensitivity of mode j to axial load change +- b_j = sensitivity of mode j to lateral load change + +The sensitivity matrix S = [a_j, b_j] tells us which modes respond to which loads. + +USAGE: +------ + from optimization_engine.extractors import extract_zernike_trajectory + + # Full trajectory analysis + result = extract_zernike_trajectory( + op2_file, + subcases=['20', '30', '40', '50', '60'], + reference_angle=20.0, + focal_length=22000.0 + ) + + # Get mode-specific objectives + coma_rms = result['coma_rms_nm'] + trefoil_rms = result['trefoil_rms_nm'] + + # Get sensitivity matrix + sensitivity = result['sensitivity_matrix'] + # sensitivity['coma']['axial'] = how coma responds to axial load + +Author: Mario (Clawdbot) for Atomizer Framework +Date: 2026-01-29 +""" + +from pathlib import Path +from typing import Dict, Any, Optional, List, Tuple, Union +import numpy as np +from numpy.linalg import lstsq + +# Import base Zernike functionality +from .extract_zernike import ( + compute_zernike_coefficients, + zernike_noll, + zernike_label, + read_node_geometry, + find_geometry_file, + extract_displacements_by_subcase, + UNIT_TO_NM, + DEFAULT_N_MODES, + DEFAULT_FILTER_ORDERS, +) + +try: + from pyNastran.op2.op2 import OP2 + from pyNastran.bdf.bdf import BDF +except ImportError: + raise ImportError("pyNastran is required. Install with: pip install pyNastran") + + +# ============================================================================ +# Mode Groupings (Noll indices) +# ============================================================================ + +MODE_GROUPS = { + 'astigmatism': [5, 6], # Primary astigmatism + 'coma': [7, 8], # Primary coma + 'trefoil': [9, 10], # Trefoil + 'spherical': [11], # Primary spherical + 'secondary_astig': [12, 13], # Secondary astigmatism + 'secondary_coma': [14, 15], # Secondary coma + 'quadrafoil': [16, 17], # Quadrafoil + 'secondary_trefoil': [18, 19], # Secondary trefoil + 'secondary_spherical': [22], # Secondary spherical +} + +# Human-readable names for reporting +MODE_NAMES = { + 'astigmatism': 'Astigmatism (Z5,Z6)', + 'coma': 'Coma (Z7,Z8)', + 'trefoil': 'Trefoil (Z9,Z10)', + 'spherical': 'Spherical (Z11)', + 'secondary_astig': 'Secondary Astigmatism (Z12,Z13)', + 'secondary_coma': 'Secondary Coma (Z14,Z15)', + 'quadrafoil': 'Quadrafoil (Z16,Z17)', + 'secondary_trefoil': 'Secondary Trefoil (Z18,Z19)', + 'secondary_spherical': 'Secondary Spherical (Z22)', +} + + +# ============================================================================ +# Trajectory Parameter Functions +# ============================================================================ + +def compute_trajectory_params( + angles_deg: np.ndarray, + reference_angle: float = 20.0 +) -> np.ndarray: + """ + Compute trajectory parameters τ(θ) = [sin(θ) - sin(θ_ref), cos(θ) - cos(θ_ref)] + + Args: + angles_deg: Array of elevation angles in degrees + reference_angle: Reference angle in degrees (default 20°) + + Returns: + Array of shape (N, 2) with [alpha, beta] for each angle + """ + angles_rad = np.deg2rad(angles_deg) + ref_rad = np.deg2rad(reference_angle) + + alpha = np.sin(angles_rad) - np.sin(ref_rad) # Axial load change + beta = np.cos(angles_rad) - np.cos(ref_rad) # Lateral load change + + return np.column_stack([alpha, beta]) + + +def fit_sensitivity_matrix( + coefficients: np.ndarray, + trajectory_params: np.ndarray +) -> Tuple[np.ndarray, np.ndarray, float]: + """ + Fit the sensitivity matrix S such that c(θ) = S @ τ(θ) + + Uses least-squares fitting for each Zernike mode. + + Args: + coefficients: Array of shape (N_angles, N_modes) with Zernike coefficients + trajectory_params: Array of shape (N_angles, 2) with [alpha, beta] + + Returns: + sensitivity: Array of shape (N_modes, 2) with [a_j, b_j] for each mode + fitted_coeffs: Predicted coefficients from the linear model + r_squared: Overall R² fit quality + """ + n_angles, n_modes = coefficients.shape + + sensitivity = np.zeros((n_modes, 2)) + fitted_coeffs = np.zeros_like(coefficients) + + ss_res_total = 0.0 + ss_tot_total = 0.0 + + for j in range(n_modes): + c_j = coefficients[:, j] + + # Least squares: c_j = T @ [a_j, b_j] + # Use lstsq for robustness + result = lstsq(trajectory_params, c_j, rcond=None) + sensitivity[j, :] = result[0] + + # Compute fitted values + fitted_coeffs[:, j] = trajectory_params @ sensitivity[j, :] + + # R² calculation + ss_res = np.sum((c_j - fitted_coeffs[:, j])**2) + ss_tot = np.sum((c_j - np.mean(c_j))**2) + + ss_res_total += ss_res + ss_tot_total += ss_tot + + # Overall R² + r_squared = 1.0 - (ss_res_total / max(ss_tot_total, 1e-12)) + + return sensitivity, fitted_coeffs, r_squared + + +def compute_mode_group_rms( + coefficients: np.ndarray, + mode_indices: List[int], + n_modes_total: int = DEFAULT_N_MODES +) -> float: + """ + Compute integrated RMS for a group of Zernike modes across all angles. + + RMS = sqrt(mean(sum of squared coefficients across modes and angles)) + + Args: + coefficients: Array of shape (N_angles, N_modes) + mode_indices: List of Noll indices (1-based) for this mode group + n_modes_total: Total number of modes fitted + + Returns: + RMS in same units as coefficients (nm) + """ + # Convert Noll indices (1-based) to array indices (0-based) + # But we skip first 4 modes (piston, tip, tilt, defocus) so index = noll - 5 + indices = [] + for noll in mode_indices: + if noll >= 5 and noll < 5 + coefficients.shape[1]: + indices.append(noll - 5) # Offset because we start from mode 5 + + if not indices: + return 0.0 + + # Sum of squared coefficients for this mode group + sq_sum = np.sum(coefficients[:, indices]**2) + + # Average across angles and modes, then sqrt + n_angles = coefficients.shape[0] + rms = np.sqrt(sq_sum / n_angles) + + return float(rms) + + +# ============================================================================ +# Main Extractor Class +# ============================================================================ + +class ZernikeTrajectoryExtractor: + """ + Extracts Zernike trajectory metrics from multi-subcase FEA results. + + This extractor: + 1. Computes Zernike coefficients for each elevation angle + 2. Fits the linear trajectory model c_j(θ) = a_j·α(θ) + b_j·β(θ) + 3. Computes mode-specific integrated RMS values + 4. Provides sensitivity analysis + + Attributes: + op2_file: Path to OP2 results file + bdf_file: Path to BDF/DAT geometry file + focal_length: Parabola focal length for OPD correction (optional) + reference_angle: Reference elevation angle (default 20°) + """ + + def __init__( + self, + op2_file: Union[str, Path], + bdf_file: Optional[Union[str, Path]] = None, + focal_length: Optional[float] = None, + reference_angle: float = 20.0, + unit: str = 'mm', + n_modes: int = DEFAULT_N_MODES, + ): + self.op2_path = Path(op2_file) + self.focal_length = focal_length + self.reference_angle = reference_angle + self.unit = unit + self.nm_scale = UNIT_TO_NM.get(unit, 1e6) # Default mm -> nm + self.n_modes = n_modes + + # Find geometry file + if bdf_file: + self.bdf_path = Path(bdf_file) + else: + self.bdf_path = find_geometry_file(self.op2_path) + + # Load data + self._load_data() + + def _load_data(self): + """Load OP2 and geometry files.""" + print(f"[ZernikeTrajectory] Loading OP2: {self.op2_path}") + self.op2 = OP2() + self.op2.read_op2(str(self.op2_path)) + + print(f"[ZernikeTrajectory] Loading geometry: {self.bdf_path}") + self.node_coords = read_node_geometry(self.bdf_path) + + # Get available subcases + self.available_subcases = list(self.op2.displacements.keys()) + print(f"[ZernikeTrajectory] Available subcases: {self.available_subcases}") + + def extract_subcase(self, subcase_id: int) -> Dict[str, Any]: + """ + Extract Zernike coefficients for a single subcase. + + Args: + subcase_id: Subcase ID + + Returns: + Dict with coefficients and metrics + """ + # Get displacements + displacements = extract_displacements_by_subcase( + self.op2, subcase_id, self.node_coords + ) + + if displacements is None or len(displacements) == 0: + raise ValueError(f"No displacements found for subcase {subcase_id}") + + # Extract coordinates and displacements + node_ids = list(displacements.keys()) + x = np.array([self.node_coords[nid][0] for nid in node_ids]) + y = np.array([self.node_coords[nid][1] for nid in node_ids]) + z = np.array([self.node_coords[nid][2] for nid in node_ids]) + + dx = np.array([displacements[nid][0] for nid in node_ids]) + dy = np.array([displacements[nid][1] for nid in node_ids]) + dz = np.array([displacements[nid][2] for nid in node_ids]) + + # Compute WFE (surface error * 2 for reflective surface) + # For now, use Z-displacement (can add OPD correction if focal_length provided) + surface_error = dz # In original units (mm) + + if self.focal_length is not None: + # Apply OPD correction for lateral displacements + r0_sq = x**2 + y**2 + r_def_sq = (x + dx)**2 + (y + dy)**2 + delta_r_sq = r_def_sq - r0_sq + + # Parabola correction (concave mirror) + dz_parabola = -delta_r_sq / (4.0 * self.focal_length) + surface_error = dz - dz_parabola + + # Convert to WFE in nm + wfe = surface_error * 2.0 * self.nm_scale + + # Fit Zernike coefficients (excluding first 4 modes by convention) + coeffs = compute_zernike_coefficients( + x + dx if self.focal_length else x, # Use deformed coords if OPD + y + dy if self.focal_length else y, + wfe, + n_modes=self.n_modes + 4, # Include modes 1-4 for fitting + filter_orders=0 # Don't filter, we'll handle it + ) + + # Return coefficients starting from mode 5 (index 4) + return { + 'coefficients': coeffs[4:self.n_modes + 4], # Modes 5 to n_modes+4 + 'raw_coefficients': coeffs, + 'n_nodes': len(node_ids), + 'wfe_array': wfe, + } + + def extract_trajectory( + self, + subcases: Optional[List[int]] = None, + angles: Optional[List[float]] = None, + ) -> Dict[str, Any]: + """ + Extract full trajectory analysis across multiple subcases. + + Args: + subcases: List of subcase IDs. If None, auto-detect from OP2. + angles: Corresponding elevation angles in degrees. + If None, assumes subcase IDs are angles (e.g., 20, 40, 60). + + Returns: + Dict with trajectory analysis results + """ + # Determine subcases and angles + if subcases is None: + subcases = sorted(self.available_subcases) + + if angles is None: + # Assume subcase IDs are angles + angles = [float(sc) for sc in subcases] + + if len(subcases) != len(angles): + raise ValueError("subcases and angles must have same length") + + print(f"[ZernikeTrajectory] Analyzing {len(subcases)} subcases: {subcases}") + print(f"[ZernikeTrajectory] Elevation angles: {angles}") + + # Extract coefficients for each subcase + all_coeffs = [] + for sc in subcases: + result = self.extract_subcase(sc) + all_coeffs.append(result['coefficients']) + + # Stack into array: (N_angles, N_modes) + coefficients = np.vstack(all_coeffs) + n_angles, n_modes = coefficients.shape + + print(f"[ZernikeTrajectory] Coefficient matrix shape: {coefficients.shape}") + + # Compute relative coefficients (relative to reference angle) + ref_idx = None + for i, ang in enumerate(angles): + if abs(ang - self.reference_angle) < 0.1: + ref_idx = i + break + + if ref_idx is not None: + # Make all coefficients relative to reference + ref_coeffs = coefficients[ref_idx, :] + coefficients_relative = coefficients - ref_coeffs + else: + # No exact reference found, use as-is + print(f"[ZernikeTrajectory] Warning: Reference angle {self.reference_angle}° not in data") + coefficients_relative = coefficients + + # Compute trajectory parameters + angles_arr = np.array(angles) + trajectory_params = compute_trajectory_params(angles_arr, self.reference_angle) + + # Fit sensitivity matrix + sensitivity, fitted_coeffs, r_squared = fit_sensitivity_matrix( + coefficients_relative, trajectory_params + ) + + print(f"[ZernikeTrajectory] Linear fit R² = {r_squared:.4f}") + + # Compute mode-specific integrated RMS + mode_rms = {} + for group_name, noll_indices in MODE_GROUPS.items(): + rms = compute_mode_group_rms(coefficients_relative, noll_indices, n_modes) + mode_rms[group_name] = rms + + # Compute total filtered RMS (modes 5+) + total_filtered_rms = np.sqrt(np.mean(coefficients_relative**2)) + + # Build sensitivity matrix dict + sensitivity_dict = {} + for group_name, noll_indices in MODE_GROUPS.items(): + indices = [n - 5 for n in noll_indices if 5 <= n < 5 + n_modes] + if indices: + axial = np.sqrt(np.sum(sensitivity[indices, 0]**2)) + lateral = np.sqrt(np.sum(sensitivity[indices, 1]**2)) + total = np.sqrt(axial**2 + lateral**2) + sensitivity_dict[group_name] = { + 'axial': float(axial), + 'lateral': float(lateral), + 'total': float(total), + } + + # Rank modes by sensitivity + mode_ranking = sorted( + sensitivity_dict.keys(), + key=lambda k: sensitivity_dict[k]['total'], + reverse=True + ) + + # Dominant mode + dominant_mode = mode_ranking[0] if mode_ranking else 'unknown' + + # Build result + result = { + # Mode-specific integrated RMS (nm) + 'astigmatism_rms_nm': mode_rms.get('astigmatism', 0.0), + 'coma_rms_nm': mode_rms.get('coma', 0.0), + 'trefoil_rms_nm': mode_rms.get('trefoil', 0.0), + 'spherical_rms_nm': mode_rms.get('spherical', 0.0), + 'secondary_astig_rms_nm': mode_rms.get('secondary_astig', 0.0), + 'secondary_coma_rms_nm': mode_rms.get('secondary_coma', 0.0), + 'quadrafoil_rms_nm': mode_rms.get('quadrafoil', 0.0), + + # Total filtered RMS + 'total_filtered_rms_nm': float(total_filtered_rms), + + # Model quality + 'linear_fit_r2': float(r_squared), + + # Sensitivity analysis + 'sensitivity_matrix': sensitivity_dict, + 'mode_ranking': mode_ranking, + 'dominant_mode': dominant_mode, + + # Raw data for debugging/visualization + 'angles_deg': angles, + 'subcases': subcases, + 'coefficients_relative': coefficients_relative.tolist(), + 'trajectory_params': trajectory_params.tolist(), + 'sensitivity_raw': sensitivity.tolist(), + + # Metadata + 'reference_angle': self.reference_angle, + 'n_modes': n_modes, + 'n_angles': n_angles, + 'focal_length': self.focal_length, + } + + return result + + +# ============================================================================ +# Convenience Functions +# ============================================================================ + +def extract_zernike_trajectory( + op2_file: Union[str, Path], + subcases: Optional[List[int]] = None, + angles: Optional[List[float]] = None, + bdf_file: Optional[Union[str, Path]] = None, + focal_length: Optional[float] = None, + reference_angle: float = 20.0, + unit: str = 'mm', +) -> Dict[str, Any]: + """ + Extract Zernike trajectory metrics from OP2 file. + + This is the main entry point for trajectory analysis. + + Args: + op2_file: Path to OP2 results file + subcases: List of subcase IDs (default: auto-detect) + angles: Elevation angles in degrees (default: use subcase IDs as angles) + bdf_file: Path to BDF/DAT geometry (default: auto-find) + focal_length: Parabola focal length for OPD correction (mm) + reference_angle: Reference angle (default 20°) + unit: Displacement unit in OP2 ('mm', 'm', 'um', 'nm') + + Returns: + Dict with trajectory analysis results including: + - coma_rms_nm, astigmatism_rms_nm, trefoil_rms_nm, etc. + - total_filtered_rms_nm + - linear_fit_r2 + - sensitivity_matrix + - mode_ranking + - dominant_mode + """ + extractor = ZernikeTrajectoryExtractor( + op2_file=op2_file, + bdf_file=bdf_file, + focal_length=focal_length, + reference_angle=reference_angle, + unit=unit, + ) + + return extractor.extract_trajectory(subcases=subcases, angles=angles) + + +def extract_zernike_trajectory_objectives( + op2_file: Union[str, Path], + **kwargs +) -> Dict[str, float]: + """ + Extract just the objective values (for optimization). + + Returns a flat dict of floats suitable for Atomizer objectives. + """ + result = extract_zernike_trajectory(op2_file, **kwargs) + + return { + 'coma_rms_nm': result['coma_rms_nm'], + 'astigmatism_rms_nm': result['astigmatism_rms_nm'], + 'trefoil_rms_nm': result['trefoil_rms_nm'], + 'spherical_rms_nm': result['spherical_rms_nm'], + 'total_filtered_rms_nm': result['total_filtered_rms_nm'], + 'linear_fit_r2': result['linear_fit_r2'], + } + + +# ============================================================================ +# CLI for Testing +# ============================================================================ + +if __name__ == '__main__': + import sys + import json + + if len(sys.argv) < 2: + print("Usage: python extract_zernike_trajectory.py [subcases...]") + print("Example: python extract_zernike_trajectory.py results.op2 20 30 40 50 60") + sys.exit(1) + + op2_file = sys.argv[1] + subcases = [int(s) for s in sys.argv[2:]] if len(sys.argv) > 2 else None + + result = extract_zernike_trajectory(op2_file, subcases=subcases) + + print("\n" + "="*60) + print("ZERNIKE TRAJECTORY ANALYSIS RESULTS") + print("="*60) + + print(f"\nLinear Model Fit: R² = {result['linear_fit_r2']:.4f}") + if result['linear_fit_r2'] > 0.95: + print(" ✓ Excellent fit - physics model validated") + elif result['linear_fit_r2'] > 0.85: + print(" ~ Good fit - some nonlinearity present") + else: + print(" ⚠ Poor fit - significant nonlinearity") + + print(f"\nDominant Mode: {result['dominant_mode'].upper()}") + print(f"Mode Ranking: {', '.join(result['mode_ranking'][:5])}") + + print("\n--- Mode-Specific Integrated RMS (nm) ---") + for mode in result['mode_ranking']: + rms = result.get(f'{mode}_rms_nm', 0.0) + sens = result['sensitivity_matrix'].get(mode, {}) + axial = sens.get('axial', 0) + lateral = sens.get('lateral', 0) + print(f" {mode:20s}: {rms:8.2f} nm (axial: {axial:.2f}, lateral: {lateral:.2f})") + + print(f"\n--- Total Filtered RMS: {result['total_filtered_rms_nm']:.2f} nm ---") + + print("\n--- Raw Result (JSON) ---") + # Print subset for readability + summary = {k: v for k, v in result.items() if not k.startswith('coefficients') and k != 'trajectory_params' and k != 'sensitivity_raw'} + print(json.dumps(summary, indent=2))