""" Zernike OPD Method Comparison Insight ===================================== Compares standard Z-only Zernike method against rigorous OPD method that accounts for lateral (X, Y) displacements. Critical for understanding whether your optimization objective is capturing true optical performance. WHY THIS MATTERS: ----------------- When supports pinch or laterally deform the mirror, nodes shift in X and Y. The standard Zernike method ignores this, potentially leading to: - Optimized designs that "cheat" by distorting laterally - False low WFE when the actual optical surface is degraded - Optimizer convergence to non-optimal solutions This insight visualizes: 1. The difference between methods (where they disagree) 2. Lateral displacement magnitude map (where pinching occurs) 3. Which method you should be using for your study Applicable to: All mirror/optics studies, CRITICAL for lateral support optimization. """ from pathlib import Path from datetime import datetime from typing import Dict, Any, Optional, Tuple import numpy as np from .base import StudyInsight, InsightConfig, InsightResult, register_insight # Lazy imports _plotly_loaded = False _go = None _make_subplots = None _Triangulation = None def _load_dependencies(): """Lazy load heavy dependencies.""" global _plotly_loaded, _go, _make_subplots, _Triangulation if not _plotly_loaded: import plotly.graph_objects as go from plotly.subplots import make_subplots from matplotlib.tri import Triangulation _go = go _make_subplots = make_subplots _Triangulation = Triangulation _plotly_loaded = True # Default configuration DEFAULT_CONFIG = { 'amp': 0.5, # Visual deformation scale 'pancake': 3.0, # Z-axis range multiplier 'plot_downsample': 15000, 'colorscale_wfe': 'RdBu', # Diverging for WFE difference 'colorscale_lateral': 'Hot', # Sequential for lateral magnitude 'disp_unit': 'mm', 'n_modes': 50, 'filter_orders': 4, } @register_insight class ZernikeOPDComparisonInsight(StudyInsight): """ Compare standard vs OPD-rigorous Zernike methods. Generates visualizations showing: - Lateral displacement magnitude map (where pinching occurs) - WFE difference map (where methods disagree) - Quantitative comparison tables - Recommendation for which method to use """ insight_type = "zernike_opd_comparison" name = "Zernike Method Comparison (OPD vs Standard)" description = "Compare Z-only vs rigorous OPD Zernike methods - reveals lateral displacement impact" category = "optical" applicable_to = ["mirror", "optics", "wfe", "lateral"] required_files = ["*.op2"] def __init__(self, study_path: Path): super().__init__(study_path) self.op2_path: Optional[Path] = None self.geo_path: Optional[Path] = None self._node_geo: Optional[Dict] = None self._displacements: Optional[Dict] = None def can_generate(self) -> bool: """Check if OP2 and geometry files exist.""" search_paths = [ self.results_path, self.study_path / "2_iterations", self.setup_path / "model", ] for search_path in search_paths: if not search_path.exists(): continue op2_files = list(search_path.glob("**/*solution*.op2")) if not op2_files: op2_files = list(search_path.glob("**/*.op2")) if op2_files: self.op2_path = max(op2_files, key=lambda p: p.stat().st_mtime) break if self.op2_path is None: return False try: self.geo_path = self._find_geometry_file(self.op2_path) return True except FileNotFoundError: return False def _find_geometry_file(self, op2_path: Path) -> Path: """Find BDF/DAT geometry file for OP2.""" folder = op2_path.parent base = op2_path.stem for ext in ['.dat', '.bdf']: cand = folder / (base + ext) if cand.exists(): return cand for f in folder.iterdir(): if f.suffix.lower() in ['.dat', '.bdf']: return f raise FileNotFoundError(f"No geometry file found for {op2_path}") def _load_data(self): """Load geometry and displacement data.""" if self._node_geo is not None: return _load_dependencies() from pyNastran.op2.op2 import OP2 from pyNastran.bdf.bdf import BDF # Read geometry bdf = BDF() bdf.read_bdf(str(self.geo_path)) self._node_geo = {int(nid): node.get_position() for nid, node in bdf.nodes.items()} # Read displacements op2 = OP2() op2.read_op2(str(self.op2_path)) if not op2.displacements: raise RuntimeError("No displacement data in OP2") self._displacements = {} for key, darr in op2.displacements.items(): data = darr.data dmat = data[0] if data.ndim == 3 else (data if data.ndim == 2 else None) if dmat is None: continue ngt = darr.node_gridtype.astype(int) node_ids = ngt if ngt.ndim == 1 else ngt[:, 0] isubcase = getattr(darr, 'isubcase', None) label = str(isubcase) if isubcase else str(key) self._displacements[label] = { 'node_ids': node_ids.astype(int), 'disp': dmat.copy() } def _build_full_data( self, label: str, disp_unit: str = 'mm' ) -> Dict[str, np.ndarray]: """Build complete data arrays including X, Y, Z displacements.""" nm_per_unit = 1e6 if disp_unit == 'mm' else 1e9 um_per_unit = nm_per_unit / 1000.0 data = self._displacements[label] node_ids = data['node_ids'] dmat = data['disp'] x0, y0, z0 = [], [], [] dx, dy, dz = [], [], [] valid_nids = [] for nid, vec in zip(node_ids, dmat): geo = self._node_geo.get(int(nid)) if geo is None: continue x0.append(geo[0]) y0.append(geo[1]) z0.append(geo[2]) dx.append(vec[0]) dy.append(vec[1]) dz.append(vec[2]) valid_nids.append(nid) return { 'x0': np.array(x0), 'y0': np.array(y0), 'z0': np.array(z0), 'dx': np.array(dx), 'dy': np.array(dy), 'dz': np.array(dz), 'nids': np.array(valid_nids), 'nm_scale': nm_per_unit, 'um_scale': um_per_unit, } def _estimate_focal_length(self, x: np.ndarray, y: np.ndarray, z: np.ndarray) -> float: """Estimate parabola focal length from geometry.""" r_squared = x**2 + y**2 mask = r_squared > 0 if not np.any(mask): return 5000.0 # Default fallback # Fit z = a * r² (assume centered parabola) A = r_squared[mask].reshape(-1, 1) b = z[mask] try: a, _, _, _ = np.linalg.lstsq(A, b, rcond=None) a = a[0] except: a = np.median(z[mask] / r_squared[mask]) if abs(a) < 1e-12: return 5000.0 return 1.0 / (4.0 * abs(a)) def _compute_opd_comparison( self, data: Dict[str, np.ndarray], n_modes: int, filter_orders: int ) -> Dict[str, Any]: """Compute both methods and return comparison data.""" from ..extractors.extract_zernike import compute_zernike_coefficients x0 = data['x0'] y0 = data['y0'] z0 = data['z0'] dx = data['dx'] dy = data['dy'] dz = data['dz'] nm_scale = data['nm_scale'] um_scale = data['um_scale'] # Estimate focal length focal = self._estimate_focal_length(x0, y0, z0) # === STANDARD METHOD (Z-only) === # Uses original (x0, y0) with Z-displacement wfe_standard = dz * 2.0 * nm_scale # WFE = 2 * surface error coeffs_std, R_std = compute_zernike_coefficients(x0, y0, wfe_standard, n_modes) # Compute filtered RMS x_c = x0 - np.mean(x0) y_c = y0 - np.mean(y0) r = np.hypot(x_c / R_std, y_c / R_std) th = np.arctan2(y_c, x_c) from ..extractors.extract_zernike import zernike_noll Z_low = np.column_stack([zernike_noll(j, r, th) for j in range(1, filter_orders + 1)]) wfe_std_filtered = wfe_standard - Z_low @ coeffs_std[:filter_orders] rms_std_filtered = float(np.sqrt(np.mean(wfe_std_filtered**2))) rms_std_global = float(np.sqrt(np.mean(wfe_standard**2))) # === RIGOROUS OPD METHOD === # Uses deformed (x0+dx, y0+dy) and computes true surface error # The CORRECT formulation uses DIFFERENTIAL approach: # surface_error = dz - delta_z_parabola # where delta_z_parabola is the Z change needed to stay on ideal parabola x_def = x0 + dx y_def = y0 + dy # Compute the CHANGE in parabola Z due to lateral displacement # For parabola: z = -r²/(4f), so: # Δz_parabola = z(x_def, y_def) - z(x0, y0) # = -[(x0+dx)² + (y0+dy)² - x0² - y0²] / (4f) r0_sq = x0**2 + y0**2 r_def_sq = x_def**2 + y_def**2 delta_r_sq = r_def_sq - r0_sq # For concave mirror, z = -r²/(4f), so Δz = -Δr²/(4f) delta_z_parabola = -delta_r_sq / (4.0 * focal) # TRUE surface error = actual dz - expected dz (to stay on parabola) surface_error = dz - delta_z_parabola wfe_opd = surface_error * 2.0 * nm_scale coeffs_opd, R_opd = compute_zernike_coefficients(x_def, y_def, wfe_opd, n_modes) x_c_def = x_def - np.mean(x_def) y_c_def = y_def - np.mean(y_def) r_def = np.hypot(x_c_def / R_opd, y_c_def / R_opd) th_def = np.arctan2(y_c_def, x_c_def) Z_low_opd = np.column_stack([zernike_noll(j, r_def, th_def) for j in range(1, filter_orders + 1)]) wfe_opd_filtered = wfe_opd - Z_low_opd @ coeffs_opd[:filter_orders] rms_opd_filtered = float(np.sqrt(np.mean(wfe_opd_filtered**2))) rms_opd_global = float(np.sqrt(np.mean(wfe_opd**2))) # === LATERAL DISPLACEMENT === lateral_disp = np.sqrt(dx**2 + dy**2) * um_scale # in µm # === DIFFERENCE ANALYSIS === # Where methods disagree (use original coords for visualization) wfe_difference = wfe_opd - wfe_standard # Could be different due to coordinate system # More meaningful: compare the residuals return { # Coordinates for plotting 'x_plot': x0, 'y_plot': y0, # Standard method 'wfe_standard': wfe_standard, 'wfe_std_filtered': wfe_std_filtered, 'rms_std_global': rms_std_global, 'rms_std_filtered': rms_std_filtered, # OPD method 'wfe_opd': wfe_opd, 'wfe_opd_filtered': wfe_opd_filtered, 'rms_opd_global': rms_opd_global, 'rms_opd_filtered': rms_opd_filtered, 'x_deformed': x_def, 'y_deformed': y_def, # Lateral displacement 'lateral_disp_um': lateral_disp, 'max_lateral_um': float(np.max(lateral_disp)), 'rms_lateral_um': float(np.sqrt(np.mean(lateral_disp**2))), 'p99_lateral_um': float(np.percentile(lateral_disp, 99)), # Deltas 'delta_global': rms_opd_global - rms_std_global, 'delta_filtered': rms_opd_filtered - rms_std_filtered, 'pct_diff_filtered': 100.0 * (rms_opd_filtered - rms_std_filtered) / rms_std_filtered if rms_std_filtered > 0 else 0.0, # Parabola info 'focal_length': focal, } def _get_recommendation(self, comparison: Dict[str, Any]) -> Tuple[str, str]: """Generate recommendation based on comparison results.""" max_lateral = comparison['max_lateral_um'] pct_diff = abs(comparison['pct_diff_filtered']) if max_lateral > 10.0: severity = "CRITICAL" msg = (f"Large lateral displacements detected (max {max_lateral:.1f} µm). " f"Methods differ by {pct_diff:.1f}%. " f"You MUST use OPD method for accurate optimization.") elif max_lateral > 1.0: severity = "RECOMMENDED" msg = (f"Significant lateral displacements (max {max_lateral:.2f} µm). " f"Methods differ by {pct_diff:.1f}%. " f"OPD method recommended for better accuracy.") elif max_lateral > 0.1: severity = "OPTIONAL" msg = (f"Small lateral displacements (max {max_lateral:.3f} µm). " f"Methods differ by {pct_diff:.2f}%. " f"OPD method provides minor improvement.") else: severity = "EQUIVALENT" msg = (f"Negligible lateral displacements (max {max_lateral:.4f} µm). " f"Both methods give essentially identical results.") return severity, msg def _generate_html( self, title: str, comparison: Dict[str, Any], config: Dict[str, Any] ) -> str: """Generate comparison HTML with multiple views.""" _load_dependencies() downsample = config.get('plot_downsample', 15000) colorscale_wfe = config.get('colorscale_wfe', 'RdBu') colorscale_lateral = config.get('colorscale_lateral', 'Hot') amp = config.get('amp', 0.5) x = comparison['x_plot'] y = comparison['y_plot'] lateral = comparison['lateral_disp_um'] wfe_std = comparison['wfe_std_filtered'] wfe_opd = comparison['wfe_opd_filtered'] # Downsample if needed n = len(x) if n > downsample: rng = np.random.default_rng(42) sel = rng.choice(n, size=downsample, replace=False) x, y = x[sel], y[sel] lateral = lateral[sel] wfe_std = wfe_std[sel] wfe_opd = wfe_opd[sel] # Get recommendation severity, recommendation = self._get_recommendation(comparison) severity_color = { 'CRITICAL': '#ef4444', 'RECOMMENDED': '#f59e0b', 'OPTIONAL': '#3b82f6', 'EQUIVALENT': '#10b981' }.get(severity, '#6b7280') # Create subplots: 2 rows, 2 cols # Row 1: Lateral displacement map, WFE difference map # Row 2: Comparison table, Recommendation fig = _make_subplots( rows=2, cols=2, specs=[ [{"type": "scene"}, {"type": "scene"}], [{"type": "table", "colspan": 2}, None] ], row_heights=[0.65, 0.35], column_widths=[0.5, 0.5], vertical_spacing=0.08, horizontal_spacing=0.05, subplot_titles=[ "Lateral Displacement Magnitude (µm)", "Filtered WFE Comparison (nm)", "Method Comparison" ] ) # === Plot 1: Lateral displacement map === try: tri = _Triangulation(x, y) if tri.triangles is not None and len(tri.triangles) > 0: i, j, k = tri.triangles.T fig.add_trace(_go.Mesh3d( x=x, y=y, z=lateral * amp * 100, # Scale for visibility i=i, j=j, k=k, intensity=lateral, colorscale=colorscale_lateral, opacity=1.0, flatshading=False, lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3), showscale=True, colorbar=dict( title=dict(text="Lateral (µm)", side="right"), x=0.42, thickness=15, len=0.5 ), hovertemplate="X: %{x:.1f}
Y: %{y:.1f}
Lateral: %{intensity:.3f} µm" ), row=1, col=1) except: fig.add_trace(_go.Scatter3d( x=x, y=y, z=lateral * amp * 100, mode='markers', marker=dict(size=2, color=lateral, colorscale=colorscale_lateral, showscale=True) ), row=1, col=1) # === Plot 2: WFE comparison (show OPD method) === try: tri = _Triangulation(x, y) if tri.triangles is not None and len(tri.triangles) > 0: i, j, k = tri.triangles.T fig.add_trace(_go.Mesh3d( x=x, y=y, z=wfe_opd * amp, i=i, j=j, k=k, intensity=wfe_opd, colorscale='Turbo', opacity=1.0, flatshading=False, lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3), showscale=True, colorbar=dict( title=dict(text="WFE (nm)", side="right"), x=1.0, thickness=15, len=0.5 ), hovertemplate="X: %{x:.1f}
Y: %{y:.1f}
WFE: %{intensity:.2f} nm" ), row=1, col=2) except: fig.add_trace(_go.Scatter3d( x=x, y=y, z=wfe_opd * amp, mode='markers', marker=dict(size=2, color=wfe_opd, colorscale='Turbo', showscale=True) ), row=1, col=2) # Configure 3D scenes for scene_name in ['scene', 'scene2']: fig.update_layout(**{ scene_name: dict( camera=dict(eye=dict(x=1.2, y=1.2, z=0.8)), xaxis=dict(title="X (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'), yaxis=dict(title="Y (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'), zaxis=dict(title="", showgrid=True, gridcolor='rgba(128,128,128,0.3)'), aspectmode='data', ) }) # === Comparison Table === delta_sign = "+" if comparison['delta_filtered'] > 0 else "" fig.add_trace(_go.Table( header=dict( values=["Metric", "Standard (Z-only)", "OPD (Rigorous)", "Difference"], align="left", fill_color='#1f2937', font=dict(color='white', size=12), height=30 ), cells=dict( values=[ # Metric names ["Global RMS (nm)", "Filtered RMS (nm)", "Max Lateral Disp (µm)", "RMS Lateral Disp (µm)", "P99 Lateral Disp (µm)", "Focal Length (mm)", f"RECOMMENDATION: {severity}"], # Standard method [f"{comparison['rms_std_global']:.2f}", f"{comparison['rms_std_filtered']:.2f}", "N/A", "N/A", "N/A", f"{comparison['focal_length']:.1f}", ""], # OPD method [f"{comparison['rms_opd_global']:.2f}", f"{comparison['rms_opd_filtered']:.2f}", f"{comparison['max_lateral_um']:.3f}", f"{comparison['rms_lateral_um']:.3f}", f"{comparison['p99_lateral_um']:.3f}", f"{comparison['focal_length']:.1f}", ""], # Difference [f"{delta_sign}{comparison['delta_global']:.2f}", f"{delta_sign}{comparison['delta_filtered']:.2f} ({delta_sign}{comparison['pct_diff_filtered']:.1f}%)", "-", "-", "-", "-", recommendation], ], align="left", fill_color=[['#374151']*7, ['#374151']*7, ['#374151']*7, ['#374151']*6 + [severity_color]], font=dict(color='white', size=11), height=25 ) ), row=2, col=1) # Layout fig.update_layout( width=1600, height=1000, margin=dict(t=80, b=20, l=20, r=20), paper_bgcolor='#111827', plot_bgcolor='#1f2937', font=dict(color='white'), title=dict( text=f"Atomizer Zernike Method Comparison - {title}
" f"{severity}: {recommendation[:80]}...", x=0.5, font=dict(size=18) ) ) return fig.to_html(include_plotlyjs='cdn', full_html=True) def _generate(self, config: InsightConfig) -> InsightResult: """Generate method comparison visualization.""" self._load_data() cfg = {**DEFAULT_CONFIG, **config.extra} n_modes = cfg['n_modes'] filter_orders = cfg['filter_orders'] disp_unit = cfg['disp_unit'] # Find subcases disps = self._displacements available = list(disps.keys()) if not available: return InsightResult(success=False, error="No subcases found in OP2") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_dir = config.output_dir or self.insights_path output_dir.mkdir(parents=True, exist_ok=True) html_files = [] all_comparisons = {} # Process each subcase for label in available: data = self._build_full_data(label, disp_unit) comparison = self._compute_opd_comparison(data, n_modes, filter_orders) title = f"Subcase {label}" html = self._generate_html(title, comparison, cfg) path = output_dir / f"zernike_opd_comparison_{timestamp}_{label}.html" path.write_text(html, encoding='utf-8') html_files.append(path) severity, _ = self._get_recommendation(comparison) all_comparisons[label] = { 'rms_std_filtered': comparison['rms_std_filtered'], 'rms_opd_filtered': comparison['rms_opd_filtered'], 'pct_diff': comparison['pct_diff_filtered'], 'max_lateral_um': comparison['max_lateral_um'], 'severity': severity, } # Determine overall recommendation severities = [c['severity'] for c in all_comparisons.values()] if 'CRITICAL' in severities: overall = "CRITICAL: Use OPD method for all optimization" elif 'RECOMMENDED' in severities: overall = "RECOMMENDED: Switch to OPD method for better accuracy" elif 'OPTIONAL' in severities: overall = "OPTIONAL: OPD method provides minor improvement" else: overall = "EQUIVALENT: Standard method is fine for this study" return InsightResult( success=True, html_path=html_files[0], summary={ 'html_files': [str(p) for p in html_files], 'comparisons': all_comparisons, 'overall_recommendation': overall, } )