feat(extractors): add Zernike trajectory analysis for mode-specific optimization

This commit is contained in:
2026-01-29 16:02:07 +00:00
parent d7986922d5
commit 99be370fad

View File

@@ -0,0 +1,599 @@
"""
Zernike Trajectory Extractor for Elevation-Dependent Deformation Analysis
==========================================================================
This extractor analyzes how Zernike coefficients evolve across multiple elevation
angles, enabling:
1. Mode-specific optimization objectives (coma, astigmatism, trefoil separately)
2. Integrated RMS across the full operating range (not just discrete angles)
3. Sensitivity analysis (which modes respond to axial vs lateral loads)
PHYSICS BASIS:
--------------
At elevation angle θ from horizontal, the gravity vector decomposes into:
- Axial component (along optical axis): ∝ sin(θ)
- Lateral component (perpendicular): ∝ cos(θ)
For a linear elastic structure, the deformation is linear in these load components.
Therefore, each Zernike coefficient follows:
c_j(θ) = a_j·(sinθ - sin θ_ref) + b_j·(cosθ - cos θ_ref)
where:
- θ_ref = reference angle (typically 20° for polishing/measurement)
- a_j = sensitivity of mode j to axial load change
- b_j = sensitivity of mode j to lateral load change
The sensitivity matrix S = [a_j, b_j] tells us which modes respond to which loads.
USAGE:
------
from optimization_engine.extractors import extract_zernike_trajectory
# Full trajectory analysis
result = extract_zernike_trajectory(
op2_file,
subcases=['20', '30', '40', '50', '60'],
reference_angle=20.0,
focal_length=22000.0
)
# Get mode-specific objectives
coma_rms = result['coma_rms_nm']
trefoil_rms = result['trefoil_rms_nm']
# Get sensitivity matrix
sensitivity = result['sensitivity_matrix']
# sensitivity['coma']['axial'] = how coma responds to axial load
Author: Mario (Clawdbot) for Atomizer Framework
Date: 2026-01-29
"""
from pathlib import Path
from typing import Dict, Any, Optional, List, Tuple, Union
import numpy as np
from numpy.linalg import lstsq
# Import base Zernike functionality
from .extract_zernike import (
compute_zernike_coefficients,
zernike_noll,
zernike_label,
read_node_geometry,
find_geometry_file,
extract_displacements_by_subcase,
UNIT_TO_NM,
DEFAULT_N_MODES,
DEFAULT_FILTER_ORDERS,
)
try:
from pyNastran.op2.op2 import OP2
from pyNastran.bdf.bdf import BDF
except ImportError:
raise ImportError("pyNastran is required. Install with: pip install pyNastran")
# ============================================================================
# Mode Groupings (Noll indices)
# ============================================================================
MODE_GROUPS = {
'astigmatism': [5, 6], # Primary astigmatism
'coma': [7, 8], # Primary coma
'trefoil': [9, 10], # Trefoil
'spherical': [11], # Primary spherical
'secondary_astig': [12, 13], # Secondary astigmatism
'secondary_coma': [14, 15], # Secondary coma
'quadrafoil': [16, 17], # Quadrafoil
'secondary_trefoil': [18, 19], # Secondary trefoil
'secondary_spherical': [22], # Secondary spherical
}
# Human-readable names for reporting
MODE_NAMES = {
'astigmatism': 'Astigmatism (Z5,Z6)',
'coma': 'Coma (Z7,Z8)',
'trefoil': 'Trefoil (Z9,Z10)',
'spherical': 'Spherical (Z11)',
'secondary_astig': 'Secondary Astigmatism (Z12,Z13)',
'secondary_coma': 'Secondary Coma (Z14,Z15)',
'quadrafoil': 'Quadrafoil (Z16,Z17)',
'secondary_trefoil': 'Secondary Trefoil (Z18,Z19)',
'secondary_spherical': 'Secondary Spherical (Z22)',
}
# ============================================================================
# Trajectory Parameter Functions
# ============================================================================
def compute_trajectory_params(
angles_deg: np.ndarray,
reference_angle: float = 20.0
) -> np.ndarray:
"""
Compute trajectory parameters τ(θ) = [sin(θ) - sin(θ_ref), cos(θ) - cos(θ_ref)]
Args:
angles_deg: Array of elevation angles in degrees
reference_angle: Reference angle in degrees (default 20°)
Returns:
Array of shape (N, 2) with [alpha, beta] for each angle
"""
angles_rad = np.deg2rad(angles_deg)
ref_rad = np.deg2rad(reference_angle)
alpha = np.sin(angles_rad) - np.sin(ref_rad) # Axial load change
beta = np.cos(angles_rad) - np.cos(ref_rad) # Lateral load change
return np.column_stack([alpha, beta])
def fit_sensitivity_matrix(
coefficients: np.ndarray,
trajectory_params: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, float]:
"""
Fit the sensitivity matrix S such that c(θ) = S @ τ(θ)
Uses least-squares fitting for each Zernike mode.
Args:
coefficients: Array of shape (N_angles, N_modes) with Zernike coefficients
trajectory_params: Array of shape (N_angles, 2) with [alpha, beta]
Returns:
sensitivity: Array of shape (N_modes, 2) with [a_j, b_j] for each mode
fitted_coeffs: Predicted coefficients from the linear model
r_squared: Overall R² fit quality
"""
n_angles, n_modes = coefficients.shape
sensitivity = np.zeros((n_modes, 2))
fitted_coeffs = np.zeros_like(coefficients)
ss_res_total = 0.0
ss_tot_total = 0.0
for j in range(n_modes):
c_j = coefficients[:, j]
# Least squares: c_j = T @ [a_j, b_j]
# Use lstsq for robustness
result = lstsq(trajectory_params, c_j, rcond=None)
sensitivity[j, :] = result[0]
# Compute fitted values
fitted_coeffs[:, j] = trajectory_params @ sensitivity[j, :]
# R² calculation
ss_res = np.sum((c_j - fitted_coeffs[:, j])**2)
ss_tot = np.sum((c_j - np.mean(c_j))**2)
ss_res_total += ss_res
ss_tot_total += ss_tot
# Overall R²
r_squared = 1.0 - (ss_res_total / max(ss_tot_total, 1e-12))
return sensitivity, fitted_coeffs, r_squared
def compute_mode_group_rms(
coefficients: np.ndarray,
mode_indices: List[int],
n_modes_total: int = DEFAULT_N_MODES
) -> float:
"""
Compute integrated RMS for a group of Zernike modes across all angles.
RMS = sqrt(mean(sum of squared coefficients across modes and angles))
Args:
coefficients: Array of shape (N_angles, N_modes)
mode_indices: List of Noll indices (1-based) for this mode group
n_modes_total: Total number of modes fitted
Returns:
RMS in same units as coefficients (nm)
"""
# Convert Noll indices (1-based) to array indices (0-based)
# But we skip first 4 modes (piston, tip, tilt, defocus) so index = noll - 5
indices = []
for noll in mode_indices:
if noll >= 5 and noll < 5 + coefficients.shape[1]:
indices.append(noll - 5) # Offset because we start from mode 5
if not indices:
return 0.0
# Sum of squared coefficients for this mode group
sq_sum = np.sum(coefficients[:, indices]**2)
# Average across angles and modes, then sqrt
n_angles = coefficients.shape[0]
rms = np.sqrt(sq_sum / n_angles)
return float(rms)
# ============================================================================
# Main Extractor Class
# ============================================================================
class ZernikeTrajectoryExtractor:
"""
Extracts Zernike trajectory metrics from multi-subcase FEA results.
This extractor:
1. Computes Zernike coefficients for each elevation angle
2. Fits the linear trajectory model c_j(θ) = a_j·α(θ) + b_j·β(θ)
3. Computes mode-specific integrated RMS values
4. Provides sensitivity analysis
Attributes:
op2_file: Path to OP2 results file
bdf_file: Path to BDF/DAT geometry file
focal_length: Parabola focal length for OPD correction (optional)
reference_angle: Reference elevation angle (default 20°)
"""
def __init__(
self,
op2_file: Union[str, Path],
bdf_file: Optional[Union[str, Path]] = None,
focal_length: Optional[float] = None,
reference_angle: float = 20.0,
unit: str = 'mm',
n_modes: int = DEFAULT_N_MODES,
):
self.op2_path = Path(op2_file)
self.focal_length = focal_length
self.reference_angle = reference_angle
self.unit = unit
self.nm_scale = UNIT_TO_NM.get(unit, 1e6) # Default mm -> nm
self.n_modes = n_modes
# Find geometry file
if bdf_file:
self.bdf_path = Path(bdf_file)
else:
self.bdf_path = find_geometry_file(self.op2_path)
# Load data
self._load_data()
def _load_data(self):
"""Load OP2 and geometry files."""
print(f"[ZernikeTrajectory] Loading OP2: {self.op2_path}")
self.op2 = OP2()
self.op2.read_op2(str(self.op2_path))
print(f"[ZernikeTrajectory] Loading geometry: {self.bdf_path}")
self.node_coords = read_node_geometry(self.bdf_path)
# Get available subcases
self.available_subcases = list(self.op2.displacements.keys())
print(f"[ZernikeTrajectory] Available subcases: {self.available_subcases}")
def extract_subcase(self, subcase_id: int) -> Dict[str, Any]:
"""
Extract Zernike coefficients for a single subcase.
Args:
subcase_id: Subcase ID
Returns:
Dict with coefficients and metrics
"""
# Get displacements
displacements = extract_displacements_by_subcase(
self.op2, subcase_id, self.node_coords
)
if displacements is None or len(displacements) == 0:
raise ValueError(f"No displacements found for subcase {subcase_id}")
# Extract coordinates and displacements
node_ids = list(displacements.keys())
x = np.array([self.node_coords[nid][0] for nid in node_ids])
y = np.array([self.node_coords[nid][1] for nid in node_ids])
z = np.array([self.node_coords[nid][2] for nid in node_ids])
dx = np.array([displacements[nid][0] for nid in node_ids])
dy = np.array([displacements[nid][1] for nid in node_ids])
dz = np.array([displacements[nid][2] for nid in node_ids])
# Compute WFE (surface error * 2 for reflective surface)
# For now, use Z-displacement (can add OPD correction if focal_length provided)
surface_error = dz # In original units (mm)
if self.focal_length is not None:
# Apply OPD correction for lateral displacements
r0_sq = x**2 + y**2
r_def_sq = (x + dx)**2 + (y + dy)**2
delta_r_sq = r_def_sq - r0_sq
# Parabola correction (concave mirror)
dz_parabola = -delta_r_sq / (4.0 * self.focal_length)
surface_error = dz - dz_parabola
# Convert to WFE in nm
wfe = surface_error * 2.0 * self.nm_scale
# Fit Zernike coefficients (excluding first 4 modes by convention)
coeffs = compute_zernike_coefficients(
x + dx if self.focal_length else x, # Use deformed coords if OPD
y + dy if self.focal_length else y,
wfe,
n_modes=self.n_modes + 4, # Include modes 1-4 for fitting
filter_orders=0 # Don't filter, we'll handle it
)
# Return coefficients starting from mode 5 (index 4)
return {
'coefficients': coeffs[4:self.n_modes + 4], # Modes 5 to n_modes+4
'raw_coefficients': coeffs,
'n_nodes': len(node_ids),
'wfe_array': wfe,
}
def extract_trajectory(
self,
subcases: Optional[List[int]] = None,
angles: Optional[List[float]] = None,
) -> Dict[str, Any]:
"""
Extract full trajectory analysis across multiple subcases.
Args:
subcases: List of subcase IDs. If None, auto-detect from OP2.
angles: Corresponding elevation angles in degrees.
If None, assumes subcase IDs are angles (e.g., 20, 40, 60).
Returns:
Dict with trajectory analysis results
"""
# Determine subcases and angles
if subcases is None:
subcases = sorted(self.available_subcases)
if angles is None:
# Assume subcase IDs are angles
angles = [float(sc) for sc in subcases]
if len(subcases) != len(angles):
raise ValueError("subcases and angles must have same length")
print(f"[ZernikeTrajectory] Analyzing {len(subcases)} subcases: {subcases}")
print(f"[ZernikeTrajectory] Elevation angles: {angles}")
# Extract coefficients for each subcase
all_coeffs = []
for sc in subcases:
result = self.extract_subcase(sc)
all_coeffs.append(result['coefficients'])
# Stack into array: (N_angles, N_modes)
coefficients = np.vstack(all_coeffs)
n_angles, n_modes = coefficients.shape
print(f"[ZernikeTrajectory] Coefficient matrix shape: {coefficients.shape}")
# Compute relative coefficients (relative to reference angle)
ref_idx = None
for i, ang in enumerate(angles):
if abs(ang - self.reference_angle) < 0.1:
ref_idx = i
break
if ref_idx is not None:
# Make all coefficients relative to reference
ref_coeffs = coefficients[ref_idx, :]
coefficients_relative = coefficients - ref_coeffs
else:
# No exact reference found, use as-is
print(f"[ZernikeTrajectory] Warning: Reference angle {self.reference_angle}° not in data")
coefficients_relative = coefficients
# Compute trajectory parameters
angles_arr = np.array(angles)
trajectory_params = compute_trajectory_params(angles_arr, self.reference_angle)
# Fit sensitivity matrix
sensitivity, fitted_coeffs, r_squared = fit_sensitivity_matrix(
coefficients_relative, trajectory_params
)
print(f"[ZernikeTrajectory] Linear fit R² = {r_squared:.4f}")
# Compute mode-specific integrated RMS
mode_rms = {}
for group_name, noll_indices in MODE_GROUPS.items():
rms = compute_mode_group_rms(coefficients_relative, noll_indices, n_modes)
mode_rms[group_name] = rms
# Compute total filtered RMS (modes 5+)
total_filtered_rms = np.sqrt(np.mean(coefficients_relative**2))
# Build sensitivity matrix dict
sensitivity_dict = {}
for group_name, noll_indices in MODE_GROUPS.items():
indices = [n - 5 for n in noll_indices if 5 <= n < 5 + n_modes]
if indices:
axial = np.sqrt(np.sum(sensitivity[indices, 0]**2))
lateral = np.sqrt(np.sum(sensitivity[indices, 1]**2))
total = np.sqrt(axial**2 + lateral**2)
sensitivity_dict[group_name] = {
'axial': float(axial),
'lateral': float(lateral),
'total': float(total),
}
# Rank modes by sensitivity
mode_ranking = sorted(
sensitivity_dict.keys(),
key=lambda k: sensitivity_dict[k]['total'],
reverse=True
)
# Dominant mode
dominant_mode = mode_ranking[0] if mode_ranking else 'unknown'
# Build result
result = {
# Mode-specific integrated RMS (nm)
'astigmatism_rms_nm': mode_rms.get('astigmatism', 0.0),
'coma_rms_nm': mode_rms.get('coma', 0.0),
'trefoil_rms_nm': mode_rms.get('trefoil', 0.0),
'spherical_rms_nm': mode_rms.get('spherical', 0.0),
'secondary_astig_rms_nm': mode_rms.get('secondary_astig', 0.0),
'secondary_coma_rms_nm': mode_rms.get('secondary_coma', 0.0),
'quadrafoil_rms_nm': mode_rms.get('quadrafoil', 0.0),
# Total filtered RMS
'total_filtered_rms_nm': float(total_filtered_rms),
# Model quality
'linear_fit_r2': float(r_squared),
# Sensitivity analysis
'sensitivity_matrix': sensitivity_dict,
'mode_ranking': mode_ranking,
'dominant_mode': dominant_mode,
# Raw data for debugging/visualization
'angles_deg': angles,
'subcases': subcases,
'coefficients_relative': coefficients_relative.tolist(),
'trajectory_params': trajectory_params.tolist(),
'sensitivity_raw': sensitivity.tolist(),
# Metadata
'reference_angle': self.reference_angle,
'n_modes': n_modes,
'n_angles': n_angles,
'focal_length': self.focal_length,
}
return result
# ============================================================================
# Convenience Functions
# ============================================================================
def extract_zernike_trajectory(
op2_file: Union[str, Path],
subcases: Optional[List[int]] = None,
angles: Optional[List[float]] = None,
bdf_file: Optional[Union[str, Path]] = None,
focal_length: Optional[float] = None,
reference_angle: float = 20.0,
unit: str = 'mm',
) -> Dict[str, Any]:
"""
Extract Zernike trajectory metrics from OP2 file.
This is the main entry point for trajectory analysis.
Args:
op2_file: Path to OP2 results file
subcases: List of subcase IDs (default: auto-detect)
angles: Elevation angles in degrees (default: use subcase IDs as angles)
bdf_file: Path to BDF/DAT geometry (default: auto-find)
focal_length: Parabola focal length for OPD correction (mm)
reference_angle: Reference angle (default 20°)
unit: Displacement unit in OP2 ('mm', 'm', 'um', 'nm')
Returns:
Dict with trajectory analysis results including:
- coma_rms_nm, astigmatism_rms_nm, trefoil_rms_nm, etc.
- total_filtered_rms_nm
- linear_fit_r2
- sensitivity_matrix
- mode_ranking
- dominant_mode
"""
extractor = ZernikeTrajectoryExtractor(
op2_file=op2_file,
bdf_file=bdf_file,
focal_length=focal_length,
reference_angle=reference_angle,
unit=unit,
)
return extractor.extract_trajectory(subcases=subcases, angles=angles)
def extract_zernike_trajectory_objectives(
op2_file: Union[str, Path],
**kwargs
) -> Dict[str, float]:
"""
Extract just the objective values (for optimization).
Returns a flat dict of floats suitable for Atomizer objectives.
"""
result = extract_zernike_trajectory(op2_file, **kwargs)
return {
'coma_rms_nm': result['coma_rms_nm'],
'astigmatism_rms_nm': result['astigmatism_rms_nm'],
'trefoil_rms_nm': result['trefoil_rms_nm'],
'spherical_rms_nm': result['spherical_rms_nm'],
'total_filtered_rms_nm': result['total_filtered_rms_nm'],
'linear_fit_r2': result['linear_fit_r2'],
}
# ============================================================================
# CLI for Testing
# ============================================================================
if __name__ == '__main__':
import sys
import json
if len(sys.argv) < 2:
print("Usage: python extract_zernike_trajectory.py <op2_file> [subcases...]")
print("Example: python extract_zernike_trajectory.py results.op2 20 30 40 50 60")
sys.exit(1)
op2_file = sys.argv[1]
subcases = [int(s) for s in sys.argv[2:]] if len(sys.argv) > 2 else None
result = extract_zernike_trajectory(op2_file, subcases=subcases)
print("\n" + "="*60)
print("ZERNIKE TRAJECTORY ANALYSIS RESULTS")
print("="*60)
print(f"\nLinear Model Fit: R² = {result['linear_fit_r2']:.4f}")
if result['linear_fit_r2'] > 0.95:
print(" ✓ Excellent fit - physics model validated")
elif result['linear_fit_r2'] > 0.85:
print(" ~ Good fit - some nonlinearity present")
else:
print(" ⚠ Poor fit - significant nonlinearity")
print(f"\nDominant Mode: {result['dominant_mode'].upper()}")
print(f"Mode Ranking: {', '.join(result['mode_ranking'][:5])}")
print("\n--- Mode-Specific Integrated RMS (nm) ---")
for mode in result['mode_ranking']:
rms = result.get(f'{mode}_rms_nm', 0.0)
sens = result['sensitivity_matrix'].get(mode, {})
axial = sens.get('axial', 0)
lateral = sens.get('lateral', 0)
print(f" {mode:20s}: {rms:8.2f} nm (axial: {axial:.2f}, lateral: {lateral:.2f})")
print(f"\n--- Total Filtered RMS: {result['total_filtered_rms_nm']:.2f} nm ---")
print("\n--- Raw Result (JSON) ---")
# Print subset for readability
summary = {k: v for k, v in result.items() if not k.startswith('coefficients') and k != 'trajectory_params' and k != 'sensitivity_raw'}
print(json.dumps(summary, indent=2))