"""
Zernike Wavefront Error (WFE) Insight
Provides 3D surface visualization of mirror wavefront errors with
Zernike polynomial decomposition. Generates three views:
- 40 deg vs 20 deg (operational tilt comparison)
- 60 deg vs 20 deg (operational tilt comparison)
- 90 deg Manufacturing (absolute with optician workload metrics)
Supports two WFE computation methods:
- Standard: Z-displacement only at original (x, y) coordinates
- OPD: Accounts for lateral (X, Y) displacement via interpolation (RECOMMENDED)
Applicable to: Mirror optimization studies with multi-subcase gravity loads.
"""
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
import numpy as np
from math import factorial
from numpy.linalg import LinAlgError
from .base import StudyInsight, InsightConfig, InsightResult, register_insight
# Lazy import for OPD extractor
# Module-level placeholder; assigned by _load_dependencies() on first use.
_ZernikeOPDExtractor = None
# Lazy imports to avoid startup overhead
# All of these are populated exactly once by _load_dependencies(); the
# _plotly_loaded flag guards against repeated import work.
_plotly_loaded = False
_go = None            # plotly.graph_objects module
_make_subplots = None  # plotly.subplots.make_subplots
_Triangulation = None  # matplotlib.tri.Triangulation (Delaunay mesh builder)
_OP2 = None            # pyNastran OP2 reader class
_BDF = None            # pyNastran BDF reader class
def _load_dependencies():
    """Populate the module-level lazy placeholders; no-op after the first call.

    Keeps plotly/matplotlib/pyNastran imports off the module import path so
    that merely importing this insight stays cheap.
    """
    global _plotly_loaded, _go, _make_subplots, _Triangulation, _OP2, _BDF, _ZernikeOPDExtractor
    if _plotly_loaded:
        return
    import plotly.graph_objects as plotly_go
    from plotly.subplots import make_subplots as plotly_make_subplots
    from matplotlib.tri import Triangulation as mpl_triangulation
    from pyNastran.op2.op2 import OP2 as nastran_op2
    from pyNastran.bdf.bdf import BDF as nastran_bdf
    from ..extractors.extract_zernike_figure import ZernikeOPDExtractor as opd_extractor
    _go = plotly_go
    _make_subplots = plotly_make_subplots
    _Triangulation = mpl_triangulation
    _OP2 = nastran_op2
    _BDF = nastran_bdf
    _ZernikeOPDExtractor = opd_extractor
    _plotly_loaded = True
