Dashboard: - Add Studio page with drag-drop model upload and Claude chat - Add intake system for study creation workflow - Improve session manager and context builder - Add intake API routes and frontend components Optimization Engine: - Add CLI module for command-line operations - Add intake module for study preprocessing - Add validation module with gate checks - Improve Zernike extractor documentation - Update spec models with better validation - Enhance solve_simulation robustness Documentation: - Add ATOMIZER_STUDIO.md planning doc - Add ATOMIZER_UX_SYSTEM.md for UX patterns - Update extractor library docs - Add study-readme-generator skill Tools: - Add test scripts for extraction validation - Add Zernike recentering test Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
933 lines
29 KiB
Python
933 lines
29 KiB
Python
"""
|
|
Zernike Coefficient Extractor for Telescope Mirror Optimization
|
|
================================================================
|
|
|
|
Extracts Zernike polynomial coefficients from OP2 displacement results
|
|
for optical surface quality analysis. Designed for telescope mirror
|
|
optimization where wavefront error (WFE) metrics are critical.
|
|
|
|
Key Features:
|
|
- Noll-indexed Zernike polynomials (standard optical convention)
|
|
- Multi-subcase support (different gravity orientations: 20, 40, 60, 90 deg)
|
|
- Global and filtered RMS wavefront error
|
|
- Individual aberration magnitudes (astigmatism, coma, trefoil, spherical)
|
|
- Relative metrics between subcases (e.g., operational vs polishing orientation)
|
|
|
|
Usage:
|
|
from optimization_engine.extractors.extract_zernike import (
|
|
extract_zernike_from_op2,
|
|
ZernikeExtractor
|
|
)
|
|
|
|
# Simple usage - get filtered RMS for optimization objective
|
|
result = extract_zernike_from_op2(op2_file, subcase=20)
|
|
rms_filtered = result['filtered_rms_nm']
|
|
|
|
# Full extractor for detailed analysis
|
|
extractor = ZernikeExtractor(op2_file, bdf_file)
|
|
metrics = extractor.extract_all_subcases()
|
|
|
|
Author: Atomizer Framework (adapted from telescope mirror analysis scripts)
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional, List, Tuple, Union
|
|
import numpy as np
|
|
from math import factorial
|
|
from numpy.linalg import LinAlgError
|
|
|
|
try:
|
|
from pyNastran.op2.op2 import OP2
|
|
from pyNastran.bdf.bdf import BDF
|
|
except ImportError:
|
|
raise ImportError("pyNastran is required. Install with: pip install pyNastran")
|
|
|
|
|
|
# ============================================================================
|
|
# Configuration
|
|
# ============================================================================
|
|
|
|
DEFAULT_N_MODES = 50 # Number of Zernike modes to fit
|
|
DEFAULT_FILTER_ORDERS = 4 # Filter first N modes (piston, tip, tilt, defocus)
|
|
DEFAULT_CHUNK_SIZE = 100000 # For memory-efficient processing of large meshes
|
|
|
|
# Standard telescope orientations (gravity angles in degrees)
|
|
STANDARD_SUBCASES = [20, 40, 60, 90]
|
|
|
|
# Displacement unit conversions (to nanometers for WFE)
|
|
UNIT_TO_NM = {
|
|
'mm': 1e6, # 1 mm = 1e6 nm
|
|
'm': 1e9, # 1 m = 1e9 nm
|
|
'um': 1e3, # 1 um = 1e3 nm
|
|
'nm': 1.0, # already nm
|
|
}
|
|
|
|
|
|
# ============================================================================
|
|
# Zernike Polynomial Mathematics
|
|
# ============================================================================
|
|
|
|
def noll_indices(j: int) -> Tuple[int, int]:
|
|
"""
|
|
Convert Noll index j to radial order n and azimuthal frequency m.
|
|
|
|
The Noll indexing scheme is the standard convention in optics.
|
|
j=1: Piston, j=2,3: Tip/Tilt, j=4: Defocus, j=5,6: Astigmatism, etc.
|
|
|
|
Args:
|
|
j: Noll index (1-based)
|
|
|
|
Returns:
|
|
(n, m): Radial order and azimuthal frequency
|
|
"""
|
|
if j < 1:
|
|
raise ValueError("Noll index j must be >= 1")
|
|
|
|
count = 0
|
|
n = 0
|
|
while True:
|
|
if n == 0:
|
|
ms = [0]
|
|
elif n % 2 == 0:
|
|
ms = [0] + [m for k in range(1, n//2 + 1) for m in (-2*k, 2*k)]
|
|
else:
|
|
ms = [m for k in range(0, (n+1)//2) for m in (-(2*k+1), (2*k+1))]
|
|
for m in ms:
|
|
count += 1
|
|
if count == j:
|
|
return n, m
|
|
n += 1
|
|
|
|
|
|
def zernike_radial(n: int, m: int, r: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Compute the radial component R_n^m(r) of the Zernike polynomial.
|
|
|
|
Args:
|
|
n: Radial order
|
|
m: Azimuthal frequency (absolute value used)
|
|
r: Radial coordinates (normalized to unit disk)
|
|
|
|
Returns:
|
|
Radial polynomial evaluated at r
|
|
"""
|
|
R = np.zeros_like(r)
|
|
m_abs = abs(m)
|
|
|
|
for s in range((n - m_abs) // 2 + 1):
|
|
coef = ((-1)**s * factorial(n - s) /
|
|
(factorial(s) *
|
|
factorial((n + m_abs) // 2 - s) *
|
|
factorial((n - m_abs) // 2 - s)))
|
|
R += coef * r**(n - 2*s)
|
|
|
|
return R
|
|
|
|
|
|
def zernike_noll(j: int, r: np.ndarray, theta: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Evaluate Noll-indexed Zernike polynomial Z_j(r, theta).
|
|
|
|
Args:
|
|
j: Noll index
|
|
r: Radial coordinates (normalized to unit disk)
|
|
theta: Angular coordinates (radians)
|
|
|
|
Returns:
|
|
Zernike polynomial values at (r, theta)
|
|
"""
|
|
n, m = noll_indices(j)
|
|
R = zernike_radial(n, m, r)
|
|
|
|
if m == 0:
|
|
return R
|
|
elif m > 0:
|
|
return R * np.cos(m * theta)
|
|
else:
|
|
return R * np.sin(-m * theta)
|
|
|
|
|
|
def zernike_name(j: int) -> str:
|
|
"""
|
|
Get common optical name for Zernike mode.
|
|
|
|
Args:
|
|
j: Noll index
|
|
|
|
Returns:
|
|
Human-readable name (e.g., "Defocus", "Astigmatism 0 deg")
|
|
"""
|
|
n, m = noll_indices(j)
|
|
|
|
names = {
|
|
(0, 0): "Piston",
|
|
(1, -1): "Tilt X",
|
|
(1, 1): "Tilt Y",
|
|
(2, 0): "Defocus",
|
|
(2, -2): "Astigmatism 45 deg",
|
|
(2, 2): "Astigmatism 0 deg",
|
|
(3, -1): "Coma X",
|
|
(3, 1): "Coma Y",
|
|
(3, -3): "Trefoil X",
|
|
(3, 3): "Trefoil Y",
|
|
(4, 0): "Primary Spherical",
|
|
(4, -2): "Secondary Astig X",
|
|
(4, 2): "Secondary Astig Y",
|
|
(4, -4): "Quadrafoil X",
|
|
(4, 4): "Quadrafoil Y",
|
|
(5, -1): "Secondary Coma X",
|
|
(5, 1): "Secondary Coma Y",
|
|
(5, -3): "Secondary Trefoil X",
|
|
(5, 3): "Secondary Trefoil Y",
|
|
(5, -5): "Pentafoil X",
|
|
(5, 5): "Pentafoil Y",
|
|
(6, 0): "Secondary Spherical",
|
|
}
|
|
|
|
return names.get((n, m), f"Z(n={n}, m={m})")
|
|
|
|
|
|
def zernike_label(j: int) -> str:
|
|
"""Full label for Zernike mode: J{j} - Name (n=, m=)"""
|
|
n, m = noll_indices(j)
|
|
return f"J{j:02d} - {zernike_name(j)} (n={n}, m={m})"
|
|
|
|
|
|
# ============================================================================
|
|
# Zernike Coefficient Fitting
|
|
# ============================================================================
|
|
|
|
def compute_zernike_coefficients(
|
|
x: np.ndarray,
|
|
y: np.ndarray,
|
|
values: np.ndarray,
|
|
n_modes: int = DEFAULT_N_MODES,
|
|
chunk_size: int = DEFAULT_CHUNK_SIZE
|
|
) -> Tuple[np.ndarray, float]:
|
|
"""
|
|
Fit Zernike coefficients to surface data using least-squares.
|
|
|
|
Uses chunked processing for memory efficiency with large meshes.
|
|
Points outside the unit disk (after centering/normalization) are excluded.
|
|
|
|
Args:
|
|
x, y: Node coordinates (will be centered and normalized)
|
|
values: Surface values at each node (e.g., WFE in nm)
|
|
n_modes: Number of Zernike modes to fit
|
|
chunk_size: Chunk size for memory-efficient processing
|
|
|
|
Returns:
|
|
(coefficients, R_max): Zernike coefficients and normalization radius
|
|
"""
|
|
# Center coordinates
|
|
x_centered = x - np.mean(x)
|
|
y_centered = y - np.mean(y)
|
|
|
|
# Normalize to unit disk
|
|
R_max = float(np.max(np.hypot(x_centered, y_centered)))
|
|
r = np.hypot(x_centered / R_max, y_centered / R_max).astype(np.float32)
|
|
theta = np.arctan2(y_centered, x_centered).astype(np.float32)
|
|
|
|
# Mask: inside unit disk and valid values
|
|
mask = (r <= 1.0) & ~np.isnan(values)
|
|
if not np.any(mask):
|
|
raise RuntimeError("No valid points inside unit disk for Zernike fitting.")
|
|
|
|
idx = np.nonzero(mask)[0]
|
|
m = int(n_modes)
|
|
|
|
# Normal equations: (Z^T Z) c = Z^T v
|
|
# Build incrementally for memory efficiency
|
|
G = np.zeros((m, m), dtype=np.float64) # Z^T Z
|
|
h = np.zeros((m,), dtype=np.float64) # Z^T v
|
|
v = values.astype(np.float64)
|
|
|
|
for start in range(0, len(idx), chunk_size):
|
|
chunk_idx = idx[start:start + chunk_size]
|
|
r_chunk = r[chunk_idx]
|
|
theta_chunk = theta[chunk_idx]
|
|
v_chunk = v[chunk_idx]
|
|
|
|
# Build Zernike basis for this chunk
|
|
Z_chunk = np.column_stack([
|
|
zernike_noll(j, r_chunk, theta_chunk).astype(np.float32)
|
|
for j in range(1, m + 1)
|
|
])
|
|
|
|
# Accumulate normal equations
|
|
G += (Z_chunk.T @ Z_chunk).astype(np.float64)
|
|
h += (Z_chunk.T @ v_chunk).astype(np.float64)
|
|
|
|
# Solve normal equations
|
|
try:
|
|
coeffs = np.linalg.solve(G, h)
|
|
except LinAlgError:
|
|
coeffs = np.linalg.lstsq(G, h, rcond=None)[0]
|
|
|
|
return coeffs, R_max
|
|
|
|
|
|
# ============================================================================
|
|
# RMS Calculations
|
|
# ============================================================================
|
|
|
|
def compute_rms_metrics(
|
|
x: np.ndarray,
|
|
y: np.ndarray,
|
|
wfe: np.ndarray,
|
|
n_modes: int = DEFAULT_N_MODES,
|
|
filter_orders: int = DEFAULT_FILTER_ORDERS
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Compute global and filtered RMS wavefront error.
|
|
|
|
Args:
|
|
x, y: Node coordinates
|
|
wfe: Wavefront error values (nm)
|
|
n_modes: Number of Zernike modes to fit
|
|
filter_orders: Number of low-order modes to filter (typically 4)
|
|
|
|
Returns:
|
|
Dict with 'global_rms_nm', 'filtered_rms_nm', 'rms_filter_j1to3', etc.
|
|
"""
|
|
coeffs, R_max = compute_zernike_coefficients(x, y, wfe, n_modes)
|
|
|
|
# Reconstruct filtered WFE (remove low-order modes)
|
|
x_c = x - np.mean(x)
|
|
y_c = y - np.mean(y)
|
|
r = np.hypot(x_c / R_max, y_c / R_max)
|
|
theta = np.arctan2(y_c, x_c)
|
|
|
|
# Build Zernike basis for low-order modes only
|
|
Z_low = np.column_stack([
|
|
zernike_noll(j, r, theta) for j in range(1, filter_orders + 1)
|
|
])
|
|
|
|
# Subtract low-order contribution
|
|
wfe_filtered = wfe - Z_low @ coeffs[:filter_orders]
|
|
|
|
global_rms = float(np.sqrt(np.mean(wfe**2)))
|
|
filtered_rms = float(np.sqrt(np.mean(wfe_filtered**2)))
|
|
|
|
# Also compute J1-J3 filtered RMS (optician workload metric)
|
|
Z_j1to3 = np.column_stack([
|
|
zernike_noll(j, r, theta) for j in range(1, 4)
|
|
])
|
|
wfe_filter_j1to3 = wfe - Z_j1to3 @ coeffs[:3]
|
|
rms_filter_j1to3 = float(np.sqrt(np.mean(wfe_filter_j1to3**2)))
|
|
|
|
return {
|
|
'global_rms_nm': global_rms,
|
|
'filtered_rms_nm': filtered_rms,
|
|
'rms_filter_j1to3': rms_filter_j1to3,
|
|
'coefficients': coeffs,
|
|
'R_max': R_max,
|
|
'X': x,
|
|
'Y': y,
|
|
'wfe': wfe
|
|
}
|
|
|
|
|
|
def compute_aberration_magnitudes(coeffs: np.ndarray) -> Dict[str, float]:
|
|
"""
|
|
Compute RMS magnitudes of common optical aberrations.
|
|
|
|
Args:
|
|
coeffs: Zernike coefficients (at least 11 modes)
|
|
|
|
Returns:
|
|
Dict with aberration RMS values in nm
|
|
"""
|
|
if len(coeffs) < 11:
|
|
raise ValueError("Need at least 11 Zernike modes for aberration analysis")
|
|
|
|
return {
|
|
'defocus_nm': float(abs(coeffs[3])), # J4
|
|
'astigmatism_rms_nm': float(np.sqrt(coeffs[4]**2 + coeffs[5]**2)), # J5+J6
|
|
'coma_rms_nm': float(np.sqrt(coeffs[6]**2 + coeffs[7]**2)), # J7+J8
|
|
'trefoil_rms_nm': float(np.sqrt(coeffs[8]**2 + coeffs[9]**2)), # J9+J10
|
|
'spherical_nm': float(abs(coeffs[10])), # J11
|
|
}
|
|
|
|
|
|
def compute_rms_with_custom_filter(
|
|
x: np.ndarray,
|
|
y: np.ndarray,
|
|
wfe: np.ndarray,
|
|
coeffs: np.ndarray,
|
|
R_max: float,
|
|
filter_modes: int = 3
|
|
) -> float:
|
|
"""
|
|
Compute RMS with custom number of modes filtered.
|
|
|
|
This is useful for specific metrics like optician workload where
|
|
only piston, tip, and tilt (J1-J3) are removed, keeping defocus.
|
|
|
|
Args:
|
|
x, y: Node coordinates
|
|
wfe: Wavefront error values (nm)
|
|
coeffs: Pre-computed Zernike coefficients
|
|
R_max: Normalization radius from coefficient fitting
|
|
filter_modes: Number of modes to filter (default 3 for J1-J3)
|
|
|
|
Returns:
|
|
RMS after filtering specified modes
|
|
"""
|
|
x_c = x - np.mean(x)
|
|
y_c = y - np.mean(y)
|
|
r = np.hypot(x_c / R_max, y_c / R_max)
|
|
theta = np.arctan2(y_c, x_c)
|
|
|
|
# Build Zernike basis for filtered modes
|
|
Z_filter = np.column_stack([
|
|
zernike_noll(j, r, theta) for j in range(1, filter_modes + 1)
|
|
])
|
|
|
|
# Subtract filtered contribution
|
|
wfe_filtered = wfe - Z_filter @ coeffs[:filter_modes]
|
|
|
|
return float(np.sqrt(np.mean(wfe_filtered**2)))
|
|
|
|
|
|
# ============================================================================
|
|
# OP2/BDF Data Extraction
|
|
# ============================================================================
|
|
|
|
def read_node_geometry(bdf_path: Path) -> Dict[int, np.ndarray]:
|
|
"""
|
|
Read node coordinates from BDF/DAT file.
|
|
|
|
Args:
|
|
bdf_path: Path to .bdf or .dat file
|
|
|
|
Returns:
|
|
Dict mapping node ID to [x, y, z] coordinates
|
|
"""
|
|
bdf = BDF()
|
|
bdf.read_bdf(str(bdf_path))
|
|
|
|
return {
|
|
int(nid): node.get_position()
|
|
for nid, node in bdf.nodes.items()
|
|
}
|
|
|
|
|
|
def find_geometry_file(op2_path: Path) -> Path:
|
|
"""
|
|
Find matching BDF/DAT file for an OP2.
|
|
|
|
Looks for same-basename first, then any .dat/.bdf in same folder.
|
|
|
|
Args:
|
|
op2_path: Path to OP2 file
|
|
|
|
Returns:
|
|
Path to geometry file
|
|
"""
|
|
folder = op2_path.parent
|
|
base = op2_path.stem
|
|
|
|
# Try same basename
|
|
for ext in ['.dat', '.bdf']:
|
|
cand = folder / (base + ext)
|
|
if cand.exists():
|
|
return cand
|
|
|
|
# Try any geometry file
|
|
for name in folder.iterdir():
|
|
if name.suffix.lower() in ['.dat', '.bdf']:
|
|
return name
|
|
|
|
raise FileNotFoundError(f"No .dat or .bdf geometry file found for {op2_path}")
|
|
|
|
|
|
def extract_displacements_by_subcase(
|
|
op2_path: Path,
|
|
required_subcases: Optional[List[int]] = None
|
|
) -> Dict[str, Dict[str, np.ndarray]]:
|
|
"""
|
|
Extract displacement data from OP2 organized by subcase.
|
|
|
|
Args:
|
|
op2_path: Path to OP2 file
|
|
required_subcases: List of required subcases (e.g., [20, 40, 60, 90])
|
|
|
|
Returns:
|
|
Dict keyed by subcase label: {'20': {'node_ids': array, 'disp': array}, ...}
|
|
"""
|
|
op2 = OP2()
|
|
op2.read_op2(str(op2_path))
|
|
|
|
if not op2.displacements:
|
|
raise RuntimeError("No displacement data found in OP2 file")
|
|
|
|
result = {}
|
|
|
|
for key, darr in op2.displacements.items():
|
|
data = darr.data
|
|
dmat = data[0] if data.ndim == 3 else (data if data.ndim == 2 else None)
|
|
if dmat is None:
|
|
continue
|
|
|
|
ngt = darr.node_gridtype.astype(int)
|
|
node_ids = ngt if ngt.ndim == 1 else ngt[:, 0]
|
|
|
|
# Try to identify subcase from subtitle, label, or isubcase
|
|
subtitle = getattr(darr, 'subtitle', None)
|
|
op2_label = getattr(darr, 'label', None)
|
|
isubcase = getattr(darr, 'isubcase', None)
|
|
|
|
# Extract numeric from subtitle first, then label, then isubcase
|
|
import re
|
|
subcase_id = None
|
|
|
|
# Priority 1: subtitle (e.g., "GRAVITY 20 DEG")
|
|
if isinstance(subtitle, str) and subtitle.strip():
|
|
m = re.search(r'-?\d+', subtitle)
|
|
if m:
|
|
subcase_id = m.group(0)
|
|
|
|
# Priority 2: label field (e.g., "90 SUBCASE 1")
|
|
if subcase_id is None and isinstance(op2_label, str) and op2_label.strip():
|
|
m = re.search(r'-?\d+', op2_label)
|
|
if m:
|
|
subcase_id = m.group(0)
|
|
|
|
# Priority 3: isubcase number
|
|
if subcase_id is None and isinstance(isubcase, int):
|
|
subcase_id = str(isubcase)
|
|
|
|
if subcase_id:
|
|
result[subcase_id] = {
|
|
'node_ids': node_ids.astype(int),
|
|
'disp': dmat.copy()
|
|
}
|
|
|
|
# Validate required subcases if specified
|
|
if required_subcases:
|
|
missing = [str(s) for s in required_subcases if str(s) not in result]
|
|
if missing:
|
|
available = list(result.keys())
|
|
raise RuntimeError(
|
|
f"Required subcases {missing} not found. Available: {available}"
|
|
)
|
|
|
|
return result
|
|
|
|
|
|
# ============================================================================
|
|
# Main Extractor Class
|
|
# ============================================================================
|
|
|
|
class ZernikeExtractor:
|
|
"""
|
|
Complete Zernike analysis extractor for telescope mirror optimization.
|
|
|
|
This class handles:
|
|
- Loading OP2 displacement results
|
|
- Matching with BDF geometry
|
|
- Computing Zernike coefficients and RMS metrics
|
|
- Multi-subcase analysis (different gravity orientations)
|
|
- Relative metrics between subcases
|
|
|
|
Example usage in optimization:
|
|
extractor = ZernikeExtractor(op2_file, bdf_file)
|
|
|
|
# For single-objective optimization (minimize filtered RMS at 20 deg)
|
|
result = extractor.extract_subcase('20')
|
|
objective = result['filtered_rms_nm']
|
|
|
|
# For multi-subcase optimization
|
|
all_results = extractor.extract_all_subcases()
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
op2_path: Union[str, Path],
|
|
bdf_path: Optional[Union[str, Path]] = None,
|
|
displacement_unit: str = 'mm',
|
|
n_modes: int = DEFAULT_N_MODES,
|
|
filter_orders: int = DEFAULT_FILTER_ORDERS
|
|
):
|
|
"""
|
|
Initialize Zernike extractor.
|
|
|
|
Args:
|
|
op2_path: Path to OP2 results file
|
|
bdf_path: Path to BDF/DAT geometry file (auto-detected if None)
|
|
displacement_unit: Unit of displacement in OP2 ('mm', 'm', 'um', 'nm')
|
|
n_modes: Number of Zernike modes to fit
|
|
filter_orders: Number of low-order modes to filter
|
|
"""
|
|
self.op2_path = Path(op2_path)
|
|
self.bdf_path = Path(bdf_path) if bdf_path else find_geometry_file(self.op2_path)
|
|
self.displacement_unit = displacement_unit
|
|
self.n_modes = n_modes
|
|
self.filter_orders = filter_orders
|
|
|
|
# Unit conversion factor (displacement to nm)
|
|
self.nm_scale = UNIT_TO_NM.get(displacement_unit.lower(), 1e6)
|
|
|
|
# WFE = 2 * surface displacement (optical convention)
|
|
self.wfe_factor = 2.0 * self.nm_scale
|
|
|
|
# Lazy-loaded data
|
|
self._node_geo = None
|
|
self._displacements = None
|
|
|
|
@property
|
|
def node_geometry(self) -> Dict[int, np.ndarray]:
|
|
"""Lazy-load node geometry from BDF."""
|
|
if self._node_geo is None:
|
|
self._node_geo = read_node_geometry(self.bdf_path)
|
|
return self._node_geo
|
|
|
|
@property
|
|
def displacements(self) -> Dict[str, Dict[str, np.ndarray]]:
|
|
"""Lazy-load displacements from OP2."""
|
|
if self._displacements is None:
|
|
self._displacements = extract_displacements_by_subcase(self.op2_path)
|
|
return self._displacements
|
|
|
|
def _build_dataframe(
|
|
self,
|
|
subcase_label: str
|
|
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
|
"""
|
|
Build coordinate and WFE arrays for a subcase.
|
|
|
|
Returns:
|
|
(X, Y, WFE_nm): Arrays of coordinates and wavefront error
|
|
"""
|
|
if subcase_label not in self.displacements:
|
|
available = list(self.displacements.keys())
|
|
raise ValueError(f"Subcase '{subcase_label}' not found. Available: {available}")
|
|
|
|
data = self.displacements[subcase_label]
|
|
node_ids = data['node_ids']
|
|
disp = data['disp']
|
|
|
|
# Build arrays
|
|
X, Y, WFE = [], [], []
|
|
for nid, vec in zip(node_ids, disp):
|
|
geo = self.node_geometry.get(int(nid))
|
|
if geo is None:
|
|
continue
|
|
|
|
X.append(geo[0])
|
|
Y.append(geo[1])
|
|
# Z-displacement to WFE (nm)
|
|
wfe = vec[2] * self.wfe_factor
|
|
WFE.append(wfe)
|
|
|
|
return np.array(X), np.array(Y), np.array(WFE)
|
|
|
|
def extract_subcase(
|
|
self,
|
|
subcase_label: str,
|
|
include_coefficients: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Extract Zernike metrics for a single subcase.
|
|
|
|
Args:
|
|
subcase_label: Subcase identifier (e.g., '20', '90')
|
|
include_coefficients: Whether to include all Zernike coefficients
|
|
|
|
Returns:
|
|
Dict with RMS metrics, aberrations, and optionally coefficients
|
|
"""
|
|
X, Y, WFE = self._build_dataframe(subcase_label)
|
|
|
|
# Compute RMS metrics
|
|
rms_result = compute_rms_metrics(
|
|
X, Y, WFE, self.n_modes, self.filter_orders
|
|
)
|
|
|
|
# Compute aberration magnitudes
|
|
aberrations = compute_aberration_magnitudes(rms_result['coefficients'])
|
|
|
|
result = {
|
|
'subcase': subcase_label,
|
|
'global_rms_nm': rms_result['global_rms_nm'],
|
|
'filtered_rms_nm': rms_result['filtered_rms_nm'],
|
|
'rms_filter_j1to3': rms_result['rms_filter_j1to3'],
|
|
'n_nodes': len(X),
|
|
**aberrations
|
|
}
|
|
|
|
if include_coefficients:
|
|
result['coefficients'] = rms_result['coefficients'].tolist()
|
|
result['coefficient_labels'] = [zernike_label(j) for j in range(1, self.n_modes + 1)]
|
|
|
|
return result
|
|
|
|
def extract_relative(
|
|
self,
|
|
target_subcase: str,
|
|
reference_subcase: str,
|
|
include_coefficients: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Extract Zernike metrics relative to a reference subcase.
|
|
|
|
Computes: WFE_relative = WFE_target - WFE_reference
|
|
|
|
Args:
|
|
target_subcase: Subcase to analyze
|
|
reference_subcase: Reference subcase to subtract
|
|
include_coefficients: Whether to include all Zernike coefficients
|
|
|
|
Returns:
|
|
Dict with relative RMS metrics and aberrations
|
|
"""
|
|
X_t, Y_t, WFE_t = self._build_dataframe(target_subcase)
|
|
X_r, Y_r, WFE_r = self._build_dataframe(reference_subcase)
|
|
|
|
# Build node-to-index mapping for reference
|
|
target_data = self.displacements[target_subcase]
|
|
ref_data = self.displacements[reference_subcase]
|
|
|
|
ref_node_to_idx = {
|
|
int(nid): i for i, nid in enumerate(ref_data['node_ids'])
|
|
}
|
|
|
|
# Compute relative WFE for common nodes
|
|
X_rel, Y_rel, WFE_rel = [], [], []
|
|
|
|
for i, nid in enumerate(target_data['node_ids']):
|
|
nid = int(nid)
|
|
if nid not in ref_node_to_idx:
|
|
continue
|
|
|
|
ref_idx = ref_node_to_idx[nid]
|
|
geo = self.node_geometry.get(nid)
|
|
if geo is None:
|
|
continue
|
|
|
|
X_rel.append(geo[0])
|
|
Y_rel.append(geo[1])
|
|
|
|
target_wfe = target_data['disp'][i, 2] * self.wfe_factor
|
|
ref_wfe = ref_data['disp'][ref_idx, 2] * self.wfe_factor
|
|
WFE_rel.append(target_wfe - ref_wfe)
|
|
|
|
X_rel = np.array(X_rel)
|
|
Y_rel = np.array(Y_rel)
|
|
WFE_rel = np.array(WFE_rel)
|
|
|
|
# Compute metrics on relative WFE
|
|
rms_result = compute_rms_metrics(
|
|
X_rel, Y_rel, WFE_rel, self.n_modes, self.filter_orders
|
|
)
|
|
aberrations = compute_aberration_magnitudes(rms_result['coefficients'])
|
|
|
|
result = {
|
|
'target_subcase': target_subcase,
|
|
'reference_subcase': reference_subcase,
|
|
'relative_global_rms_nm': rms_result['global_rms_nm'],
|
|
'relative_filtered_rms_nm': rms_result['filtered_rms_nm'],
|
|
'relative_rms_filter_j1to3': rms_result['rms_filter_j1to3'],
|
|
'n_common_nodes': len(X_rel),
|
|
**{f'relative_{k}': v for k, v in aberrations.items()}
|
|
}
|
|
|
|
if include_coefficients:
|
|
result['coefficients'] = rms_result['coefficients'].tolist()
|
|
result['coefficient_labels'] = [zernike_label(j) for j in range(1, self.n_modes + 1)]
|
|
|
|
return result
|
|
|
|
def extract_all_subcases(
|
|
self,
|
|
reference_subcase: Optional[str] = '20'
|
|
) -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Extract metrics for all available subcases.
|
|
|
|
Args:
|
|
reference_subcase: Reference for relative calculations (None to skip)
|
|
|
|
Returns:
|
|
Dict mapping subcase label to metrics dict
|
|
"""
|
|
results = {}
|
|
|
|
for label in self.displacements.keys():
|
|
results[label] = self.extract_subcase(label)
|
|
|
|
# Add relative metrics if reference specified
|
|
if reference_subcase and label != reference_subcase:
|
|
try:
|
|
rel = self.extract_relative(label, reference_subcase)
|
|
results[label].update({
|
|
f'rel_{reference_subcase}_{k}': v
|
|
for k, v in rel.items()
|
|
if k.startswith('relative_')
|
|
})
|
|
except Exception as e:
|
|
results[label][f'rel_{reference_subcase}_error'] = str(e)
|
|
|
|
return results
|
|
|
|
|
|
# ============================================================================
|
|
# Convenience Functions for Optimization
|
|
# ============================================================================
|
|
|
|
def extract_zernike_from_op2(
|
|
op2_file: Union[str, Path],
|
|
bdf_file: Optional[Union[str, Path]] = None,
|
|
subcase: Union[int, str] = 1,
|
|
displacement_unit: str = 'mm',
|
|
n_modes: int = DEFAULT_N_MODES,
|
|
filter_orders: int = DEFAULT_FILTER_ORDERS
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Convenience function to extract Zernike metrics from OP2.
|
|
|
|
This is the main entry point for optimization objectives.
|
|
|
|
Args:
|
|
op2_file: Path to OP2 results file
|
|
bdf_file: Path to BDF geometry (auto-detected if None)
|
|
subcase: Subcase identifier
|
|
displacement_unit: Unit of displacement in OP2
|
|
n_modes: Number of Zernike modes
|
|
filter_orders: Low-order modes to filter
|
|
|
|
Returns:
|
|
Dict with:
|
|
- 'global_rms_nm': Global RMS WFE in nanometers
|
|
- 'filtered_rms_nm': Filtered RMS (low orders removed)
|
|
- 'defocus_nm', 'astigmatism_rms_nm', etc.: Individual aberrations
|
|
"""
|
|
extractor = ZernikeExtractor(
|
|
op2_file, bdf_file, displacement_unit, n_modes, filter_orders
|
|
)
|
|
return extractor.extract_subcase(str(subcase))
|
|
|
|
|
|
def extract_zernike_filtered_rms(
|
|
op2_file: Union[str, Path],
|
|
bdf_file: Optional[Union[str, Path]] = None,
|
|
subcase: Union[int, str] = 1,
|
|
**kwargs
|
|
) -> float:
|
|
"""
|
|
Extract filtered RMS WFE - the primary metric for mirror optimization.
|
|
|
|
Filtered RMS removes piston, tip, tilt, and defocus (modes 1-4),
|
|
which can be corrected by alignment and focus adjustment.
|
|
|
|
Args:
|
|
op2_file: Path to OP2 file
|
|
bdf_file: Path to BDF geometry (auto-detected if None)
|
|
subcase: Subcase identifier
|
|
**kwargs: Additional arguments for ZernikeExtractor
|
|
|
|
Returns:
|
|
Filtered RMS WFE in nanometers
|
|
"""
|
|
result = extract_zernike_from_op2(op2_file, bdf_file, subcase, **kwargs)
|
|
return result['filtered_rms_nm']
|
|
|
|
|
|
def extract_zernike_relative_rms(
|
|
op2_file: Union[str, Path],
|
|
target_subcase: Union[int, str],
|
|
reference_subcase: Union[int, str],
|
|
bdf_file: Optional[Union[str, Path]] = None,
|
|
**kwargs
|
|
) -> float:
|
|
"""
|
|
Extract relative filtered RMS between two subcases.
|
|
|
|
Useful for analyzing gravity-induced deformation relative to
|
|
a reference orientation (e.g., polishing position).
|
|
|
|
Args:
|
|
op2_file: Path to OP2 file
|
|
target_subcase: Subcase to analyze
|
|
reference_subcase: Reference subcase
|
|
bdf_file: Path to BDF geometry
|
|
**kwargs: Additional arguments for ZernikeExtractor
|
|
|
|
Returns:
|
|
Relative filtered RMS WFE in nanometers
|
|
"""
|
|
extractor = ZernikeExtractor(op2_file, bdf_file, **kwargs)
|
|
result = extractor.extract_relative(str(target_subcase), str(reference_subcase))
|
|
return result['relative_filtered_rms_nm']
|
|
|
|
|
|
# ============================================================================
|
|
# Module Exports
|
|
# ============================================================================
|
|
|
|
__all__ = [
|
|
# Main extractor class
|
|
'ZernikeExtractor',
|
|
|
|
# Convenience functions for optimization
|
|
'extract_zernike_from_op2',
|
|
'extract_zernike_filtered_rms',
|
|
'extract_zernike_relative_rms',
|
|
|
|
# Zernike utilities (for advanced use)
|
|
'compute_zernike_coefficients',
|
|
'compute_rms_metrics',
|
|
'compute_rms_with_custom_filter',
|
|
'compute_aberration_magnitudes',
|
|
'noll_indices',
|
|
'zernike_noll',
|
|
'zernike_name',
|
|
'zernike_label',
|
|
]
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Example/test usage
|
|
import sys
|
|
|
|
if len(sys.argv) > 1:
|
|
op2_file = Path(sys.argv[1])
|
|
|
|
print(f"Analyzing: {op2_file}")
|
|
|
|
try:
|
|
extractor = ZernikeExtractor(op2_file)
|
|
|
|
print(f"\nAvailable subcases: {list(extractor.displacements.keys())}")
|
|
|
|
results = extractor.extract_all_subcases()
|
|
|
|
for label, metrics in results.items():
|
|
print(f"\n=== Subcase {label} ===")
|
|
print(f" Global RMS: {metrics['global_rms_nm']:.2f} nm")
|
|
print(f" Filtered RMS: {metrics['filtered_rms_nm']:.2f} nm")
|
|
print(f" Astigmatism: {metrics['astigmatism_rms_nm']:.2f} nm")
|
|
print(f" Coma: {metrics['coma_rms_nm']:.2f} nm")
|
|
print(f" Trefoil: {metrics['trefoil_rms_nm']:.2f} nm")
|
|
print(f" Spherical: {metrics['spherical_nm']:.2f} nm")
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
sys.exit(1)
|
|
else:
|
|
print("Usage: python extract_zernike.py <op2_file>")
|
|
print("\nThis module provides Zernike coefficient extraction for telescope mirror optimization.")
|
|
print("\nExample in optimization config:")
|
|
print(' "objectives": [')
|
|
print(' {')
|
|
print(' "name": "filtered_rms",')
|
|
print(' "extractor": "zernike",')
|
|
print(' "direction": "minimize",')
|
|
print(' "extractor_config": {')
|
|
print(' "subcase": "20",')
|
|
print(' "metric": "filtered_rms_nm"')
|
|
print(' }')
|
|
print(' }')
|
|
print(' ]')
|