Files
Atomizer/optimization_engine/insights/zernike_opd_comparison.py
Anto01 d19fc39a2a feat: Add OPD method support to Zernike visualization with Standard/OPD toggle
Major improvements to Zernike WFE visualization:

- Add ZernikeDashboardInsight: Unified dashboard with all orientations (40°, 60°, 90°)
  on one page with light theme and executive summary
- Add OPD method toggle: Switch between Standard (Z-only) and OPD (X,Y,Z) methods
  in ZernikeWFEInsight with interactive buttons
- Add lateral displacement maps: Visualize X,Y displacement for each orientation
- Add displacement component views: Toggle between WFE, ΔX, ΔY, ΔZ in relative views
- Add metrics comparison table showing both methods side-by-side

New extractors:
- extract_zernike_figure.py: ZernikeOPDExtractor using BDF geometry interpolation
- extract_zernike_opd.py: Parabola-based OPD with focal length

Key finding: OPD method gives 8-11% higher WFE values than Standard method
(more conservative/accurate for surfaces with lateral displacement under gravity)

Documentation updates:
- SYS_12: Added E22 ZernikeOPD as recommended method
- SYS_16: Added ZernikeDashboard, updated ZernikeWFE with OPD features
- Cheatsheet: Added Zernike method comparison table

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-22 21:03:19 -05:00

626 lines
23 KiB
Python