# ============================================================================
# Zernike Mathematics
# ============================================================================
def noll_indices(j: int) -> Tuple[int, int]:
    """Map a 1-based Noll-style index j to its (n, m) radial/azimuthal orders.

    Walks radial orders n = 0, 1, 2, ... and enumerates each order's
    azimuthal frequencies in this module's fixed ordering, counting modes
    until the j-th one is reached.

    NOTE(review): for odd n the negative m is emitted first (e.g. j=2 maps
    to (1, -1)), which may deviate from the strict Noll sign convention.
    The fit, reconstruction, and labels in this module all share this same
    ordering, so results are internally consistent — confirm before
    comparing coefficients with external tools.

    Raises:
        ValueError: if j < 1.
    """
    if j < 1:
        raise ValueError("Noll index j must be >= 1")
    seen = 0
    n = 0
    while True:
        if n == 0:
            azimuthals = [0]
        elif n % 2 == 0:
            # Even order: m = 0 plus +/- pairs of even frequencies.
            azimuthals = [0]
            for k in range(1, n // 2 + 1):
                azimuthals += [-2 * k, 2 * k]
        else:
            # Odd order: +/- pairs of odd frequencies only.
            azimuthals = []
            for k in range((n + 1) // 2):
                azimuthals += [-(2 * k + 1), 2 * k + 1]
        for m in azimuthals:
            seen += 1
            if seen == j:
                return n, m
        n += 1
def zernike_noll(j: int, r: np.ndarray, th: np.ndarray) -> np.ndarray:
    """Evaluate the (unnormalized) Zernike mode j on polar samples (r, th).

    The radial part R_n^|m|(r) is the standard factorial sum; positive m
    selects the cosine branch, negative m the sine branch, and m == 0 is
    purely radial. No Noll normalization factor is applied — the
    least-squares fit absorbs scaling.
    """
    n, m = noll_indices(j)
    m_abs = abs(m)
    radial = np.zeros_like(r)
    for s in range((n - m_abs) // 2 + 1):
        numer = (-1) ** s * factorial(n - s)
        denom = (factorial(s)
                 * factorial((n + m_abs) // 2 - s)
                 * factorial((n - m_abs) // 2 - s))
        # In-place accumulation keeps the dtype of `r` (callers pass float32).
        radial += (numer / denom) * r ** (n - 2 * s)
    if m == 0:
        return radial
    if m > 0:
        return radial * np.cos(m * th)
    return radial * np.sin(-m * th)
def zernike_common_name(n: int, m: int) -> str:
    """Return the conventional optics name for mode (n, m), or a generic tag.

    Unknown (n, m) combinations fall back to "Z(n=.., m=..)".
    """
    known = {
        (0, 0): "Piston",
        (1, -1): "Tilt X",
        (1, 1): "Tilt Y",
        (2, 0): "Defocus",
        (2, -2): "Astig 45°",
        (2, 2): "Astig 0°",
        (3, -1): "Coma X",
        (3, 1): "Coma Y",
        (3, -3): "Trefoil X",
        (3, 3): "Trefoil Y",
        (4, 0): "Primary Spherical",
        (4, -2): "Sec Astig X",
        (4, 2): "Sec Astig Y",
        (4, -4): "Quadrafoil X",
        (4, 4): "Quadrafoil Y",
        (5, -1): "Sec Coma X",
        (5, 1): "Sec Coma Y",
        (5, -3): "Sec Trefoil X",
        (5, 3): "Sec Trefoil Y",
        (5, -5): "Pentafoil X",
        (5, 5): "Pentafoil Y",
        (6, 0): "Sec Spherical",
    }
    return known.get((n, m), f"Z(n={n}, m={m})")
def zernike_label(j: int) -> str:
    """Human-readable label for mode j, e.g. "J04 - Defocus (n=2, m=0)"."""
    order_n, order_m = noll_indices(j)
    common = zernike_common_name(order_n, order_m)
    return f"J{j:02d} - {common} (n={order_n}, m={order_m})"
def compute_zernike_coeffs(
    X: np.ndarray,
    Y: np.ndarray,
    vals: np.ndarray,
    n_modes: int,
    chunk_size: int = 100000
) -> Tuple[np.ndarray, float]:
    """Least-squares fit of the first ``n_modes`` Zernike modes to ``vals``.

    The aperture is the circle of radius R (max distance from the centroid)
    and samples are normalized onto the unit disk. The normal equations
    G c = h are accumulated chunk-by-chunk with a float32 basis to bound
    memory, then solved in float64, falling back to lstsq if G is singular.

    Args:
        X, Y: Sample coordinates.
        vals: Values to fit (NaN entries are excluded).
        n_modes: Number of modes (Noll-style indices 1..n_modes).
        chunk_size: Samples per basis-evaluation chunk.

    Returns:
        (coeffs, R): fitted coefficients (length n_modes) and aperture radius.

    Raises:
        RuntimeError: if no finite samples fall inside the unit disk.
    """
    x0 = X - np.mean(X)
    y0 = Y - np.mean(Y)
    R = float(np.max(np.hypot(x0, y0)))
    rho = np.hypot(x0 / R, y0 / R).astype(np.float32)
    theta = np.arctan2(y0, x0).astype(np.float32)
    inside = (rho <= 1.0) & ~np.isnan(vals)
    if not np.any(inside):
        raise RuntimeError("No valid points inside unit disk.")
    sample_idx = np.nonzero(inside)[0]
    n = int(n_modes)
    gram = np.zeros((n, n), dtype=np.float64)
    rhs = np.zeros((n,), dtype=np.float64)
    w = vals.astype(np.float64)
    for lo in range(0, len(sample_idx), chunk_size):
        chunk = sample_idx[lo:lo + chunk_size]
        r_c, th_c, w_c = rho[chunk], theta[chunk], w[chunk]
        basis = np.column_stack([zernike_noll(mode, r_c, th_c).astype(np.float32)
                                 for mode in range(1, n + 1)])
        gram += (basis.T @ basis).astype(np.float64)
        rhs += (basis.T @ w_c).astype(np.float64)
    try:
        coeffs = np.linalg.solve(gram, rhs)
    except LinAlgError:
        # Singular Gram matrix (e.g. degenerate sampling): minimum-norm solution.
        coeffs, *_ = np.linalg.lstsq(gram, rhs, rcond=None)
    return coeffs, R
# ============================================================================
# Configuration Defaults
# ============================================================================
# Defaults consumed via config.get(...) by the view builders below.
DEFAULT_CONFIG = {
    'n_modes': 36,  # Reduced from 50 for faster computation (covers through 7th order)
    'amp': 0.5,  # Visual deformation scale
    'pancake': 3.0,  # Z-axis range multiplier
    'plot_downsample': 8000,  # Reduced from 10000 for faster rendering
    'filter_low_orders': 4,  # Piston, tip, tilt, defocus
    'colorscale': 'Turbo',  # Plotly colorscale for WFE surfaces
    'disp_unit': 'mm',  # Unit of OP2 displacements ('mm' scales 1e6 to nm, else 1e9)
    'show_bar_chart': True,  # Include the |coefficient| bar chart row in views
}
@register_insight
class ZernikeWFEInsight(StudyInsight):
    """
    Zernike Wavefront Error visualization for mirror optimization.
    Generates interactive 3D surface plots showing:
    - Residual WFE after Zernike fit
    - Coefficient bar charts
    - RMS metrics tables
    - Manufacturing orientation analysis
    """
    # Class-level metadata — presumably read by the insight registry via
    # register_insight; confirm against the .base module.
    insight_type = "zernike_wfe"
    name = "Zernike WFE Analysis"
    description = "3D wavefront error surface with Zernike decomposition"
    category = "optical"
    applicable_to = ["mirror", "optics", "wfe"]
    required_files = ["*.op2"]
def __init__(self, study_path: Path):
super().__init__(study_path)
self.op2_path: Optional[Path] = None
self.geo_path: Optional[Path] = None
self._node_geo: Optional[Dict] = None
self._displacements: Optional[Dict] = None
self._opd_extractor: Optional[Any] = None # Cached OPD extractor
def can_generate(self) -> bool:
"""Check if OP2 and geometry files exist.
Uses fast non-recursive search to avoid slow glob operations.
"""
# Fast search order: best_design_archive first, then iterations
search_locations = [
(self.study_path / "3_results" / "best_design_archive", False), # (path, is_iter_parent)
(self.study_path / "2_iterations", True), # iterations have subdirs
(self.setup_path / "model", False),
]
op2_candidates = []
for search_path, is_iter_parent in search_locations:
if not search_path.exists():
continue
if is_iter_parent:
# For iterations, check each subdir (one level only)
try:
for subdir in search_path.iterdir():
if subdir.is_dir():
for f in subdir.iterdir():
if f.suffix.lower() == '.op2':
op2_candidates.append(f)
except (PermissionError, OSError):
continue
else:
# Direct check (non-recursive)
try:
for f in search_path.iterdir():
if f.suffix.lower() == '.op2':
op2_candidates.append(f)
# Also check one level down for best_design_archive
elif f.is_dir():
for sub in f.iterdir():
if sub.suffix.lower() == '.op2':
op2_candidates.append(sub)
except (PermissionError, OSError):
continue
if op2_candidates:
break # Found some, stop searching
if not op2_candidates:
return False
# Pick newest OP2
self.op2_path = max(op2_candidates, key=lambda p: p.stat().st_mtime)
# Find geometry
try:
self.geo_path = self._find_geometry_file(self.op2_path)
return True
except FileNotFoundError:
return False
def _find_geometry_file(self, op2_path: Path) -> Path:
"""Find BDF/DAT geometry file for OP2."""
folder = op2_path.parent
base = op2_path.stem
for ext in ['.dat', '.bdf']:
cand = folder / (base + ext)
if cand.exists():
return cand
for f in folder.iterdir():
if f.suffix.lower() in ['.dat', '.bdf']:
return f
raise FileNotFoundError(f"No geometry file found for {op2_path}")
def _load_data(self):
"""Load geometry and displacement data."""
if self._node_geo is not None:
return # Already loaded
_load_dependencies()
# Read geometry
bdf = _BDF()
bdf.read_bdf(str(self.geo_path))
self._node_geo = {int(nid): node.get_position()
for nid, node in bdf.nodes.items()}
# Read displacements
op2 = _OP2()
op2.read_op2(str(self.op2_path))
if not op2.displacements:
raise RuntimeError("No displacement data in OP2")
self._displacements = {}
for key, darr in op2.displacements.items():
data = darr.data
dmat = data[0] if data.ndim == 3 else (data if data.ndim == 2 else None)
if dmat is None:
continue
ngt = darr.node_gridtype.astype(int)
node_ids = ngt if ngt.ndim == 1 else ngt[:, 0]
isubcase = getattr(darr, 'isubcase', None)
label = str(isubcase) if isubcase else str(key)
self._displacements[label] = {
'node_ids': node_ids.astype(int),
'disp': dmat.copy()
}
def _build_wfe_arrays_standard(
self,
label: str,
disp_unit: str = 'mm'
) -> Dict[str, np.ndarray]:
"""Build X, Y, WFE arrays using standard Z-only method.
This is the original method that uses only Z-displacement
at the original (x, y) coordinates.
"""
nm_per_unit = 1e6 if disp_unit == 'mm' else 1e9
data = self._displacements[label]
node_ids = data['node_ids']
dmat = data['disp']
X, Y, WFE = [], [], []
valid_nids = []
dx_arr, dy_arr, dz_arr = [], [], []
for nid, vec in zip(node_ids, dmat):
geo = self._node_geo.get(int(nid))
if geo is None:
continue
X.append(geo[0])
Y.append(geo[1])
wfe = vec[2] * 2.0 * nm_per_unit # Z-disp to WFE
WFE.append(wfe)
valid_nids.append(nid)
dx_arr.append(vec[0])
dy_arr.append(vec[1])
dz_arr.append(vec[2])
return {
'X': np.array(X),
'Y': np.array(Y),
'WFE': np.array(WFE),
'node_ids': np.array(valid_nids),
'dx': np.array(dx_arr),
'dy': np.array(dy_arr),
'dz': np.array(dz_arr),
'lateral_disp': np.sqrt(np.array(dx_arr)**2 + np.array(dy_arr)**2),
'method': 'standard',
}
def _build_wfe_arrays_opd(
self,
label: str,
disp_unit: str = 'mm'
) -> Dict[str, np.ndarray]:
"""Build X, Y, WFE arrays using OPD method (accounts for lateral displacement).
This is the RECOMMENDED method that:
1. Uses deformed (x+dx, y+dy) coordinates for Zernike fitting
2. Computes true surface error via interpolation of undeformed geometry
Uses cached OPD extractor to avoid re-reading OP2/BDF files for each subcase.
"""
_load_dependencies()
# Reuse cached extractor to avoid re-reading files
if self._opd_extractor is None:
self._opd_extractor = _ZernikeOPDExtractor(
self.op2_path,
bdf_path=self.geo_path,
displacement_unit=disp_unit
)
opd_data = self._opd_extractor._build_figure_opd_data(label)
return {
'X': opd_data['x_deformed'],
'Y': opd_data['y_deformed'],
'WFE': opd_data['wfe_nm'],
'node_ids': opd_data['node_ids'],
'dx': opd_data['dx'],
'dy': opd_data['dy'],
'dz': opd_data['dz'],
'lateral_disp': opd_data['lateral_disp'],
'x_original': opd_data['x_original'],
'y_original': opd_data['y_original'],
'method': 'opd',
}
def _build_wfe_arrays(
self,
label: str,
disp_unit: str = 'mm',
method: str = 'opd'
) -> Dict[str, np.ndarray]:
"""Build X, Y, WFE arrays for a subcase.
Args:
label: Subcase label
disp_unit: Displacement unit ('mm' or 'm')
method: 'opd' (recommended) or 'standard'
Returns:
Dict with X, Y, WFE, node_ids, dx, dy, dz, lateral_disp arrays
"""
if method == 'opd':
return self._build_wfe_arrays_opd(label, disp_unit)
else:
return self._build_wfe_arrays_standard(label, disp_unit)
def _compute_relative_wfe(
self,
data1: Dict[str, np.ndarray],
data2: Dict[str, np.ndarray]
) -> Dict[str, np.ndarray]:
"""Compute relative displacement and WFE (target - reference) for common nodes.
Args:
data1: Target subcase data dict
data2: Reference subcase data dict
Returns:
Dict with relative WFE arrays AND relative displacement components (dx, dy, dz)
"""
X1, Y1, WFE1, nids1 = data1['X'], data1['Y'], data1['WFE'], data1['node_ids']
X2, Y2, WFE2, nids2 = data2['X'], data2['Y'], data2['WFE'], data2['node_ids']
# Build reference maps for all data
ref_map = {int(nid): (x, y, w) for nid, x, y, w in zip(nids2, X2, Y2, WFE2)}
# Maps for displacement components
dx1_map = {int(nid): d for nid, d in zip(data1['node_ids'], data1['dx'])}
dy1_map = {int(nid): d for nid, d in zip(data1['node_ids'], data1['dy'])}
dz1_map = {int(nid): d for nid, d in zip(data1['node_ids'], data1['dz'])}
dx2_map = {int(nid): d for nid, d in zip(data2['node_ids'], data2['dx'])}
dy2_map = {int(nid): d for nid, d in zip(data2['node_ids'], data2['dy'])}
dz2_map = {int(nid): d for nid, d in zip(data2['node_ids'], data2['dz'])}
X_rel, Y_rel, WFE_rel, nids_rel = [], [], [], []
dx_rel, dy_rel, dz_rel = [], [], []
lateral_rel = []
for nid, x, y, w in zip(nids1, X1, Y1, WFE1):
nid = int(nid)
if nid not in ref_map:
continue
if nid not in dx2_map:
continue
_, _, w_ref = ref_map[nid]
X_rel.append(x)
Y_rel.append(y)
WFE_rel.append(w - w_ref)
nids_rel.append(nid)
# Compute relative displacements (target - reference)
dx_rel.append(dx1_map[nid] - dx2_map[nid])
dy_rel.append(dy1_map[nid] - dy2_map[nid])
dz_rel.append(dz1_map[nid] - dz2_map[nid])
# Relative lateral displacement magnitude
lat1 = np.sqrt(dx1_map[nid]**2 + dy1_map[nid]**2)
lat2 = np.sqrt(dx2_map[nid]**2 + dy2_map[nid]**2)
lateral_rel.append(lat1 - lat2)
return {
'X': np.array(X_rel),
'Y': np.array(Y_rel),
'WFE': np.array(WFE_rel),
'node_ids': np.array(nids_rel),
'dx': np.array(dx_rel), # Relative X displacement (mm)
'dy': np.array(dy_rel), # Relative Y displacement (mm)
'dz': np.array(dz_rel), # Relative Z displacement (mm)
'lateral_disp': np.array(lateral_rel) if lateral_rel else np.zeros(len(X_rel)),
'method': data1.get('method', 'unknown'),
}
def _compute_metrics(
self,
X: np.ndarray,
Y: np.ndarray,
W_nm: np.ndarray,
n_modes: int,
filter_orders: int
) -> Dict[str, Any]:
"""Compute RMS metrics and Zernike coefficients."""
coeffs, R = compute_zernike_coeffs(X, Y, W_nm, n_modes)
Xc = X - np.mean(X)
Yc = Y - np.mean(Y)
r = np.hypot(Xc / R, Yc / R)
th = np.arctan2(Yc, Xc)
Z = np.column_stack([zernike_noll(j, r, th) for j in range(1, n_modes + 1)])
W_res_filt = W_nm - Z[:, :filter_orders].dot(coeffs[:filter_orders])
W_res_filt_j1to3 = W_nm - Z[:, :3].dot(coeffs[:3])
return {
'coefficients': coeffs,
'R': R,
'global_rms': float(np.sqrt(np.mean(W_nm**2))),
'filtered_rms': float(np.sqrt(np.mean(W_res_filt**2))),
'rms_filter_j1to3': float(np.sqrt(np.mean(W_res_filt_j1to3**2))),
'W_res_filt': W_res_filt,
}
def _compute_aberration_magnitudes(self, coeffs: np.ndarray) -> Dict[str, float]:
"""Compute magnitude of specific aberration modes."""
return {
'defocus_nm': float(abs(coeffs[3])) if len(coeffs) > 3 else 0.0,
'astigmatism_rms': float(np.sqrt(coeffs[4]**2 + coeffs[5]**2)) if len(coeffs) > 5 else 0.0,
'coma_rms': float(np.sqrt(coeffs[6]**2 + coeffs[7]**2)) if len(coeffs) > 7 else 0.0,
'trefoil_rms': float(np.sqrt(coeffs[8]**2 + coeffs[9]**2)) if len(coeffs) > 9 else 0.0,
'spherical_nm': float(abs(coeffs[10])) if len(coeffs) > 10 else 0.0,
}
def _generate_view_html(
self,
title: str,
X: np.ndarray,
Y: np.ndarray,
W_nm: np.ndarray,
rms_data: Dict,
config: Dict,
is_relative: bool = False,
ref_title: str = "20 deg",
abs_pair: Optional[Tuple[float, float]] = None,
is_manufacturing: bool = False,
mfg_metrics: Optional[Dict] = None,
correction_metrics: Optional[Dict] = None,
) -> str:
"""Generate HTML for a single view."""
_load_dependencies()
n_modes = config.get('n_modes', 50)
amp = config.get('amp', 0.5)
pancake = config.get('pancake', 3.0)
downsample = config.get('plot_downsample', 10000)
colorscale = config.get('colorscale', 'Turbo')
show_bar = config.get('show_bar_chart', True)
coeffs = rms_data['coefficients']
global_rms = rms_data['global_rms']
filtered_rms = rms_data['filtered_rms']
W_res_filt = rms_data['W_res_filt']
labels = [zernike_label(j) for j in range(1, n_modes + 1)]
coeff_abs = np.abs(coeffs)
# Downsample
n = len(X)
if n > downsample:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=downsample, replace=False)
Xp, Yp, Wp = X[sel], Y[sel], W_res_filt[sel]
else:
Xp, Yp, Wp = X, Y, W_res_filt
res_amp = amp * Wp
max_amp = float(np.max(np.abs(res_amp))) if res_amp.size else 1.0
# Build mesh
mesh_traces = []
try:
tri = _Triangulation(Xp, Yp)
if tri.triangles is not None and len(tri.triangles) > 0:
i, j, k = tri.triangles.T
mesh_traces.append(_go.Mesh3d(
x=Xp, y=Yp, z=res_amp,
i=i, j=j, k=k,
intensity=res_amp,
colorscale=colorscale,
opacity=1.0,
flatshading=False,
lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3,
roughness=0.5, fresnel=0.2),
lightposition=dict(x=100, y=200, z=300),
showscale=True,
colorbar=dict(title=dict(text="Residual (nm)", side="right"),
thickness=15, len=0.6, tickformat=".1f"),
hovertemplate="X: %{x:.1f}
Y: %{y:.1f}
Residual: %{z:.2f} nm"
))
except Exception:
pass
if not mesh_traces:
mesh_traces.append(_go.Scatter3d(
x=Xp, y=Yp, z=res_amp,
mode='markers',
marker=dict(size=2, color=res_amp, colorscale=colorscale, showscale=True),
showlegend=False
))
title_suffix = f" (relative to {ref_title})" if is_relative else " (absolute)"
# Build subplots
if is_manufacturing and mfg_metrics and correction_metrics:
fig = _make_subplots(
rows=5, cols=1,
specs=[[{"type": "scene"}], [{"type": "table"}], [{"type": "table"}],
[{"type": "table"}], [{"type": "xy"}]],
row_heights=[0.38, 0.12, 0.12, 0.18, 0.20],
vertical_spacing=0.025,
subplot_titles=[
f"Surface Residual - {title}{title_suffix}",
"RMS Metrics (Absolute 90 deg)",
"Mode Magnitudes at 90 deg",
"Pre-Correction (90 deg - 20 deg)",
"|Zernike Coefficients| (nm)"
]
)
elif show_bar:
fig = _make_subplots(
rows=4, cols=1,
specs=[[{"type": "scene"}], [{"type": "table"}],
[{"type": "table"}], [{"type": "xy"}]],
row_heights=[0.45, 0.12, 0.25, 0.18],
vertical_spacing=0.03,
subplot_titles=[
f"Surface Residual - {title}{title_suffix}",
"RMS Metrics",
f"Zernike Coefficients ({n_modes} modes)",
"|Zernike Coefficients| (nm)"
]
)
else:
fig = _make_subplots(
rows=3, cols=1,
specs=[[{"type": "scene"}], [{"type": "table"}], [{"type": "table"}]],
row_heights=[0.55, 0.15, 0.30],
vertical_spacing=0.03,
subplot_titles=[
f"Surface Residual - {title}{title_suffix}",
"RMS Metrics",
f"Zernike Coefficients ({n_modes} modes)"
]
)
# Add mesh
for tr in mesh_traces:
fig.add_trace(tr, row=1, col=1)
# Configure 3D scene
fig.update_scenes(
camera=dict(eye=dict(x=1.2, y=1.2, z=0.8), up=dict(x=0, y=0, z=1)),
xaxis=dict(title="X (mm)", showgrid=True,
gridcolor='rgba(128,128,128,0.3)',
showbackground=True, backgroundcolor='rgba(240,240,240,0.9)'),
yaxis=dict(title="Y (mm)", showgrid=True,
gridcolor='rgba(128,128,128,0.3)',
showbackground=True, backgroundcolor='rgba(240,240,240,0.9)'),
zaxis=dict(title="Residual (nm)",
range=[-max_amp * pancake, max_amp * pancake],
showgrid=True, gridcolor='rgba(128,128,128,0.3)',
showbackground=True, backgroundcolor='rgba(230,230,250,0.9)'),
aspectmode='manual',
aspectratio=dict(x=1, y=1, z=0.4)
)
# Add tables
if is_relative and abs_pair:
abs_global, abs_filtered = abs_pair
fig.add_trace(_go.Table(
header=dict(values=["Metric", "Relative (nm)", "Absolute (nm)"],
align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[
["Global RMS", "Filtered RMS (J1-J4 removed)"],
[f"{global_rms:.2f}", f"{filtered_rms:.2f}"],
[f"{abs_global:.2f}", f"{abs_filtered:.2f}"],
], align="left", fill_color='#374151', font=dict(color='white'))
), row=2, col=1)
elif is_manufacturing and mfg_metrics and correction_metrics:
fig.add_trace(_go.Table(
header=dict(values=["Metric", "Value (nm)"],
align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[
["Global RMS", "Filtered RMS (J1-J4)"],
[f"{global_rms:.2f}", f"{filtered_rms:.2f}"]
], align="left", fill_color='#374151', font=dict(color='white'))
), row=2, col=1)
fig.add_trace(_go.Table(
header=dict(values=["Mode", "Value (nm)"],
align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[
["Filtered RMS (J1-J3, with defocus)", "Astigmatism (J5+J6)",
"Coma (J7+J8)", "Trefoil (J9+J10)", "Spherical (J11)"],
[f"{rms_data['rms_filter_j1to3']:.2f}",
f"{mfg_metrics['astigmatism_rms']:.2f}",
f"{mfg_metrics['coma_rms']:.2f}",
f"{mfg_metrics['trefoil_rms']:.2f}",
f"{mfg_metrics['spherical_nm']:.2f}"]
], align="left", fill_color='#374151', font=dict(color='white'))
), row=3, col=1)
fig.add_trace(_go.Table(
header=dict(values=["Mode", "Correction (nm)"],
align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[
["Total RMS (J1-J3 filter)", "Defocus (J4)",
"Astigmatism (J5+J6)", "Coma (J7+J8)"],
[f"{correction_metrics['rms_filter_j1to3']:.2f}",
f"{correction_metrics['defocus_nm']:.2f}",
f"{correction_metrics['astigmatism_rms']:.2f}",
f"{correction_metrics['coma_rms']:.2f}"]
], align="left", fill_color='#374151', font=dict(color='white'))
), row=4, col=1)
else:
fig.add_trace(_go.Table(
header=dict(values=["Metric", "Value (nm)"],
align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[
["Global RMS", "Filtered RMS (J1-J4 removed)"],
[f"{global_rms:.2f}", f"{filtered_rms:.2f}"]
], align="left", fill_color='#374151', font=dict(color='white'))
), row=2, col=1)
# Coefficients table
if not (is_manufacturing and mfg_metrics and correction_metrics):
fig.add_trace(_go.Table(
header=dict(values=["Noll j", "Label", "|Coeff| (nm)"],
align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[
list(range(1, n_modes + 1)),
labels,
[f"{c:.3f}" for c in coeff_abs]
], align="left", fill_color='#374151', font=dict(color='white'))
), row=3, col=1)
# Bar chart
if show_bar:
bar_row = 5 if (is_manufacturing and mfg_metrics and correction_metrics) else 4
fig.add_trace(
_go.Bar(
x=coeff_abs.tolist(), y=labels,
orientation='h', marker_color='#6366f1',
hovertemplate="%{y}
|Coeff| = %{x:.3f} nm",
showlegend=False
),
row=bar_row, col=1
)
# Layout
height = 1500 if (is_manufacturing and mfg_metrics and correction_metrics) else 1300
fig.update_layout(
width=1400, height=height,
margin=dict(t=60, b=20, l=20, r=20),
paper_bgcolor='#111827', plot_bgcolor='#1f2937',
font=dict(color='white'),
title=dict(text=f"Atomizer Zernike Analysis - {title}",
x=0.5, font=dict(size=18))
)
return fig.to_html(include_plotlyjs='cdn', full_html=True)
def _generate_lateral_map_html(
self,
title: str,
data: Dict[str, np.ndarray],
config: Dict,
) -> str:
"""Generate HTML for lateral displacement visualization.
Shows a 3D surface colored by lateral displacement magnitude,
with metrics table showing max/RMS/mean lateral displacement.
"""
_load_dependencies()
X = data['X']
Y = data['Y']
lateral_disp = data['lateral_disp'] # in mm
downsample = config.get('plot_downsample', 10000)
# Convert to µm for display
lateral_um = lateral_disp * 1000.0 # mm to µm
# Downsample
n = len(X)
if n > downsample:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=downsample, replace=False)
Xp, Yp, Lp = X[sel], Y[sel], lateral_um[sel]
else:
Xp, Yp, Lp = X, Y, lateral_um
# Build mesh
mesh_traces = []
try:
tri = _Triangulation(Xp, Yp)
if tri.triangles is not None and len(tri.triangles) > 0:
i, j, k = tri.triangles.T
mesh_traces.append(_go.Mesh3d(
x=Xp, y=Yp, z=Lp,
i=i, j=j, k=k,
intensity=Lp,
colorscale='Viridis',
opacity=1.0,
flatshading=False,
lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3),
lightposition=dict(x=100, y=200, z=300),
showscale=True,
colorbar=dict(title=dict(text="Lateral (µm)", side="right"),
thickness=15, len=0.6, tickformat=".3f"),
hovertemplate="X: %{x:.1f}
Y: %{y:.1f}
Lateral: %{z:.4f} µm"
))
except Exception:
pass
if not mesh_traces:
mesh_traces.append(_go.Scatter3d(
x=Xp, y=Yp, z=Lp,
mode='markers',
marker=dict(size=2, color=Lp, colorscale='Viridis', showscale=True),
showlegend=False
))
# Create figure with subplots
fig = _make_subplots(
rows=2, cols=1,
specs=[[{"type": "scene"}], [{"type": "table"}]],
row_heights=[0.75, 0.25],
vertical_spacing=0.03,
subplot_titles=[
f"Lateral Displacement Map - {title}",
"Lateral Displacement Statistics"
]
)
for tr in mesh_traces:
fig.add_trace(tr, row=1, col=1)
# Stats
max_lat = float(np.max(lateral_um))
rms_lat = float(np.sqrt(np.mean(lateral_um**2)))
mean_lat = float(np.mean(lateral_um))
min_lat = float(np.min(lateral_um))
fig.add_trace(_go.Table(
header=dict(values=["Statistic", "Value (µm)"],
align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[
["Max Lateral", "RMS Lateral", "Mean Lateral", "Min Lateral"],
[f"{max_lat:.4f}", f"{rms_lat:.4f}", f"{mean_lat:.4f}", f"{min_lat:.4f}"]
], align="left", fill_color='#374151', font=dict(color='white'))
), row=2, col=1)
# Configure 3D scene
max_z = float(np.max(Lp)) if Lp.size else 1.0
fig.update_scenes(
camera=dict(eye=dict(x=1.2, y=1.2, z=0.8), up=dict(x=0, y=0, z=1)),
xaxis=dict(title="X (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
yaxis=dict(title="Y (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
zaxis=dict(title="Lateral (µm)",
range=[0, max_z * 1.2],
showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
aspectmode='manual',
aspectratio=dict(x=1, y=1, z=0.4)
)
fig.update_layout(
width=1200, height=900,
margin=dict(t=60, b=20, l=20, r=20),
paper_bgcolor='#111827', plot_bgcolor='#1f2937',
font=dict(color='white'),
title=dict(text=f"Lateral Displacement Analysis - {title}",
x=0.5, font=dict(size=18))
)
return fig.to_html(include_plotlyjs='cdn', full_html=True)
def _generate_dual_method_view_html(
self,
title: str,
data_std: Dict[str, np.ndarray],
data_opd: Dict[str, np.ndarray],
rms_std: Dict,
rms_opd: Dict,
config: Dict,
is_relative: bool = False,
ref_title: str = "20 deg",
) -> str:
"""Generate HTML with toggle between Standard/OPD methods AND X/Y/Z displacement components.
For relative views, provides toggles to see:
- WFE (Z): The main wavefront error view (default)
- ΔX: Relative X displacement between subcases
- ΔY: Relative Y displacement between subcases
- ΔZ: Relative Z displacement between subcases
"""
_load_dependencies()
n_modes = config.get('n_modes', 50)
amp = config.get('amp', 0.5)
pancake = config.get('pancake', 3.0)
downsample = config.get('plot_downsample', 10000)
colorscale = config.get('colorscale', 'Turbo')
title_suffix = f" (relative to {ref_title})" if is_relative else " (absolute)"
# Build traces for both methods (WFE view)
traces_std_wfe = []
traces_opd_wfe = []
# Build displacement component traces (OPD method only, for relative views)
traces_dx = []
traces_dy = []
traces_dz = []
# Helper to build mesh trace
def build_mesh_trace(Xp, Yp, Zp, colorscale, label, unit, colorbar_title):
try:
tri = _Triangulation(Xp, Yp)
if tri.triangles is not None and len(tri.triangles) > 0:
i, j, k = tri.triangles.T
return _go.Mesh3d(
x=Xp, y=Yp, z=Zp,
i=i, j=j, k=k,
intensity=Zp,
colorscale=colorscale,
opacity=1.0,
flatshading=False,
lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3),
lightposition=dict(x=100, y=200, z=300),
showscale=True,
colorbar=dict(title=dict(text=colorbar_title, side="right"),
thickness=15, len=0.6, tickformat=".2f" if 'µm' in unit else ".1f"),
hovertemplate=f"{label}
X: %{{x:.1f}}
Y: %{{y:.1f}}
Value: %{{z:.3f}} {unit}"
)
except Exception:
pass
return _go.Scatter3d(
x=Xp, y=Yp, z=Zp,
mode='markers',
marker=dict(size=2, color=Zp, colorscale=colorscale, showscale=True),
showlegend=False
)
# Build WFE traces for both methods
for data, rms_data, traces, method_name in [
(data_std, rms_std, traces_std_wfe, 'Standard'),
(data_opd, rms_opd, traces_opd_wfe, 'OPD')
]:
X, Y = data['X'], data['Y']
W_res_filt = rms_data['W_res_filt']
# Downsample
n = len(X)
if n > downsample:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=downsample, replace=False)
Xp, Yp, Wp = X[sel], Y[sel], W_res_filt[sel]
else:
Xp, Yp, Wp = X, Y, W_res_filt
res_amp = amp * Wp
traces.append(build_mesh_trace(Xp, Yp, res_amp, colorscale, method_name, 'nm', f'{method_name} WFE (nm)'))
# Build displacement component traces (for relative views)
has_displacement_data = is_relative and 'dx' in data_opd and len(data_opd.get('dx', [])) > 0
if has_displacement_data:
X, Y = data_opd['X'], data_opd['Y']
dx_mm = data_opd['dx'] # mm
dy_mm = data_opd['dy'] # mm
dz_mm = data_opd['dz'] # mm
# Convert to µm for display
dx_um = dx_mm * 1000.0
dy_um = dy_mm * 1000.0
dz_um = dz_mm * 1000.0
n = len(X)
if n > downsample:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=downsample, replace=False)
Xp, Yp = X[sel], Y[sel]
dxp, dyp, dzp = dx_um[sel], dy_um[sel], dz_um[sel]
else:
Xp, Yp = X, Y
dxp, dyp, dzp = dx_um, dy_um, dz_um
# Apply visual amplification for displacement views
disp_amp = amp * 1000.0 # Scale factor for µm display
traces_dx.append(build_mesh_trace(Xp, Yp, dxp * amp, 'RdBu_r', 'ΔX Displacement', 'µm', 'ΔX (µm)'))
traces_dy.append(build_mesh_trace(Xp, Yp, dyp * amp, 'RdBu_r', 'ΔY Displacement', 'µm', 'ΔY (µm)'))
traces_dz.append(build_mesh_trace(Xp, Yp, dzp * amp, 'RdBu_r', 'ΔZ Displacement', 'µm', 'ΔZ (µm)'))
# Create figure
fig = _make_subplots(
rows=2, cols=1,
specs=[[{"type": "scene"}], [{"type": "table"}]],
row_heights=[0.65, 0.35],
vertical_spacing=0.05,
subplot_titles=[
f"Surface Analysis - {title}{title_suffix}",
"Metrics Comparison"
]
)
# Add all traces in order: [std_wfe, opd_wfe, dx, dy, dz, table]
# Start with OPD WFE visible (default view)
for tr in traces_std_wfe:
tr.visible = False
fig.add_trace(tr, row=1, col=1)
for tr in traces_opd_wfe:
tr.visible = True # Default view
fig.add_trace(tr, row=1, col=1)
for tr in traces_dx:
tr.visible = False
fig.add_trace(tr, row=1, col=1)
for tr in traces_dy:
tr.visible = False
fig.add_trace(tr, row=1, col=1)
for tr in traces_dz:
tr.visible = False
fig.add_trace(tr, row=1, col=1)
# Compute lateral stats (from OPD data)
lateral_um = data_opd.get('lateral_disp', np.zeros(1)) * 1000.0
max_lat = float(np.max(np.abs(lateral_um)))
rms_lat = float(np.sqrt(np.mean(lateral_um**2)))
# Compute % difference
std_filt = rms_std['filtered_rms']
opd_filt = rms_opd['filtered_rms']
pct_diff = 100.0 * (opd_filt - std_filt) / std_filt if std_filt > 0 else 0.0
# Displacement stats (for relative views)
disp_stats_rows = []
if has_displacement_data:
dx_um = data_opd['dx'] * 1000.0
dy_um = data_opd['dy'] * 1000.0
dz_um = data_opd['dz'] * 1000.0
disp_stats_rows = [
"ΔX RMS (µm)", "ΔY RMS (µm)", "ΔZ RMS (µm)"
]
disp_stats_values_std = ["—", "—", "—"]
disp_stats_values_opd = [
f"{float(np.sqrt(np.mean(dx_um**2))):.4f}",
f"{float(np.sqrt(np.mean(dy_um**2))):.4f}",
f"{float(np.sqrt(np.mean(dz_um**2))):.4f}"
]
disp_stats_diff = ["—", "—", "—"]
# Comparison table
table_headers = ["Metric", "Standard (Z-only)", "OPD (X,Y,Z)", "Difference"]
table_rows = ["Global RMS (nm)", "Filtered RMS (nm)", "Method", "Max Lateral (µm)", "RMS Lateral (µm)"]
table_std = [f"{rms_std['global_rms']:.2f}", f"{std_filt:.2f}", "Z-displacement only", "—", "—"]
table_opd = [f"{rms_opd['global_rms']:.2f}", f"{opd_filt:.2f}", "Deformed coords + OPD", f"{max_lat:.3f}", f"{rms_lat:.3f}"]
table_diff = ["—", f"{pct_diff:+.2f}%", "← RECOMMENDED", "—", "—"]
if has_displacement_data:
table_rows.extend(disp_stats_rows)
table_std.extend(disp_stats_values_std)
table_opd.extend(disp_stats_values_opd)
table_diff.extend(disp_stats_diff)
fig.add_trace(_go.Table(
header=dict(values=table_headers, align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[table_rows, table_std, table_opd, table_diff],
align="left", fill_color='#374151', font=dict(color='white'))
), row=2, col=1)
# Build visibility arrays for toggles
n_std_wfe = len(traces_std_wfe)
n_opd_wfe = len(traces_opd_wfe)
n_dx = len(traces_dx)
n_dy = len(traces_dy)
n_dz = len(traces_dz)
# Total traces before table: n_std_wfe + n_opd_wfe + n_dx + n_dy + n_dz
# Then table is last
def make_visibility(show_std_wfe=False, show_opd_wfe=False, show_dx=False, show_dy=False, show_dz=False):
vis = []
vis.extend([show_std_wfe] * n_std_wfe)
vis.extend([show_opd_wfe] * n_opd_wfe)
vis.extend([show_dx] * n_dx)
vis.extend([show_dy] * n_dy)
vis.extend([show_dz] * n_dz)
vis.append(True) # Table always visible
return vis
# Build button definitions
buttons_method = [
dict(label="OPD Method (Recommended)",
method="update",
args=[{"visible": make_visibility(show_opd_wfe=True)}]),
dict(label="Standard Method (Z-only)",
method="update",
args=[{"visible": make_visibility(show_std_wfe=True)}]),
]
buttons_component = [
dict(label="WFE (Z)",
method="update",
args=[{"visible": make_visibility(show_opd_wfe=True)}]),
]
if has_displacement_data:
buttons_component.extend([
dict(label="ΔX Disp",
method="update",
args=[{"visible": make_visibility(show_dx=True)}]),
dict(label="ΔY Disp",
method="update",
args=[{"visible": make_visibility(show_dy=True)}]),
dict(label="ΔZ Disp",
method="update",
args=[{"visible": make_visibility(show_dz=True)}]),
])
# Create update menus
updatemenus = [
dict(
type="buttons",
direction="right",
x=0.0, y=1.15,
xanchor="left",
showactive=True,
buttons=buttons_method,
font=dict(size=11),
pad=dict(r=10, t=10),
),
]
if has_displacement_data:
updatemenus.append(
dict(
type="buttons",
direction="right",
x=0.55, y=1.15,
xanchor="left",
showactive=True,
buttons=buttons_component,
font=dict(size=11),
pad=dict(r=10, t=10),
)
)
fig.update_layout(updatemenus=updatemenus)
# Configure 3D scene
max_amp_opd = float(np.max(np.abs(amp * rms_opd['W_res_filt']))) if rms_opd['W_res_filt'].size else 1.0
fig.update_scenes(
camera=dict(eye=dict(x=1.2, y=1.2, z=0.8), up=dict(x=0, y=0, z=1)),
xaxis=dict(title="X (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
yaxis=dict(title="Y (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
zaxis=dict(title="Value",
range=[-max_amp_opd * pancake, max_amp_opd * pancake],
showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
aspectmode='manual',
aspectratio=dict(x=1, y=1, z=0.4)
)
fig.update_layout(
width=1400, height=1100,
margin=dict(t=100, b=20, l=20, r=20),
paper_bgcolor='#111827', plot_bgcolor='#1f2937',
font=dict(color='white'),
title=dict(text=f"Atomizer Zernike Analysis - {title}",
x=0.5, font=dict(size=18)),
annotations=[
dict(text="Method:", x=0.0, y=1.18, xref="paper", yref="paper",
showarrow=False, font=dict(size=12, color='white'), xanchor='left'),
dict(text="View:", x=0.55, y=1.18, xref="paper", yref="paper",
showarrow=False, font=dict(size=12, color='white'), xanchor='left') if has_displacement_data else {},
]
)
return fig.to_html(include_plotlyjs='cdn', full_html=True)
def _generate(self, config: InsightConfig) -> InsightResult:
    """Generate all Zernike WFE views with Standard/OPD toggle and lateral maps.

    For each operational angle (40 deg, 60 deg) a relative view against the
    20 deg reference is produced; the 90 deg case is rendered as an absolute
    manufacturing view. Every angle also gets a lateral-displacement map.

    Performance optimizations:
    - Uses cached OPD extractor (reads OP2/BDF only once)
    - Loads all subcase data upfront to minimize I/O
    - Standard method reuses geometry from already-loaded data

    Parameters
    ----------
    config : InsightConfig
        Run configuration. ``config.extra`` entries override DEFAULT_CONFIG;
        ``config.amplification`` overrides the 'amp' entry when != 1.0.

    Returns
    -------
    InsightResult
        success=True with the first generated HTML file as the primary path
        and a summary dict of RMS / lateral metrics, or success=False with
        an error message when four usable subcases cannot be resolved.
    """
    self._load_data()

    # Merge configuration: explicit extras override defaults; a non-unity
    # amplification on the config object overrides the 'amp' entry.
    cfg = {**DEFAULT_CONFIG, **config.extra}
    if config.amplification != 1.0:
        cfg['amp'] = config.amplification
    n_modes = cfg['n_modes']
    filter_orders = cfg['filter_low_orders']
    disp_unit = cfg['disp_unit']

    sc_map, err = self._resolve_subcase_map()
    if err is not None:
        return InsightResult(success=False, error=err)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = config.output_dir or self.insights_path
    output_dir.mkdir(parents=True, exist_ok=True)

    html_files: List[Path] = []
    summary: Dict[str, Any] = {}

    # Load both methods' arrays for every subcase upfront (single I/O pass):
    # the OPD method shares its cached extractor, the standard method reuses
    # the already-loaded node geometry and displacements.
    angles = ('20', '40', '60', '90')
    data_std = {a: self._build_wfe_arrays_standard(sc_map[a], disp_unit) for a in angles}
    data_opd = {a: self._build_wfe_arrays_opd(sc_map[a], disp_unit) for a in angles}

    # Relative operational views: 40 deg and 60 deg against the 20 deg reference,
    # each with a dual Standard/OPD method toggle plus a lateral map.
    for angle in ('40', '60'):
        rel_std = self._compute_relative_wfe(data_std[angle], data_std['20'])
        rel_opd = self._compute_relative_wfe(data_opd[angle], data_opd['20'])
        rms_std = self._compute_metrics(rel_std['X'], rel_std['Y'], rel_std['WFE'],
                                        n_modes, filter_orders)
        rms_opd = self._compute_metrics(rel_opd['X'], rel_opd['Y'], rel_opd['WFE'],
                                        n_modes, filter_orders)
        html = self._generate_dual_method_view_html(
            f"{angle} deg", rel_std, rel_opd, rms_std, rms_opd, cfg,
            is_relative=True, ref_title="20 deg")
        path = output_dir / f"zernike_{timestamp}_{angle}_vs_20.html"
        path.write_text(html, encoding='utf-8')
        html_files.append(path)
        # Lateral map (OPD data carries the lateral displacement field)
        html_lat = self._generate_lateral_map_html(f"{angle} deg", data_opd[angle], cfg)
        path_lat = output_dir / f"zernike_{timestamp}_{angle}_lateral.html"
        path_lat.write_text(html_lat, encoding='utf-8')
        html_files.append(path_lat)
        summary[f'{angle}_vs_20_filtered_rms_std'] = rms_std['filtered_rms']
        summary[f'{angle}_vs_20_filtered_rms_opd'] = rms_opd['filtered_rms']

    # 90 deg manufacturing view: absolute WFE (no reference subtraction),
    # includes the optician workload metric from the OPD decomposition.
    rms_90_std = self._compute_metrics(data_std['90']['X'], data_std['90']['Y'],
                                       data_std['90']['WFE'], n_modes, filter_orders)
    rms_90_opd = self._compute_metrics(data_opd['90']['X'], data_opd['90']['Y'],
                                       data_opd['90']['WFE'], n_modes, filter_orders)
    html_90 = self._generate_dual_method_view_html(
        "90 deg (Manufacturing)", data_std['90'], data_opd['90'],
        rms_90_std, rms_90_opd, cfg, is_relative=False)
    path_90 = output_dir / f"zernike_{timestamp}_90_mfg.html"
    path_90.write_text(html_90, encoding='utf-8')
    html_files.append(path_90)
    html_90_lat = self._generate_lateral_map_html("90 deg (Manufacturing)",
                                                  data_opd['90'], cfg)
    path_90_lat = output_dir / f"zernike_{timestamp}_90_mfg_lateral.html"
    path_90_lat.write_text(html_90_lat, encoding='utf-8')
    html_files.append(path_90_lat)
    summary['90_mfg_filtered_rms_std'] = rms_90_std['filtered_rms']
    summary['90_mfg_filtered_rms_opd'] = rms_90_opd['filtered_rms']
    summary['90_optician_workload'] = rms_90_opd['rms_filter_j1to3']

    # Lateral displacement summary (mm -> µm). |.| matches the per-view
    # tables, which report max |lateral|; identical result when
    # 'lateral_disp' is a non-negative magnitude, as it appears to be.
    for angle in ('40', '60', '90'):
        lat_um = data_opd[angle].get('lateral_disp', np.zeros(1)) * 1000.0
        summary[f'lateral_{angle}_max_um'] = float(np.max(np.abs(lat_um)))

    return InsightResult(
        success=True,
        html_path=html_files[0],  # first generated view is the primary artifact
        summary={
            'html_files': [str(p) for p in html_files],
            **summary
        }
    )

def _resolve_subcase_map(self) -> Tuple[Optional[Dict[str, str]], Optional[str]]:
    """Map tilt angles ('90', '20', '40', '60') to loaded subcase labels.

    Returns
    -------
    (mapping, None) on success, or (None, error_message) when the loaded
    displacements do not provide four usable subcases.
    """
    disps = self._displacements
    if '1' in disps and '2' in disps:
        # Sequentially numbered deck: convention is 1=90, 2=20, 3=40, 4=60.
        sc_map = {'90': '1', '20': '2', '40': '3', '60': '4'}
    elif '90' in disps and '20' in disps:
        # Subcases labeled directly by angle.
        sc_map = {'90': '90', '20': '20', '40': '40', '60': '60'}
    else:
        available = sorted(disps.keys(), key=lambda x: int(x) if x.isdigit() else 0)
        if len(available) < 4:
            return None, f"Need 4 subcases, found: {available}"
        # Fallback: assumes the first four sorted labels follow the
        # 90/20/40/60 deck order — TODO confirm against the BDF subcase setup.
        sc_map = {'90': available[0], '20': available[1],
                  '40': available[2], '60': available[3]}
    for angle, label in sc_map.items():
        if label not in disps:
            return None, f"Subcase '{label}' (angle {angle}) not found"
    return sc_map, None