"""
Zernike OPD Method Comparison Insight
=====================================
Compares standard Z-only Zernike method against rigorous OPD method that
accounts for lateral (X, Y) displacements. Critical for understanding
whether your optimization objective is capturing true optical performance.
WHY THIS MATTERS:
-----------------
When supports pinch or laterally deform the mirror, nodes shift in X and Y.
The standard Zernike method ignores this, potentially leading to:
- Optimized designs that "cheat" by distorting laterally
- False low WFE when the actual optical surface is degraded
- Optimizer convergence to non-optimal solutions
This insight visualizes:
1. The difference between methods (where they disagree)
2. Lateral displacement magnitude map (where pinching occurs)
3. Which method you should be using for your study
Applicable to: All mirror/optics studies, CRITICAL for lateral support optimization.
"""
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, Optional, Tuple
import numpy as np
from .base import StudyInsight, InsightConfig, InsightResult, register_insight
# Lazy imports
_plotly_loaded = False
_go = None
_make_subplots = None
_Triangulation = None
def _load_dependencies():
"""Lazy load heavy dependencies."""
global _plotly_loaded, _go, _make_subplots, _Triangulation
if not _plotly_loaded:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from matplotlib.tri import Triangulation
_go = go
_make_subplots = make_subplots
_Triangulation = Triangulation
_plotly_loaded = True
# Default configuration
DEFAULT_CONFIG = {
'amp': 0.5, # Visual deformation scale
'pancake': 3.0, # Z-axis range multiplier
'plot_downsample': 15000,
'colorscale_wfe': 'RdBu', # Diverging for WFE difference
'colorscale_lateral': 'Hot', # Sequential for lateral magnitude
'disp_unit': 'mm',
'n_modes': 50,
'filter_orders': 4,
}
@register_insight
class ZernikeOPDComparisonInsight(StudyInsight):
"""
Compare standard vs OPD-rigorous Zernike methods.
Generates visualizations showing:
- Lateral displacement magnitude map (where pinching occurs)
- WFE difference map (where methods disagree)
- Quantitative comparison tables
- Recommendation for which method to use
"""
insight_type = "zernike_opd_comparison"
name = "Zernike Method Comparison (OPD vs Standard)"
description = "Compare Z-only vs rigorous OPD Zernike methods - reveals lateral displacement impact"
category = "optical"
applicable_to = ["mirror", "optics", "wfe", "lateral"]
required_files = ["*.op2"]
def __init__(self, study_path: Path):
super().__init__(study_path)
self.op2_path: Optional[Path] = None
self.geo_path: Optional[Path] = None
self._node_geo: Optional[Dict] = None
self._displacements: Optional[Dict] = None
def can_generate(self) -> bool:
"""Check if OP2 and geometry files exist."""
search_paths = [
self.results_path,
self.study_path / "2_iterations",
self.setup_path / "model",
]
for search_path in search_paths:
if not search_path.exists():
continue
op2_files = list(search_path.glob("**/*solution*.op2"))
if not op2_files:
op2_files = list(search_path.glob("**/*.op2"))
if op2_files:
self.op2_path = max(op2_files, key=lambda p: p.stat().st_mtime)
break
if self.op2_path is None:
return False
try:
self.geo_path = self._find_geometry_file(self.op2_path)
return True
except FileNotFoundError:
return False
def _find_geometry_file(self, op2_path: Path) -> Path:
"""Find BDF/DAT geometry file for OP2."""
folder = op2_path.parent
base = op2_path.stem
for ext in ['.dat', '.bdf']:
cand = folder / (base + ext)
if cand.exists():
return cand
for f in folder.iterdir():
if f.suffix.lower() in ['.dat', '.bdf']:
return f
raise FileNotFoundError(f"No geometry file found for {op2_path}")
def _load_data(self):
"""Load geometry and displacement data."""
if self._node_geo is not None:
return
_load_dependencies()
from pyNastran.op2.op2 import OP2
from pyNastran.bdf.bdf import BDF
# Read geometry
bdf = BDF()
bdf.read_bdf(str(self.geo_path))
self._node_geo = {int(nid): node.get_position()
for nid, node in bdf.nodes.items()}
# Read displacements
op2 = OP2()
op2.read_op2(str(self.op2_path))
if not op2.displacements:
raise RuntimeError("No displacement data in OP2")
self._displacements = {}
for key, darr in op2.displacements.items():
data = darr.data
dmat = data[0] if data.ndim == 3 else (data if data.ndim == 2 else None)
if dmat is None:
continue
ngt = darr.node_gridtype.astype(int)
node_ids = ngt if ngt.ndim == 1 else ngt[:, 0]
isubcase = getattr(darr, 'isubcase', None)
label = str(isubcase) if isubcase else str(key)
self._displacements[label] = {
'node_ids': node_ids.astype(int),
'disp': dmat.copy()
}
def _build_full_data(
self,
label: str,
disp_unit: str = 'mm'
) -> Dict[str, np.ndarray]:
"""Build complete data arrays including X, Y, Z displacements."""
nm_per_unit = 1e6 if disp_unit == 'mm' else 1e9
um_per_unit = nm_per_unit / 1000.0
data = self._displacements[label]
node_ids = data['node_ids']
dmat = data['disp']
x0, y0, z0 = [], [], []
dx, dy, dz = [], [], []
valid_nids = []
for nid, vec in zip(node_ids, dmat):
geo = self._node_geo.get(int(nid))
if geo is None:
continue
x0.append(geo[0])
y0.append(geo[1])
z0.append(geo[2])
dx.append(vec[0])
dy.append(vec[1])
dz.append(vec[2])
valid_nids.append(nid)
return {
'x0': np.array(x0),
'y0': np.array(y0),
'z0': np.array(z0),
'dx': np.array(dx),
'dy': np.array(dy),
'dz': np.array(dz),
'nids': np.array(valid_nids),
'nm_scale': nm_per_unit,
'um_scale': um_per_unit,
}
def _estimate_focal_length(self, x: np.ndarray, y: np.ndarray, z: np.ndarray) -> float:
"""Estimate parabola focal length from geometry."""
r_squared = x**2 + y**2
mask = r_squared > 0
if not np.any(mask):
return 5000.0 # Default fallback
# Fit z = a * r² (assume centered parabola)
A = r_squared[mask].reshape(-1, 1)
b = z[mask]
try:
a, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
a = a[0]
except:
a = np.median(z[mask] / r_squared[mask])
if abs(a) < 1e-12:
return 5000.0
return 1.0 / (4.0 * abs(a))
def _compute_opd_comparison(
self,
data: Dict[str, np.ndarray],
n_modes: int,
filter_orders: int
) -> Dict[str, Any]:
"""Compute both methods and return comparison data."""
from ..extractors.extract_zernike import compute_zernike_coefficients
x0 = data['x0']
y0 = data['y0']
z0 = data['z0']
dx = data['dx']
dy = data['dy']
dz = data['dz']
nm_scale = data['nm_scale']
um_scale = data['um_scale']
# Estimate focal length
focal = self._estimate_focal_length(x0, y0, z0)
# === STANDARD METHOD (Z-only) ===
# Uses original (x0, y0) with Z-displacement
wfe_standard = dz * 2.0 * nm_scale # WFE = 2 * surface error
coeffs_std, R_std = compute_zernike_coefficients(x0, y0, wfe_standard, n_modes)
# Compute filtered RMS
x_c = x0 - np.mean(x0)
y_c = y0 - np.mean(y0)
r = np.hypot(x_c / R_std, y_c / R_std)
th = np.arctan2(y_c, x_c)
from ..extractors.extract_zernike import zernike_noll
Z_low = np.column_stack([zernike_noll(j, r, th) for j in range(1, filter_orders + 1)])
wfe_std_filtered = wfe_standard - Z_low @ coeffs_std[:filter_orders]
rms_std_filtered = float(np.sqrt(np.mean(wfe_std_filtered**2)))
rms_std_global = float(np.sqrt(np.mean(wfe_standard**2)))
# === RIGOROUS OPD METHOD ===
# Uses deformed (x0+dx, y0+dy) and computes true surface error
# The CORRECT formulation uses DIFFERENTIAL approach:
# surface_error = dz - delta_z_parabola
# where delta_z_parabola is the Z change needed to stay on ideal parabola
x_def = x0 + dx
y_def = y0 + dy
# Compute the CHANGE in parabola Z due to lateral displacement
# For parabola: z = -r²/(4f), so:
# Δz_parabola = z(x_def, y_def) - z(x0, y0)
# = -[(x0+dx)² + (y0+dy)² - x0² - y0²] / (4f)
r0_sq = x0**2 + y0**2
r_def_sq = x_def**2 + y_def**2
delta_r_sq = r_def_sq - r0_sq
# For concave mirror, z = -r²/(4f), so Δz = -Δr²/(4f)
delta_z_parabola = -delta_r_sq / (4.0 * focal)
# TRUE surface error = actual dz - expected dz (to stay on parabola)
surface_error = dz - delta_z_parabola
wfe_opd = surface_error * 2.0 * nm_scale
coeffs_opd, R_opd = compute_zernike_coefficients(x_def, y_def, wfe_opd, n_modes)
x_c_def = x_def - np.mean(x_def)
y_c_def = y_def - np.mean(y_def)
r_def = np.hypot(x_c_def / R_opd, y_c_def / R_opd)
th_def = np.arctan2(y_c_def, x_c_def)
Z_low_opd = np.column_stack([zernike_noll(j, r_def, th_def) for j in range(1, filter_orders + 1)])
wfe_opd_filtered = wfe_opd - Z_low_opd @ coeffs_opd[:filter_orders]
rms_opd_filtered = float(np.sqrt(np.mean(wfe_opd_filtered**2)))
rms_opd_global = float(np.sqrt(np.mean(wfe_opd**2)))
# === LATERAL DISPLACEMENT ===
lateral_disp = np.sqrt(dx**2 + dy**2) * um_scale # in µm
# === DIFFERENCE ANALYSIS ===
# Where methods disagree (use original coords for visualization)
wfe_difference = wfe_opd - wfe_standard # Could be different due to coordinate system
# More meaningful: compare the residuals
return {
# Coordinates for plotting
'x_plot': x0,
'y_plot': y0,
# Standard method
'wfe_standard': wfe_standard,
'wfe_std_filtered': wfe_std_filtered,
'rms_std_global': rms_std_global,
'rms_std_filtered': rms_std_filtered,
# OPD method
'wfe_opd': wfe_opd,
'wfe_opd_filtered': wfe_opd_filtered,
'rms_opd_global': rms_opd_global,
'rms_opd_filtered': rms_opd_filtered,
'x_deformed': x_def,
'y_deformed': y_def,
# Lateral displacement
'lateral_disp_um': lateral_disp,
'max_lateral_um': float(np.max(lateral_disp)),
'rms_lateral_um': float(np.sqrt(np.mean(lateral_disp**2))),
'p99_lateral_um': float(np.percentile(lateral_disp, 99)),
# Deltas
'delta_global': rms_opd_global - rms_std_global,
'delta_filtered': rms_opd_filtered - rms_std_filtered,
'pct_diff_filtered': 100.0 * (rms_opd_filtered - rms_std_filtered) / rms_std_filtered if rms_std_filtered > 0 else 0.0,
# Parabola info
'focal_length': focal,
}
def _get_recommendation(self, comparison: Dict[str, Any]) -> Tuple[str, str]:
"""Generate recommendation based on comparison results."""
max_lateral = comparison['max_lateral_um']
pct_diff = abs(comparison['pct_diff_filtered'])
if max_lateral > 10.0:
severity = "CRITICAL"
msg = (f"Large lateral displacements detected (max {max_lateral:.1f} µm). "
f"Methods differ by {pct_diff:.1f}%. "
f"You MUST use OPD method for accurate optimization.")
elif max_lateral > 1.0:
severity = "RECOMMENDED"
msg = (f"Significant lateral displacements (max {max_lateral:.2f} µm). "
f"Methods differ by {pct_diff:.1f}%. "
f"OPD method recommended for better accuracy.")
elif max_lateral > 0.1:
severity = "OPTIONAL"
msg = (f"Small lateral displacements (max {max_lateral:.3f} µm). "
f"Methods differ by {pct_diff:.2f}%. "
f"OPD method provides minor improvement.")
else:
severity = "EQUIVALENT"
msg = (f"Negligible lateral displacements (max {max_lateral:.4f} µm). "
f"Both methods give essentially identical results.")
return severity, msg
def _generate_html(
self,
title: str,
comparison: Dict[str, Any],
config: Dict[str, Any]
) -> str:
"""Generate comparison HTML with multiple views."""
_load_dependencies()
downsample = config.get('plot_downsample', 15000)
colorscale_wfe = config.get('colorscale_wfe', 'RdBu')
colorscale_lateral = config.get('colorscale_lateral', 'Hot')
amp = config.get('amp', 0.5)
x = comparison['x_plot']
y = comparison['y_plot']
lateral = comparison['lateral_disp_um']
wfe_std = comparison['wfe_std_filtered']
wfe_opd = comparison['wfe_opd_filtered']
# Downsample if needed
n = len(x)
if n > downsample:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=downsample, replace=False)
x, y = x[sel], y[sel]
lateral = lateral[sel]
wfe_std = wfe_std[sel]
wfe_opd = wfe_opd[sel]
# Get recommendation
severity, recommendation = self._get_recommendation(comparison)
severity_color = {
'CRITICAL': '#ef4444',
'RECOMMENDED': '#f59e0b',
'OPTIONAL': '#3b82f6',
'EQUIVALENT': '#10b981'
}.get(severity, '#6b7280')
# Create subplots: 2 rows, 2 cols
# Row 1: Lateral displacement map, WFE difference map
# Row 2: Comparison table, Recommendation
fig = _make_subplots(
rows=2, cols=2,
specs=[
[{"type": "scene"}, {"type": "scene"}],
[{"type": "table", "colspan": 2}, None]
],
row_heights=[0.65, 0.35],
column_widths=[0.5, 0.5],
vertical_spacing=0.08,
horizontal_spacing=0.05,
subplot_titles=[
"<b>Lateral Displacement Magnitude (µm)</b>",
"<b>Filtered WFE Comparison (nm)</b>",
"<b>Method Comparison</b>"
]
)
# === Plot 1: Lateral displacement map ===
try:
tri = _Triangulation(x, y)
if tri.triangles is not None and len(tri.triangles) > 0:
i, j, k = tri.triangles.T
fig.add_trace(_go.Mesh3d(
x=x, y=y, z=lateral * amp * 100, # Scale for visibility
i=i, j=j, k=k,
intensity=lateral,
colorscale=colorscale_lateral,
opacity=1.0,
flatshading=False,
lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3),
showscale=True,
colorbar=dict(
title=dict(text="Lateral (µm)", side="right"),
x=0.42, thickness=15, len=0.5
),
hovertemplate="X: %{x:.1f}<br>Y: %{y:.1f}<br>Lateral: %{intensity:.3f} µm<extra></extra>"
), row=1, col=1)
except:
fig.add_trace(_go.Scatter3d(
x=x, y=y, z=lateral * amp * 100,
mode='markers',
marker=dict(size=2, color=lateral, colorscale=colorscale_lateral, showscale=True)
), row=1, col=1)
# === Plot 2: WFE comparison (show OPD method) ===
try:
tri = _Triangulation(x, y)
if tri.triangles is not None and len(tri.triangles) > 0:
i, j, k = tri.triangles.T
fig.add_trace(_go.Mesh3d(
x=x, y=y, z=wfe_opd * amp,
i=i, j=j, k=k,
intensity=wfe_opd,
colorscale='Turbo',
opacity=1.0,
flatshading=False,
lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3),
showscale=True,
colorbar=dict(
title=dict(text="WFE (nm)", side="right"),
x=1.0, thickness=15, len=0.5
),
hovertemplate="X: %{x:.1f}<br>Y: %{y:.1f}<br>WFE: %{intensity:.2f} nm<extra></extra>"
), row=1, col=2)
except:
fig.add_trace(_go.Scatter3d(
x=x, y=y, z=wfe_opd * amp,
mode='markers',
marker=dict(size=2, color=wfe_opd, colorscale='Turbo', showscale=True)
), row=1, col=2)
# Configure 3D scenes
for scene_name in ['scene', 'scene2']:
fig.update_layout(**{
scene_name: dict(
camera=dict(eye=dict(x=1.2, y=1.2, z=0.8)),
xaxis=dict(title="X (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
yaxis=dict(title="Y (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
zaxis=dict(title="", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
aspectmode='data',
)
})
# === Comparison Table ===
delta_sign = "+" if comparison['delta_filtered'] > 0 else ""
fig.add_trace(_go.Table(
header=dict(
values=["<b>Metric</b>", "<b>Standard (Z-only)</b>", "<b>OPD (Rigorous)</b>", "<b>Difference</b>"],
align="left",
fill_color='#1f2937',
font=dict(color='white', size=12),
height=30
),
cells=dict(
values=[
# Metric names
["Global RMS (nm)", "Filtered RMS (nm)", "Max Lateral Disp (µm)",
"RMS Lateral Disp (µm)", "P99 Lateral Disp (µm)", "Focal Length (mm)",
f"<b>RECOMMENDATION: {severity}</b>"],
# Standard method
[f"{comparison['rms_std_global']:.2f}", f"{comparison['rms_std_filtered']:.2f}",
"N/A", "N/A", "N/A", f"{comparison['focal_length']:.1f}", ""],
# OPD method
[f"{comparison['rms_opd_global']:.2f}", f"{comparison['rms_opd_filtered']:.2f}",
f"{comparison['max_lateral_um']:.3f}", f"{comparison['rms_lateral_um']:.3f}",
f"{comparison['p99_lateral_um']:.3f}", f"{comparison['focal_length']:.1f}", ""],
# Difference
[f"{delta_sign}{comparison['delta_global']:.2f}",
f"{delta_sign}{comparison['delta_filtered']:.2f} ({delta_sign}{comparison['pct_diff_filtered']:.1f}%)",
"-", "-", "-", "-", recommendation],
],
align="left",
fill_color=[['#374151']*7, ['#374151']*7, ['#374151']*7,
['#374151']*6 + [severity_color]],
font=dict(color='white', size=11),
height=25
)
), row=2, col=1)
# Layout
fig.update_layout(
width=1600,
height=1000,
margin=dict(t=80, b=20, l=20, r=20),
paper_bgcolor='#111827',
plot_bgcolor='#1f2937',
font=dict(color='white'),
title=dict(
text=f"<b>Atomizer Zernike Method Comparison - {title}</b><br>"
f"<span style='font-size:14px;color:{severity_color}'>{severity}: {recommendation[:80]}...</span>",
x=0.5,
font=dict(size=18)
)
)
return fig.to_html(include_plotlyjs='cdn', full_html=True)
def _generate(self, config: InsightConfig) -> InsightResult:
"""Generate method comparison visualization."""
self._load_data()
cfg = {**DEFAULT_CONFIG, **config.extra}
n_modes = cfg['n_modes']
filter_orders = cfg['filter_orders']
disp_unit = cfg['disp_unit']
# Find subcases
disps = self._displacements
available = list(disps.keys())
if not available:
return InsightResult(success=False, error="No subcases found in OP2")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = config.output_dir or self.insights_path
output_dir.mkdir(parents=True, exist_ok=True)
html_files = []
all_comparisons = {}
# Process each subcase
for label in available:
data = self._build_full_data(label, disp_unit)
comparison = self._compute_opd_comparison(data, n_modes, filter_orders)
title = f"Subcase {label}"
html = self._generate_html(title, comparison, cfg)
path = output_dir / f"zernike_opd_comparison_{timestamp}_{label}.html"
path.write_text(html, encoding='utf-8')
html_files.append(path)
severity, _ = self._get_recommendation(comparison)
all_comparisons[label] = {
'rms_std_filtered': comparison['rms_std_filtered'],
'rms_opd_filtered': comparison['rms_opd_filtered'],
'pct_diff': comparison['pct_diff_filtered'],
'max_lateral_um': comparison['max_lateral_um'],
'severity': severity,
}
# Determine overall recommendation
severities = [c['severity'] for c in all_comparisons.values()]
if 'CRITICAL' in severities:
overall = "CRITICAL: Use OPD method for all optimization"
elif 'RECOMMENDED' in severities:
overall = "RECOMMENDED: Switch to OPD method for better accuracy"
elif 'OPTIONAL' in severities:
overall = "OPTIONAL: OPD method provides minor improvement"
else:
overall = "EQUIVALENT: Standard method is fine for this study"
return InsightResult(
success=True,
html_path=html_files[0],
summary={
'html_files': [str(p) for p in html_files],
'comparisons': all_comparisons,
'overall_recommendation': overall,
}
)