feat: Add OPD method support to Zernike visualization with Standard/OPD toggle

Major improvements to Zernike WFE visualization:

- Add ZernikeDashboardInsight: Unified dashboard with all orientations (40°, 60°, 90°)
  on one page with light theme and executive summary
- Add OPD method toggle: Switch between Standard (Z-only) and OPD (X,Y,Z) methods
  in ZernikeWFEInsight with interactive buttons
- Add lateral displacement maps: Visualize X,Y displacement for each orientation
- Add displacement component views: Toggle between WFE, ΔX, ΔY, ΔZ in relative views
- Add metrics comparison table showing both methods side-by-side

New extractors:
- extract_zernike_figure.py: ZernikeOPDExtractor using BDF geometry interpolation
- extract_zernike_opd.py: Parabola-based OPD with focal length

Key finding: OPD method gives 8-11% higher WFE values than Standard method
(more conservative/accurate for surfaces with lateral displacement under gravity)

Documentation updates:
- SYS_12: Added E22 ZernikeOPD as recommended method
- SYS_16: Added ZernikeDashboard, updated ZernikeWFE with OPD features
- Cheatsheet: Added Zernike method comparison table

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-22 21:03:19 -05:00
parent d089003ced
commit d19fc39a2a
19 changed files with 8117 additions and 396 deletions

View File

@@ -8,36 +8,57 @@ reality of specific designs through interactive 3D visualizations.
Architecture:
- StudyInsight: Abstract base class for all insight types
- InsightRegistry: Central registry for available insight types
- InsightSpec: Config-driven insight specification (from optimization_config.json)
- InsightReport: Aggregates insights into HTML/PDF report with appendix
- Each insight generates standalone HTML or Plotly data for dashboard
Available Insight Types:
-----------------------
| Type ID | Name | Description |
|----------------|------------------------|------------------------------------------|
| zernike_wfe | Zernike WFE Analysis | 3D wavefront error with Zernike decomp |
| stress_field | Stress Distribution | Von Mises stress contours |
| modal | Modal Analysis | Natural frequencies and mode shapes |
| thermal | Thermal Analysis | Temperature distribution |
| design_space | Design Space Explorer | Parameter-objective relationships |
| Type ID | Name | Description |
|------------------|------------------------|------------------------------------------|
| zernike_dashboard| Zernike Dashboard | RECOMMENDED: Unified WFE dashboard |
| zernike_wfe | Zernike WFE Analysis | 3D wavefront error with Zernike decomp |
| msf_zernike | MSF Zernike Analysis | Mid-spatial frequency band decomposition |
| stress_field | Stress Distribution | Von Mises stress contours |
| modal | Modal Analysis | Natural frequencies and mode shapes |
| thermal | Thermal Analysis | Temperature distribution |
| design_space | Design Space Explorer | Parameter-objective relationships |
Config Schema (in optimization_config.json):
-------------------------------------------
```json
"insights": [
{
"type": "zernike_wfe",
"name": "WFE at 40 vs 20 deg",
"enabled": true,
"linked_objective": "rel_filtered_rms_40_vs_20",
"config": { "target_subcase": "3", "reference_subcase": "2" },
"include_in_report": true
}
]
```
Quick Start:
-----------
```python
from optimization_engine.insights import get_insight, list_available_insights
from optimization_engine.insights import (
get_insight, list_available_insights,
get_configured_insights, recommend_insights_for_study, InsightReport
)
from pathlib import Path
study_path = Path("studies/my_study")
# List available insights for a study
available = list_available_insights(study_path)
print(available)
# Get recommended insights based on objectives
recommendations = recommend_insights_for_study(study_path)
for rec in recommendations:
print(f"{rec['type']}: {rec['reason']}")
# Generate a specific insight
insight = get_insight('zernike_wfe', study_path)
if insight and insight.can_generate():
result = insight.generate()
print(f"Generated: {result.html_path}")
print(f"Summary: {result.summary}")
# Generate report from configured insights
report = InsightReport(study_path)
results = report.generate_all() # Uses specs from optimization_config.json
report_path = report.generate_report_html() # Creates STUDY_INSIGHTS_REPORT.html
```
CLI Usage:
@@ -49,6 +70,12 @@ python -m optimization_engine.insights generate studies/my_study
# Generate specific insight type
python -m optimization_engine.insights generate studies/my_study --type zernike_wfe
# Generate full report from config
python -m optimization_engine.insights report studies/my_study
# Get recommendations for a study
python -m optimization_engine.insights recommend studies/my_study
# List available insight types
python -m optimization_engine.insights list
```
@@ -59,15 +86,22 @@ from .base import (
StudyInsight,
InsightConfig,
InsightResult,
InsightSpec,
InsightReport,
InsightRegistry,
register_insight,
get_insight,
list_insights,
list_available_insights,
get_configured_insights,
recommend_insights_for_study,
)
# Import insight implementations (triggers @register_insight decorators)
from .zernike_wfe import ZernikeWFEInsight
from .zernike_opd_comparison import ZernikeOPDComparisonInsight
from .msf_zernike import MSFZernikeInsight
from .zernike_dashboard import ZernikeDashboardInsight # NEW: Unified dashboard
from .stress_field import StressFieldInsight
from .modal_analysis import ModalInsight
from .thermal_field import ThermalInsight
@@ -79,6 +113,8 @@ __all__ = [
'StudyInsight',
'InsightConfig',
'InsightResult',
'InsightSpec',
'InsightReport',
'InsightRegistry',
'register_insight',
@@ -86,9 +122,14 @@ __all__ = [
'get_insight',
'list_insights',
'list_available_insights',
'get_configured_insights',
'recommend_insights_for_study',
# Insight implementations
'ZernikeWFEInsight',
'ZernikeOPDComparisonInsight',
'MSFZernikeInsight',
'ZernikeDashboardInsight', # NEW: Unified dashboard
'StressFieldInsight',
'ModalInsight',
'ThermalInsight',
@@ -142,10 +183,14 @@ if __name__ == '__main__':
print("Usage:")
print(" python -m optimization_engine.insights list")
print(" python -m optimization_engine.insights generate <study_path> [--type TYPE]")
print(" python -m optimization_engine.insights report <study_path>")
print(" python -m optimization_engine.insights recommend <study_path>")
print()
print("Commands:")
print(" list - List all registered insight types")
print(" generate - Generate insights for a study")
print(" list - List all registered insight types")
print(" generate - Generate insights for a study")
print(" report - Generate full report from config")
print(" recommend - Get AI recommendations for insights")
print()
print("Options:")
print(" --type TYPE Generate only the specified insight type")
@@ -166,6 +211,84 @@ if __name__ == '__main__':
print(f" Applies to: {', '.join(info['applicable_to'])}")
print()
elif command == 'recommend':
if len(sys.argv) < 3:
print("Error: Missing study path")
print_usage()
sys.exit(1)
study_path = Path(sys.argv[2])
if not study_path.exists():
print(f"Error: Study path does not exist: {study_path}")
sys.exit(1)
print(f"\nRecommended Insights for: {study_path.name}")
print("-" * 60)
recommendations = recommend_insights_for_study(study_path)
if not recommendations:
print("No recommendations available (no objectives found)")
sys.exit(0)
for rec in recommendations:
linked = f" -> {rec['linked_objective']}" if rec.get('linked_objective') else ""
print(f"\n {rec['type']}{linked}")
print(f" Name: {rec['name']}")
print(f" Reason: {rec['reason']}")
print()
print("Add these to optimization_config.json under 'insights' key")
elif command == 'report':
if len(sys.argv) < 3:
print("Error: Missing study path")
print_usage()
sys.exit(1)
study_path = Path(sys.argv[2])
if not study_path.exists():
print(f"Error: Study path does not exist: {study_path}")
sys.exit(1)
print(f"\nGenerating Insight Report for: {study_path.name}")
print("-" * 60)
# Check for configured insights
specs = get_configured_insights(study_path)
if not specs:
print("No insights configured in optimization_config.json")
print("Using recommendations based on objectives...")
recommendations = recommend_insights_for_study(study_path)
specs = [
InsightSpec(
type=rec['type'],
name=rec['name'],
linked_objective=rec.get('linked_objective'),
config=rec.get('config', {})
)
for rec in recommendations
]
if not specs:
print("No insights to generate")
sys.exit(0)
print(f"Generating {len(specs)} insight(s)...")
report = InsightReport(study_path)
results = report.generate_all(specs)
for result in results:
status = "OK" if result.success else f"FAIL: {result.error}"
print(f" {result.insight_name}: {status}")
if any(r.success for r in results):
report_path = report.generate_report_html()
print(f"\nReport generated: {report_path}")
print("Open in browser and use 'Save as PDF' button to export")
else:
print("\nNo insights generated successfully")
elif command == 'generate':
if len(sys.argv) < 3:
print("Error: Missing study path")

View File

@@ -8,25 +8,86 @@ of specific designs.
Architecture:
- StudyInsight: Abstract base class for all insight types
- InsightRegistry: Central registry for available insight types
- InsightSpec: Config-defined insight specification from optimization_config.json
- InsightReport: Aggregates multiple insights into a study report with PDF export
- Each insight can generate standalone HTML or Plotly data for dashboard
Config Schema (in optimization_config.json):
"insights": [
{
"type": "zernike_wfe",
"name": "WFE at 40 vs 20 deg",
"enabled": true,
"linked_objective": "rel_filtered_rms_40_vs_20",
"config": { "target_subcase": "3", "reference_subcase": "2" },
"include_in_report": true
},
{
"type": "design_space",
"name": "Parameter Exploration",
"enabled": true,
"linked_objective": null,
"config": {},
"include_in_report": true
}
]
Usage:
from optimization_engine.insights import get_insight, list_insights
# Get specific insight
insight = get_insight('zernike_wfe')
if insight.can_generate(study_path):
html_path = insight.generate_html(study_path, trial_id=47)
plotly_data = insight.get_plotly_data(study_path, trial_id=47)
insight = get_insight('zernike_wfe', study_path)
if insight.can_generate():
html_path = insight.generate_html(trial_id=47)
plotly_data = insight.get_plotly_data(trial_id=47)
# Get insights from config
specs = get_configured_insights(study_path)
for spec in specs:
insight = get_insight(spec.type, study_path)
result = insight.generate(spec.to_config())
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Type
from typing import Any, Dict, List, Optional, Type, Union
from datetime import datetime
import json
@dataclass
class InsightSpec:
"""
Insight specification from optimization_config.json.
Defines what insights the user wants for their study, optionally
linking them to specific objectives.
"""
type: str # Insight type ID (e.g., 'zernike_wfe')
name: str # User-defined display name
enabled: bool = True # Whether to generate this insight
linked_objective: Optional[str] = None # Objective name this visualizes (or None)
config: Dict[str, Any] = field(default_factory=dict) # Type-specific config
include_in_report: bool = True # Include in study report
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'InsightSpec':
"""Create InsightSpec from config dictionary."""
return cls(
type=data['type'],
name=data.get('name', data['type']),
enabled=data.get('enabled', True),
linked_objective=data.get('linked_objective'),
config=data.get('config', {}),
include_in_report=data.get('include_in_report', True)
)
def to_config(self) -> 'InsightConfig':
"""Convert spec to InsightConfig for generation."""
return InsightConfig(extra=self.config)
@dataclass
class InsightConfig:
"""Configuration for an insight instance."""
@@ -46,10 +107,14 @@ class InsightConfig:
class InsightResult:
"""Result from generating an insight."""
success: bool
insight_type: str = "" # Which insight generated this
insight_name: str = "" # Display name
html_path: Optional[Path] = None
plotly_figure: Optional[Dict[str, Any]] = None # Plotly figure as dict
summary: Optional[Dict[str, Any]] = None # Key metrics
error: Optional[str] = None
linked_objective: Optional[str] = None # Objective this relates to (if any)
generated_at: Optional[str] = None # ISO timestamp
class StudyInsight(ABC):
@@ -66,15 +131,29 @@ class StudyInsight(ABC):
- insight_type: Unique identifier (e.g., 'zernike_wfe')
- name: Human-readable name
- description: What this insight shows
- category: Physics domain (see INSIGHT_CATEGORIES)
- applicable_to: List of study types this applies to
- can_generate(): Check if study has required data
- _generate(): Core generation logic
"""
# Physics categories for grouping
INSIGHT_CATEGORIES = {
'optical': 'Optical',
'structural_static': 'Structural (Static)',
'structural_dynamic': 'Structural (Dynamic)',
'structural_modal': 'Structural (Modal)',
'thermal': 'Thermal',
'kinematic': 'Kinematic',
'design_exploration': 'Design Exploration',
'other': 'Other'
}
# Class-level metadata (override in subclasses)
insight_type: str = "base"
name: str = "Base Insight"
description: str = "Abstract base insight"
category: str = "other" # Physics domain (key from INSIGHT_CATEGORIES)
applicable_to: List[str] = [] # e.g., ['mirror', 'structural', 'all']
# Required files/data patterns
@@ -273,21 +352,57 @@ class InsightRegistry:
'type': cls.insight_type,
'name': cls.name,
'description': cls.description,
'category': cls.category,
'category_label': StudyInsight.INSIGHT_CATEGORIES.get(cls.category, 'Other'),
'applicable_to': cls.applicable_to
}
for cls in self._insights.values()
]
def list_available(self, study_path: Path) -> List[Dict[str, Any]]:
def list_available(self, study_path: Path, fast_mode: bool = True) -> List[Dict[str, Any]]:
"""
List insights that can be generated for a specific study.
Args:
study_path: Path to study directory
fast_mode: If True, use quick file existence checks instead of full can_generate()
Returns:
List of available insight metadata
"""
study_path = Path(study_path)
# Fast mode: check file patterns once, then filter insights by what they need
if fast_mode:
# Quick scan for data files (non-recursive, just check known locations)
has_op2 = self._quick_has_op2(study_path)
has_db = (study_path / "2_results" / "study.db").exists()
available = []
for insight_type, cls in self._insights.items():
# Quick filter based on required_files attribute
required = getattr(cls, 'required_files', [])
can_gen = True
if '*.op2' in required and not has_op2:
can_gen = False
if 'study.db' in required and not has_db:
can_gen = False
if can_gen:
available.append({
'type': insight_type,
'name': cls.name,
'description': cls.description,
'category': getattr(cls, 'category', 'other'),
'category_label': StudyInsight.INSIGHT_CATEGORIES.get(
getattr(cls, 'category', 'other'), 'Other'
),
'applicable_to': getattr(cls, 'applicable_to', [])
})
return available
# Slow mode: full can_generate() check (for accuracy)
available = []
for insight_type, cls in self._insights.items():
try:
@@ -296,12 +411,43 @@ class InsightRegistry:
available.append({
'type': insight_type,
'name': cls.name,
'description': cls.description
'description': cls.description,
'category': getattr(cls, 'category', 'other'),
'category_label': StudyInsight.INSIGHT_CATEGORIES.get(
getattr(cls, 'category', 'other'), 'Other'
),
'applicable_to': getattr(cls, 'applicable_to', [])
})
except Exception:
pass # Skip insights that fail to initialize
return available
def _quick_has_op2(self, study_path: Path) -> bool:
"""Quick check if study has OP2 files without recursive search."""
# Check common locations only (non-recursive)
check_dirs = [
study_path / "3_results" / "best_design_archive",
study_path / "2_iterations",
study_path / "1_setup" / "model",
]
for check_dir in check_dirs:
if not check_dir.exists():
continue
# Non-recursive check of immediate children
try:
for item in check_dir.iterdir():
if item.suffix.lower() == '.op2':
return True
# Check one level down for iterations
if item.is_dir():
for subitem in item.iterdir():
if subitem.suffix.lower() == '.op2':
return True
except (PermissionError, OSError):
continue
return False
# Global registry instance
_registry = InsightRegistry()
@@ -334,3 +480,458 @@ def list_insights() -> List[Dict[str, Any]]:
def list_available_insights(study_path: Path) -> List[Dict[str, Any]]:
"""List insights available for a specific study."""
return _registry.list_available(study_path)
def get_configured_insights(study_path: Path) -> List[InsightSpec]:
"""
Get insight specifications from study's optimization_config.json.
Returns:
List of InsightSpec objects defined in the config, or empty list
"""
study_path = Path(study_path)
config_path = study_path / "1_setup" / "optimization_config.json"
if not config_path.exists():
return []
with open(config_path) as f:
config = json.load(f)
insights_config = config.get('insights', [])
return [InsightSpec.from_dict(spec) for spec in insights_config]
def recommend_insights_for_study(study_path: Path) -> List[Dict[str, Any]]:
"""
Recommend insights based on study type and objectives.
Analyzes the optimization_config.json to suggest appropriate insights.
Returns recommendations with reasoning.
Returns:
List of recommendation dicts with 'type', 'name', 'reason', 'linked_objective'
"""
study_path = Path(study_path)
config_path = study_path / "1_setup" / "optimization_config.json"
if not config_path.exists():
return []
with open(config_path) as f:
config = json.load(f)
recommendations = []
objectives = config.get('objectives', [])
# Check for Zernike-related objectives
for obj in objectives:
obj_name = obj.get('name', '')
extractor = obj.get('extractor_config', {})
# WFE objectives -> recommend zernike_wfe
if any(kw in obj_name.lower() for kw in ['rms', 'wfe', 'zernike', 'filtered']):
target_sc = extractor.get('target_subcase', '')
ref_sc = extractor.get('reference_subcase', '')
recommendations.append({
'type': 'zernike_wfe',
'name': f"WFE: {obj.get('description', obj_name)[:50]}",
'reason': f"Visualizes Zernike WFE for objective '{obj_name}'",
'linked_objective': obj_name,
'config': {
'target_subcase': target_sc,
'reference_subcase': ref_sc
}
})
# Mass objectives -> no direct insight, but note for design space
elif 'mass' in obj_name.lower():
pass # Covered by design_space
# Stress-related
elif any(kw in obj_name.lower() for kw in ['stress', 'von_mises', 'strain']):
recommendations.append({
'type': 'stress_field',
'name': f"Stress: {obj.get('description', obj_name)[:50]}",
'reason': f"Visualizes stress distribution for objective '{obj_name}'",
'linked_objective': obj_name,
'config': {}
})
# Frequency/modal
elif any(kw in obj_name.lower() for kw in ['freq', 'modal', 'eigen', 'vibration']):
recommendations.append({
'type': 'modal',
'name': f"Modal: {obj.get('description', obj_name)[:50]}",
'reason': f"Shows mode shapes for objective '{obj_name}'",
'linked_objective': obj_name,
'config': {}
})
# Temperature/thermal
elif any(kw in obj_name.lower() for kw in ['temp', 'thermal', 'heat']):
recommendations.append({
'type': 'thermal',
'name': f"Thermal: {obj.get('description', obj_name)[:50]}",
'reason': f"Shows temperature distribution for objective '{obj_name}'",
'linked_objective': obj_name,
'config': {}
})
# Always recommend design_space for any optimization
if objectives:
recommendations.append({
'type': 'design_space',
'name': 'Design Space Exploration',
'reason': 'Shows parameter-objective relationships across all trials',
'linked_objective': None,
'config': {}
})
return recommendations
class InsightReport:
"""
Aggregates multiple insights into a comprehensive study report.
Generates:
- Full HTML report with all insights embedded
- Individual insight HTMLs in 3_insights/
- PDF export capability (via browser print)
- Summary JSON for Results page
Usage:
report = InsightReport(study_path)
report.add_insight(result1)
report.add_insight(result2)
report.generate_html() # Creates 3_insights/STUDY_INSIGHTS_REPORT.html
"""
def __init__(self, study_path: Path):
self.study_path = Path(study_path)
self.insights_path = self.study_path / "3_insights"
self.results_path = self.study_path / "3_results"
self.results: List[InsightResult] = []
# Load study config for metadata
config_path = self.study_path / "1_setup" / "optimization_config.json"
if config_path.exists():
with open(config_path) as f:
self.config = json.load(f)
else:
self.config = {}
def add_insight(self, result: InsightResult) -> None:
"""Add an insight result to the report."""
if result.success:
self.results.append(result)
def generate_all(self, specs: Optional[List[InsightSpec]] = None) -> List[InsightResult]:
"""
Generate all configured or specified insights.
Args:
specs: List of InsightSpecs to generate (or None to use config)
Returns:
List of InsightResult objects
"""
if specs is None:
specs = get_configured_insights(self.study_path)
results = []
for spec in specs:
if not spec.enabled:
continue
insight = get_insight(spec.type, self.study_path)
if insight is None:
results.append(InsightResult(
success=False,
insight_type=spec.type,
insight_name=spec.name,
error=f"Unknown insight type: {spec.type}"
))
continue
if not insight.can_generate():
results.append(InsightResult(
success=False,
insight_type=spec.type,
insight_name=spec.name,
error=f"Cannot generate {spec.name}: required data not found"
))
continue
config = spec.to_config()
result = insight.generate(config)
result.insight_type = spec.type
result.insight_name = spec.name
result.linked_objective = spec.linked_objective
result.generated_at = datetime.now().isoformat()
results.append(result)
self.add_insight(result)
return results
def generate_report_html(self, include_appendix: bool = True) -> Path:
"""
Generate comprehensive HTML report with all insights.
Args:
include_appendix: Whether to include full-size insight appendix
Returns:
Path to generated HTML report
"""
self.insights_path.mkdir(parents=True, exist_ok=True)
study_name = self.config.get('study_name', self.study_path.name)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
# Build HTML
html_parts = [self._report_header(study_name, timestamp)]
# Executive summary
html_parts.append(self._executive_summary())
# Objective-linked insights
linked = [r for r in self.results if r.linked_objective]
if linked:
html_parts.append('<h2>Objective Insights</h2>')
for result in linked:
html_parts.append(self._insight_section(result, compact=True))
# Standalone insights
standalone = [r for r in self.results if not r.linked_objective]
if standalone:
html_parts.append('<h2>General Insights</h2>')
for result in standalone:
html_parts.append(self._insight_section(result, compact=True))
# Appendix with full-size figures
if include_appendix and self.results:
html_parts.append(self._appendix_section())
html_parts.append(self._report_footer())
# Write report
report_path = self.insights_path / "STUDY_INSIGHTS_REPORT.html"
report_path.write_text('\n'.join(html_parts), encoding='utf-8')
# Also write summary JSON for Results page
self._write_summary_json()
return report_path
def _report_header(self, study_name: str, timestamp: str) -> str:
"""Generate HTML header."""
return f'''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Study Insights Report - {study_name}</title>
<script src="https://cdn.plot.ly/plotly-2.27.0.min.js"></script>
<style>
:root {{
--bg-dark: #111827;
--bg-card: #1f2937;
--border: #374151;
--text: #f9fafb;
--text-muted: #9ca3af;
--primary: #3b82f6;
--success: #22c55e;
}}
* {{ box-sizing: border-box; margin: 0; padding: 0; }}
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: var(--bg-dark);
color: var(--text);
line-height: 1.6;
padding: 2rem;
}}
.container {{ max-width: 1200px; margin: 0 auto; }}
h1 {{ font-size: 2rem; margin-bottom: 0.5rem; color: var(--primary); }}
h2 {{ font-size: 1.5rem; margin: 2rem 0 1rem; border-bottom: 1px solid var(--border); padding-bottom: 0.5rem; }}
h3 {{ font-size: 1.1rem; margin-bottom: 0.5rem; }}
.meta {{ color: var(--text-muted); font-size: 0.9rem; margin-bottom: 2rem; }}
.card {{
background: var(--bg-card);
border: 1px solid var(--border);
border-radius: 8px;
padding: 1.5rem;
margin-bottom: 1.5rem;
}}
.summary-grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 1rem;
margin-bottom: 2rem;
}}
.stat {{
background: var(--bg-card);
padding: 1rem;
border-radius: 8px;
text-align: center;
}}
.stat-value {{ font-size: 1.5rem; font-weight: bold; color: var(--success); }}
.stat-label {{ font-size: 0.8rem; color: var(--text-muted); text-transform: uppercase; }}
.objective-tag {{
display: inline-block;
background: var(--primary);
color: white;
padding: 0.2rem 0.5rem;
border-radius: 4px;
font-size: 0.75rem;
margin-left: 0.5rem;
}}
.plot-container {{ width: 100%; height: 400px; margin: 1rem 0; }}
.appendix-plot {{ height: 600px; page-break-inside: avoid; }}
table {{ width: 100%; border-collapse: collapse; margin: 1rem 0; }}
th, td {{ padding: 0.5rem; text-align: left; border-bottom: 1px solid var(--border); }}
th {{ color: var(--text-muted); font-weight: normal; font-size: 0.85rem; }}
.print-btn {{
position: fixed;
top: 1rem;
right: 1rem;
background: var(--primary);
color: white;
border: none;
padding: 0.75rem 1.5rem;
border-radius: 6px;
cursor: pointer;
font-size: 0.9rem;
}}
.print-btn:hover {{ opacity: 0.9; }}
@media print {{
.print-btn {{ display: none; }}
body {{ background: white; color: black; }}
.card {{ border: 1px solid #ccc; }}
}}
</style>
</head>
<body>
<button class="print-btn" onclick="window.print()">Save as PDF</button>
<div class="container">
<h1>Study Insights Report</h1>
<p class="meta">{study_name} | Generated: {timestamp}</p>
'''
def _executive_summary(self) -> str:
"""Generate executive summary section."""
n_objectives = len(self.config.get('objectives', []))
n_insights = len(self.results)
linked = len([r for r in self.results if r.linked_objective])
return f'''
<div class="summary-grid">
<div class="stat">
<div class="stat-value">{n_insights}</div>
<div class="stat-label">Total Insights</div>
</div>
<div class="stat">
<div class="stat-value">{linked}</div>
<div class="stat-label">Objective-Linked</div>
</div>
<div class="stat">
<div class="stat-value">{n_objectives}</div>
<div class="stat-label">Study Objectives</div>
</div>
</div>
'''
def _insight_section(self, result: InsightResult, compact: bool = False) -> str:
"""Generate HTML section for a single insight."""
objective_tag = ""
if result.linked_objective:
objective_tag = f'<span class="objective-tag">{result.linked_objective}</span>'
# Summary table
summary_html = ""
if result.summary:
rows = ""
for key, value in list(result.summary.items())[:6]:
if isinstance(value, float):
value = f"{value:.4g}"
rows += f"<tr><td>{key}</td><td>{value}</td></tr>"
summary_html = f"<table><tbody>{rows}</tbody></table>"
# Compact plot or placeholder
plot_html = ""
if result.plotly_figure and compact:
plot_id = f"plot_{result.insight_type}_{id(result)}"
plot_json = json.dumps(result.plotly_figure)
plot_html = f'''
<div id="{plot_id}" class="plot-container"></div>
<script>
var fig = {plot_json};
fig.layout.height = 350;
fig.layout.margin = {{l:50,r:50,t:30,b:50}};
Plotly.newPlot("{plot_id}", fig.data, fig.layout, {{responsive:true}});
</script>
'''
return f'''
<div class="card">
<h3>{result.insight_name}{objective_tag}</h3>
{summary_html}
{plot_html}
</div>
'''
def _appendix_section(self) -> str:
"""Generate appendix with full-size figures."""
html = '<h2>Appendix: Full-Size Visualizations</h2>'
for i, result in enumerate(self.results):
if not result.plotly_figure:
continue
plot_id = f"appendix_plot_{i}"
plot_json = json.dumps(result.plotly_figure)
html += f'''
<div class="card" style="page-break-before: always;">
<h3>Appendix {i+1}: {result.insight_name}</h3>
<div id="{plot_id}" class="appendix-plot"></div>
<script>
var fig = {plot_json};
fig.layout.height = 550;
Plotly.newPlot("{plot_id}", fig.data, fig.layout, {{responsive:true}});
</script>
</div>
'''
return html
def _report_footer(self) -> str:
"""Generate HTML footer."""
return '''
</div>
</body>
</html>'''
def _write_summary_json(self) -> None:
"""Write summary JSON for Results page integration."""
summary = {
'generated_at': datetime.now().isoformat(),
'study_name': self.config.get('study_name', self.study_path.name),
'insights': []
}
for result in self.results:
summary['insights'].append({
'type': result.insight_type,
'name': result.insight_name,
'success': result.success,
'linked_objective': result.linked_objective,
'html_path': str(result.html_path) if result.html_path else None,
'summary': result.summary
})
summary_path = self.insights_path / "insights_summary.json"
with open(summary_path, 'w') as f:
json.dump(summary, f, indent=2)

View File

@@ -0,0 +1,875 @@
"""
MSF Zernike Insight - Mid-Spatial Frequency Analysis for Telescope Mirrors
Provides detailed analysis of mid-spatial frequency (MSF) content in mirror
wavefront error, specifically for gravity-induced support print-through.
Key Features:
- Band decomposition: LSF (n≤10), MSF (n=11-50), HSF (n>50)
- RSS (Root Sum Square) metrics per band
- Relative analysis vs reference orientations (20°, 90°)
- Higher-order Zernike fitting (100 modes by default)
- PSD (Power Spectral Density) visualization
- Support print-through identification
For GigaBIT and similar telescope mirrors where MSF errors are dominated by
gravity-induced support deformation rather than polishing residual.
Author: Atomizer Framework
"""
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
import numpy as np
from math import factorial
from numpy.linalg import LinAlgError
from .base import StudyInsight, InsightConfig, InsightResult, register_insight
# Lazy imports
_plotly_loaded = False
_go = None
_make_subplots = None
_Triangulation = None
_OP2 = None
_BDF = None
def _load_dependencies():
"""Lazy load heavy dependencies."""
global _plotly_loaded, _go, _make_subplots, _Triangulation, _OP2, _BDF
if not _plotly_loaded:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from matplotlib.tri import Triangulation
from pyNastran.op2.op2 import OP2
from pyNastran.bdf.bdf import BDF
_go = go
_make_subplots = make_subplots
_Triangulation = Triangulation
_OP2 = OP2
_BDF = BDF
_plotly_loaded = True
# ============================================================================
# Band Definitions
# ============================================================================
# Default band boundaries (Zernike radial order n)
DEFAULT_LSF_MAX = 10 # LSF: n = 1-10 (M2 hexapod correctable)
DEFAULT_MSF_MAX = 50 # MSF: n = 11-50 (support print-through)
DEFAULT_N_MODES = 100 # Total modes to fit (n ≈ 14)
# Band colors for visualization
BAND_COLORS = {
'lsf': '#3b82f6', # Blue - low spatial frequency
'msf': '#f59e0b', # Amber - mid spatial frequency
'hsf': '#ef4444', # Red - high spatial frequency
}
# ============================================================================
# Zernike Mathematics (same as zernike_wfe.py)
# ============================================================================
def noll_indices(j: int) -> Tuple[int, int]:
"""Convert Noll index to (n, m) radial/azimuthal orders."""
if j < 1:
raise ValueError("Noll index j must be >= 1")
count = 0
n = 0
while True:
if n == 0:
ms = [0]
elif n % 2 == 0:
ms = [0] + [m for k in range(1, n//2 + 1) for m in (-2*k, 2*k)]
else:
ms = [m for k in range(0, (n+1)//2) for m in (-(2*k+1), (2*k+1))]
for m in ms:
count += 1
if count == j:
return n, m
n += 1
def noll_to_radial_order(j: int) -> int:
"""Get radial order n for Noll index j."""
n, _ = noll_indices(j)
return n
def radial_order_to_first_noll(n: int) -> int:
"""Get first Noll index for radial order n."""
# Sum of (k+1) for k=0 to n-1, plus 1
return n * (n + 1) // 2 + 1
def zernike_noll(j: int, r: np.ndarray, th: np.ndarray) -> np.ndarray:
"""Evaluate Zernike polynomial j at (r, theta)."""
n, m = noll_indices(j)
R = np.zeros_like(r)
for s in range((n - abs(m)) // 2 + 1):
c = ((-1)**s * factorial(n - s) /
(factorial(s) *
factorial((n + abs(m)) // 2 - s) *
factorial((n - abs(m)) // 2 - s)))
R += c * r**(n - 2*s)
if m == 0:
return R
return R * (np.cos(m * th) if m > 0 else np.sin(-m * th))
def zernike_common_name(n: int, m: int) -> str:
"""Get common name for Zernike mode."""
names = {
(0, 0): "Piston", (1, -1): "Tilt X", (1, 1): "Tilt Y",
(2, 0): "Defocus", (2, -2): "Astig 45°", (2, 2): "Astig 0°",
(3, -1): "Coma X", (3, 1): "Coma Y", (3, -3): "Trefoil X", (3, 3): "Trefoil Y",
(4, 0): "Primary Spherical", (4, -2): "Sec Astig X", (4, 2): "Sec Astig Y",
(4, -4): "Quadrafoil X", (4, 4): "Quadrafoil Y",
(5, -1): "Sec Coma X", (5, 1): "Sec Coma Y",
(5, -3): "Sec Trefoil X", (5, 3): "Sec Trefoil Y",
(5, -5): "Pentafoil X", (5, 5): "Pentafoil Y",
(6, 0): "Sec Spherical",
}
return names.get((n, m), f"Z(n={n}, m={m})")
def zernike_label(j: int) -> str:
"""Get label for Zernike coefficient J{j}."""
n, m = noll_indices(j)
return f"J{j:02d} - {zernike_common_name(n, m)} (n={n}, m={m})"
def compute_zernike_coeffs(
X: np.ndarray,
Y: np.ndarray,
vals: np.ndarray,
n_modes: int,
chunk_size: int = 100000
) -> Tuple[np.ndarray, float]:
"""Fit Zernike coefficients to WFE data."""
Xc, Yc = X - np.mean(X), Y - np.mean(Y)
R = float(np.max(np.hypot(Xc, Yc)))
r = np.hypot(Xc / R, Yc / R).astype(np.float32)
th = np.arctan2(Yc, Xc).astype(np.float32)
mask = (r <= 1.0) & ~np.isnan(vals)
if not np.any(mask):
raise RuntimeError("No valid points inside unit disk.")
idx = np.nonzero(mask)[0]
m = int(n_modes)
G = np.zeros((m, m), dtype=np.float64)
h = np.zeros((m,), dtype=np.float64)
v = vals.astype(np.float64)
for start in range(0, len(idx), chunk_size):
sl = idx[start:start + chunk_size]
r_b, th_b, v_b = r[sl], th[sl], v[sl]
Zb = np.column_stack([zernike_noll(j, r_b, th_b).astype(np.float32)
for j in range(1, m + 1)])
G += (Zb.T @ Zb).astype(np.float64)
h += (Zb.T @ v_b).astype(np.float64)
try:
coeffs = np.linalg.solve(G, h)
except LinAlgError:
coeffs = np.linalg.lstsq(G, h, rcond=None)[0]
return coeffs, R
# ============================================================================
# Band Analysis
# ============================================================================
def classify_modes_by_band(
n_modes: int,
lsf_max: int = DEFAULT_LSF_MAX,
msf_max: int = DEFAULT_MSF_MAX
) -> Dict[str, List[int]]:
"""
Classify Zernike modes into LSF/MSF/HSF bands.
Returns:
Dict with 'lsf', 'msf', 'hsf' keys containing lists of Noll indices
"""
bands = {'lsf': [], 'msf': [], 'hsf': []}
for j in range(1, n_modes + 1):
n = noll_to_radial_order(j)
if n <= lsf_max:
bands['lsf'].append(j)
elif n <= msf_max:
bands['msf'].append(j)
else:
bands['hsf'].append(j)
return bands
def compute_band_rss(
coeffs: np.ndarray,
bands: Dict[str, List[int]],
filter_modes: int = 4
) -> Dict[str, float]:
"""
Compute RSS (Root Sum Square) of coefficients in each band.
RSS = sqrt(sum(c_j^2)) for j in band
Args:
coeffs: Zernike coefficients
bands: Dict with band name -> list of Noll indices
filter_modes: Number of low-order modes to filter for 'filtered' metric (default 4 = J1-J4)
Returns:
Dict with 'lsf', 'msf', 'hsf', 'total', and 'filtered' (J5+) RSS values
"""
rss = {}
for band_name, indices in bands.items():
# Convert to 0-indexed
band_coeffs = [coeffs[j-1] for j in indices if j-1 < len(coeffs)]
rss[band_name] = float(np.sqrt(np.sum(np.array(band_coeffs)**2)))
rss['total'] = float(np.sqrt(np.sum(coeffs**2)))
# Filtered RSS: exclude first filter_modes (J1-J4 = piston, tip, tilt, defocus)
# This is the engineering-relevant metric after M2 alignment
filtered_coeffs = coeffs[filter_modes:] if len(coeffs) > filter_modes else np.array([])
rss['filtered'] = float(np.sqrt(np.sum(filtered_coeffs**2)))
return rss
def compute_band_residual(
X: np.ndarray,
Y: np.ndarray,
W_nm: np.ndarray,
coeffs: np.ndarray,
R: float,
bands: Dict[str, List[int]],
keep_bands: List[str]
) -> np.ndarray:
"""
Compute WFE residual keeping only specified bands.
Args:
keep_bands: List of band names to keep (e.g., ['msf'] for MSF-only)
Returns:
WFE array with only specified band content
"""
Xc = X - np.mean(X)
Yc = Y - np.mean(Y)
r = np.hypot(Xc / R, Yc / R)
th = np.arctan2(Yc, Xc)
# Build Zernike basis for modes to keep
keep_indices = []
for band in keep_bands:
keep_indices.extend(bands.get(band, []))
if not keep_indices:
return np.zeros_like(W_nm)
# Reconstruct only the kept bands
W_band = np.zeros_like(W_nm)
for j in keep_indices:
if j - 1 < len(coeffs):
Z_j = zernike_noll(j, r, th)
W_band += coeffs[j-1] * Z_j
return W_band
def find_dominant_msf_modes(
coeffs: np.ndarray,
bands: Dict[str, List[int]],
top_n: int = 5
) -> List[Dict[str, Any]]:
"""
Find the dominant MSF modes by coefficient magnitude.
Returns:
List of dicts with 'j', 'label', 'coeff_nm', 'n', 'm'
"""
msf_indices = bands.get('msf', [])
mode_data = []
for j in msf_indices:
if j - 1 < len(coeffs):
n, m = noll_indices(j)
mode_data.append({
'j': j,
'label': zernike_label(j),
'coeff_nm': float(abs(coeffs[j-1])),
'n': n,
'm': m
})
# Sort by magnitude
mode_data.sort(key=lambda x: x['coeff_nm'], reverse=True)
return mode_data[:top_n]
def estimate_mesh_resolution(X: np.ndarray, Y: np.ndarray) -> Dict[str, float]:
"""
Estimate mesh spatial resolution.
Returns:
Dict with 'node_count', 'diameter_mm', 'avg_spacing_mm',
'max_resolvable_order', 'min_feature_mm'
"""
n_nodes = len(X)
Xc = X - np.mean(X)
Yc = Y - np.mean(Y)
R = np.max(np.hypot(Xc, Yc))
diameter = 2 * R
# Approximate spacing from area
area = np.pi * R**2
avg_spacing = np.sqrt(area / n_nodes)
# Nyquist: can resolve features > 2x spacing
min_feature = 2 * avg_spacing
# Max Zernike order
max_order = int(diameter / min_feature)
return {
'node_count': n_nodes,
'diameter_mm': float(diameter),
'avg_spacing_mm': float(avg_spacing),
'min_feature_mm': float(min_feature),
'max_resolvable_order': max_order
}
# ============================================================================
# Default Configuration
# ============================================================================
DEFAULT_CONFIG = {
'n_modes': 100,
'lsf_max': 10, # n ≤ 10 is LSF
'msf_max': 50, # n = 11-50 is MSF
'amp': 0.5, # Visual deformation scale
'pancake': 3.0, # Z-axis range multiplier
'plot_downsample': 15000,
'colorscale': 'Turbo',
'disp_unit': 'mm',
'show_psd': True,
'show_all_orientations': True,
}
@register_insight
class MSFZernikeInsight(StudyInsight):
"""
Mid-Spatial Frequency Zernike analysis for telescope mirror optimization.
Provides detailed breakdown of spatial frequency content:
- LSF (Low): n ≤ 10, correctable by M2 hexapod
- MSF (Mid): n = 11-50, support print-through
- HSF (High): n > 50, near mesh resolution limit
Generates:
- Band decomposition table with RSS metrics
- MSF-only 3D surface visualization
- Coefficient bar chart color-coded by band
- Dominant MSF mode identification
- Mesh resolution analysis
"""
insight_type = "msf_zernike"
name = "MSF Zernike Analysis"
description = "Mid-spatial frequency analysis with band decomposition for support print-through"
category = "optical"
applicable_to = ["mirror", "optics", "wfe", "msf"]
required_files = ["*.op2"]
def __init__(self, study_path: Path):
super().__init__(study_path)
self.op2_path: Optional[Path] = None
self.geo_path: Optional[Path] = None
self._node_geo: Optional[Dict] = None
self._displacements: Optional[Dict] = None
def can_generate(self) -> bool:
"""Check if OP2 and geometry files exist."""
search_paths = [
self.results_path,
self.study_path / "2_iterations",
self.setup_path / "model",
]
for search_path in search_paths:
if not search_path.exists():
continue
op2_files = list(search_path.glob("**/*solution*.op2"))
if not op2_files:
op2_files = list(search_path.glob("**/*.op2"))
if op2_files:
self.op2_path = max(op2_files, key=lambda p: p.stat().st_mtime)
break
if self.op2_path is None:
return False
try:
self.geo_path = self._find_geometry_file(self.op2_path)
return True
except FileNotFoundError:
return False
def _find_geometry_file(self, op2_path: Path) -> Path:
"""Find BDF/DAT geometry file for OP2."""
folder = op2_path.parent
base = op2_path.stem
for ext in ['.dat', '.bdf']:
cand = folder / (base + ext)
if cand.exists():
return cand
for f in folder.iterdir():
if f.suffix.lower() in ['.dat', '.bdf']:
return f
raise FileNotFoundError(f"No geometry file found for {op2_path}")
def _load_data(self):
"""Load geometry and displacement data."""
if self._node_geo is not None:
return
_load_dependencies()
bdf = _BDF()
bdf.read_bdf(str(self.geo_path))
self._node_geo = {int(nid): node.get_position()
for nid, node in bdf.nodes.items()}
op2 = _OP2()
op2.read_op2(str(self.op2_path))
if not op2.displacements:
raise RuntimeError("No displacement data in OP2")
self._displacements = {}
for key, darr in op2.displacements.items():
data = darr.data
dmat = data[0] if data.ndim == 3 else (data if data.ndim == 2 else None)
if dmat is None:
continue
ngt = darr.node_gridtype.astype(int)
node_ids = ngt if ngt.ndim == 1 else ngt[:, 0]
isubcase = getattr(darr, 'isubcase', None)
label = str(isubcase) if isubcase else str(key)
self._displacements[label] = {
'node_ids': node_ids.astype(int),
'disp': dmat.copy()
}
def _build_wfe_arrays(
self,
label: str,
disp_unit: str = 'mm'
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Build X, Y, WFE arrays for a subcase."""
nm_per_unit = 1e6 if disp_unit == 'mm' else 1e9
data = self._displacements[label]
node_ids = data['node_ids']
dmat = data['disp']
X, Y, WFE = [], [], []
valid_nids = []
for nid, vec in zip(node_ids, dmat):
geo = self._node_geo.get(int(nid))
if geo is None:
continue
X.append(geo[0])
Y.append(geo[1])
wfe = vec[2] * 2.0 * nm_per_unit
WFE.append(wfe)
valid_nids.append(nid)
return (np.array(X), np.array(Y), np.array(WFE), np.array(valid_nids))
def _compute_relative_wfe(
self,
X1, Y1, WFE1, nids1,
X2, Y2, WFE2, nids2
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Compute WFE1 - WFE2 for common nodes."""
ref_map = {int(nid): (x, y, w) for nid, x, y, w in zip(nids2, X2, Y2, WFE2)}
X_rel, Y_rel, WFE_rel = [], [], []
for nid, x, y, w in zip(nids1, X1, Y1, WFE1):
nid = int(nid)
if nid in ref_map:
_, _, w_ref = ref_map[nid]
X_rel.append(x)
Y_rel.append(y)
WFE_rel.append(w - w_ref)
return np.array(X_rel), np.array(Y_rel), np.array(WFE_rel)
def _analyze_subcase(
self,
X: np.ndarray,
Y: np.ndarray,
W_nm: np.ndarray,
cfg: Dict
) -> Dict[str, Any]:
"""
Full MSF analysis for a single subcase.
Returns:
Dict with coefficients, bands, RSS, dominant modes, mesh info
"""
n_modes = cfg['n_modes']
lsf_max = cfg['lsf_max']
msf_max = cfg['msf_max']
# Fit Zernike
coeffs, R = compute_zernike_coeffs(X, Y, W_nm, n_modes)
# Band classification
bands = classify_modes_by_band(n_modes, lsf_max, msf_max)
# RSS per band
rss = compute_band_rss(coeffs, bands)
# Band percentages
total_rss = rss['total']
pct = {
'lsf': 100 * rss['lsf'] / total_rss if total_rss > 0 else 0,
'msf': 100 * rss['msf'] / total_rss if total_rss > 0 else 0,
'hsf': 100 * rss['hsf'] / total_rss if total_rss > 0 else 0,
}
# Dominant MSF modes
dominant_msf = find_dominant_msf_modes(coeffs, bands, top_n=5)
# MSF-only residual
W_msf = compute_band_residual(X, Y, W_nm, coeffs, R, bands, ['msf'])
# Mesh resolution
mesh_info = estimate_mesh_resolution(X, Y)
return {
'coefficients': coeffs,
'R': R,
'bands': bands,
'rss': rss,
'rss_pct': pct,
'dominant_msf': dominant_msf,
'W_msf': W_msf,
'mesh_info': mesh_info,
'n_modes': n_modes,
'lsf_max': lsf_max,
'msf_max': msf_max,
}
def _generate_html(
self,
analyses: Dict[str, Dict],
cfg: Dict,
X: np.ndarray,
Y: np.ndarray
) -> str:
"""Generate comprehensive HTML report."""
_load_dependencies()
n_modes = cfg['n_modes']
amp = cfg.get('amp', 0.5)
pancake = cfg.get('pancake', 3.0)
downsample = cfg.get('plot_downsample', 15000)
colorscale = cfg.get('colorscale', 'Turbo')
# Use first available analysis for visualization
primary_key = list(analyses.keys())[0]
primary = analyses[primary_key]
coeffs = primary['coefficients']
bands = primary['bands']
W_msf = primary['W_msf']
mesh_info = primary['mesh_info']
# Prepare band-colored bar chart data
bar_colors = []
for j in range(1, n_modes + 1):
if j in bands['lsf']:
bar_colors.append(BAND_COLORS['lsf'])
elif j in bands['msf']:
bar_colors.append(BAND_COLORS['msf'])
else:
bar_colors.append(BAND_COLORS['hsf'])
labels = [zernike_label(j) for j in range(1, n_modes + 1)]
coeff_abs = np.abs(coeffs)
# Downsample for 3D plot
n = len(X)
if n > downsample:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=downsample, replace=False)
Xp, Yp, Wp = X[sel], Y[sel], W_msf[sel]
else:
Xp, Yp, Wp = X, Y, W_msf
res_amp = amp * Wp
max_amp = float(np.max(np.abs(res_amp))) if res_amp.size else 1.0
# Create figure with subplots
fig = _make_subplots(
rows=4, cols=1,
specs=[[{"type": "scene"}], [{"type": "table"}],
[{"type": "table"}], [{"type": "xy"}]],
row_heights=[0.40, 0.20, 0.15, 0.25],
vertical_spacing=0.03,
subplot_titles=[
"<b>MSF Content Only (n=11-50)</b>",
"<b>Band Decomposition (RSS)</b>",
"<b>Dominant MSF Modes</b>",
"<b>Coefficient Magnitude by Band</b>"
]
)
# 3D MSF surface
try:
tri = _Triangulation(Xp, Yp)
if tri.triangles is not None and len(tri.triangles) > 0:
i, j, k = tri.triangles.T
fig.add_trace(_go.Mesh3d(
x=Xp, y=Yp, z=res_amp,
i=i, j=j, k=k,
intensity=res_amp,
colorscale=colorscale,
opacity=1.0,
flatshading=False,
lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3,
roughness=0.5, fresnel=0.2),
lightposition=dict(x=100, y=200, z=300),
showscale=True,
colorbar=dict(title=dict(text="MSF WFE (nm)", side="right"),
thickness=15, len=0.5, tickformat=".1f"),
hovertemplate="X: %{x:.1f}<br>Y: %{y:.1f}<br>MSF: %{z:.2f} nm<extra></extra>"
), row=1, col=1)
except Exception:
fig.add_trace(_go.Scatter3d(
x=Xp, y=Yp, z=res_amp,
mode='markers',
marker=dict(size=2, color=res_amp, colorscale=colorscale, showscale=True),
showlegend=False
), row=1, col=1)
# Configure 3D scene
fig.update_scenes(
camera=dict(eye=dict(x=1.2, y=1.2, z=0.8), up=dict(x=0, y=0, z=1)),
xaxis=dict(title="X (mm)", showgrid=True,
gridcolor='rgba(128,128,128,0.3)',
showbackground=True, backgroundcolor='rgba(240,240,240,0.9)'),
yaxis=dict(title="Y (mm)", showgrid=True,
gridcolor='rgba(128,128,128,0.3)',
showbackground=True, backgroundcolor='rgba(240,240,240,0.9)'),
zaxis=dict(title="MSF WFE (nm)",
range=[-max_amp * pancake, max_amp * pancake],
showgrid=True, gridcolor='rgba(128,128,128,0.3)',
showbackground=True, backgroundcolor='rgba(230,230,250,0.9)'),
aspectmode='manual',
aspectratio=dict(x=1, y=1, z=0.4)
)
# Band decomposition table
band_headers = ["<b>Band</b>", "<b>Order Range</b>", "<b>Feature Size</b>"]
band_values = [
["LSF (Low)", "MSF (Mid)", "HSF (High)", "<b>Total</b>", "<b>Filtered (J5+)</b>"],
[f"n ≤ {cfg['lsf_max']}", f"n = {cfg['lsf_max']+1}-{cfg['msf_max']}",
f"n > {cfg['msf_max']}", "All", "J5+ (excl. piston/tip/tilt/defocus)"],
[f"> {int(mesh_info['diameter_mm']/cfg['lsf_max'])} mm",
f"{int(mesh_info['diameter_mm']/cfg['msf_max'])}-{int(mesh_info['diameter_mm']/(cfg['lsf_max']+1))} mm",
f"< {int(mesh_info['diameter_mm']/cfg['msf_max'])} mm", "-", "M2 correctable removed"],
]
# Add columns for each analysis
for key, analysis in analyses.items():
band_headers.append(f"<b>{key} RSS (nm)</b>")
rss = analysis['rss']
pct = analysis['rss_pct']
# Calculate filtered percentage
filtered_pct = 100 * rss['filtered'] / rss['total'] if rss['total'] > 0 else 0
band_values.append([
f"{rss['lsf']:.2f} ({pct['lsf']:.1f}%)",
f"{rss['msf']:.2f} ({pct['msf']:.1f}%)",
f"{rss['hsf']:.2f} ({pct['hsf']:.1f}%)",
f"<b>{rss['total']:.2f}</b>",
f"<b>{rss['filtered']:.2f}</b> ({filtered_pct:.1f}%)",
])
fig.add_trace(_go.Table(
header=dict(values=band_headers,
align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=band_values,
align="left", fill_color='#374151', font=dict(color='white'))
), row=2, col=1)
# Dominant MSF modes table
dom_modes = primary['dominant_msf']
if dom_modes:
fig.add_trace(_go.Table(
header=dict(values=["<b>Rank</b>", "<b>Mode</b>", "<b>|Coeff| (nm)</b>", "<b>Order n</b>"],
align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[
[f"#{i+1}" for i in range(len(dom_modes))],
[m['label'] for m in dom_modes],
[f"{m['coeff_nm']:.3f}" for m in dom_modes],
[str(m['n']) for m in dom_modes],
], align="left", fill_color='#374151', font=dict(color='white'))
), row=3, col=1)
# Bar chart with band colors
fig.add_trace(
_go.Bar(
x=coeff_abs.tolist(), y=labels,
orientation='h',
marker_color=bar_colors,
hovertemplate="%{y}<br>|Coeff| = %{x:.3f} nm<extra></extra>",
showlegend=False
),
row=4, col=1
)
# Add legend for bands
for band_name, color in BAND_COLORS.items():
fig.add_trace(_go.Scatter(
x=[None], y=[None],
mode='markers',
marker=dict(size=10, color=color),
name=band_name.upper(),
showlegend=True
))
# Layout
fig.update_layout(
width=1400, height=1600,
margin=dict(t=60, b=20, l=20, r=20),
paper_bgcolor='#111827', plot_bgcolor='#1f2937',
font=dict(color='white'),
title=dict(
text=f"<b>Atomizer MSF Analysis</b> | {n_modes} modes | Mesh: {mesh_info['node_count']} nodes, {mesh_info['avg_spacing_mm']:.1f}mm spacing",
x=0.5, font=dict(size=16)
),
legend=dict(
orientation="h",
yanchor="bottom",
y=1.02,
xanchor="right",
x=1
)
)
return fig.to_html(include_plotlyjs='cdn', full_html=True)
def _generate(self, config: InsightConfig) -> InsightResult:
"""Generate MSF analysis."""
self._load_data()
cfg = {**DEFAULT_CONFIG, **config.extra}
disp_unit = cfg['disp_unit']
# Map subcases
disps = self._displacements
if '1' in disps and '2' in disps:
sc_map = {'90°': '1', '20°': '2', '40°': '3', '60°': '4'}
elif '90' in disps and '20' in disps:
sc_map = {'90°': '90', '20°': '20', '40°': '40', '60°': '60'}
else:
available = sorted(disps.keys(), key=lambda x: int(x) if x.isdigit() else 0)
if len(available) >= 4:
sc_map = {'90°': available[0], '20°': available[1],
'40°': available[2], '60°': available[3]}
else:
return InsightResult(success=False,
error=f"Need 4 subcases, found: {available}")
# Validate subcases
for angle, label in sc_map.items():
if label not in disps:
return InsightResult(success=False,
error=f"Subcase '{label}' (angle {angle}) not found")
# Load reference data
X_20, Y_20, WFE_20, nids_20 = self._build_wfe_arrays(sc_map['20°'], disp_unit)
X_90, Y_90, WFE_90, nids_90 = self._build_wfe_arrays(sc_map['90°'], disp_unit)
analyses = {}
# Analyze each key orientation
for angle in ['40°', '60°']:
label = sc_map[angle]
X, Y, WFE, nids = self._build_wfe_arrays(label, disp_unit)
# Absolute
analyses[f"{angle} Abs"] = self._analyze_subcase(X, Y, WFE, cfg)
# Relative to 20°
X_rel, Y_rel, WFE_rel = self._compute_relative_wfe(
X, Y, WFE, nids, X_20, Y_20, WFE_20, nids_20)
analyses[f"{angle} vs 20°"] = self._analyze_subcase(X_rel, Y_rel, WFE_rel, cfg)
# Relative to 90° (manufacturing)
X_rel90, Y_rel90, WFE_rel90 = self._compute_relative_wfe(
X, Y, WFE, nids, X_90, Y_90, WFE_90, nids_90)
analyses[f"{angle} vs 90°"] = self._analyze_subcase(X_rel90, Y_rel90, WFE_rel90, cfg)
# Also analyze 90° absolute (manufacturing baseline)
analyses["90° Mfg"] = self._analyze_subcase(X_90, Y_90, WFE_90, cfg)
# Generate HTML using 40° vs 20° as primary visualization
primary_analysis = analyses["40° vs 20°"]
X_40, Y_40, _, _ = self._build_wfe_arrays(sc_map['40°'], disp_unit)
X_rel40, Y_rel40, _ = self._compute_relative_wfe(
X_40, Y_40, analyses["40° Abs"]['W_msf'], np.arange(len(X_40)),
X_20, Y_20, WFE_20, nids_20)
html = self._generate_html(analyses, cfg, X_40, Y_40)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = config.output_dir or self.insights_path
output_dir.mkdir(parents=True, exist_ok=True)
html_path = output_dir / f"msf_zernike_{timestamp}.html"
html_path.write_text(html, encoding='utf-8')
# Build summary
summary = {
'n_modes': cfg['n_modes'],
'lsf_max_order': cfg['lsf_max'],
'msf_max_order': cfg['msf_max'],
'mesh_nodes': primary_analysis['mesh_info']['node_count'],
'mesh_spacing_mm': primary_analysis['mesh_info']['avg_spacing_mm'],
'max_resolvable_order': primary_analysis['mesh_info']['max_resolvable_order'],
}
# Add key metrics for each analysis
for key, analysis in analyses.items():
safe_key = key.replace('°', 'deg').replace(' ', '_')
summary[f'{safe_key}_lsf_rss'] = analysis['rss']['lsf']
summary[f'{safe_key}_msf_rss'] = analysis['rss']['msf']
summary[f'{safe_key}_hsf_rss'] = analysis['rss']['hsf']
summary[f'{safe_key}_total_rss'] = analysis['rss']['total']
summary[f'{safe_key}_filtered_rss'] = analysis['rss']['filtered']
summary[f'{safe_key}_msf_pct'] = analysis['rss_pct']['msf']
return InsightResult(
success=True,
html_path=html_path,
summary=summary
)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,625 @@
"""
Zernike OPD Method Comparison Insight
=====================================
Compares standard Z-only Zernike method against rigorous OPD method that
accounts for lateral (X, Y) displacements. Critical for understanding
whether your optimization objective is capturing true optical performance.
WHY THIS MATTERS:
-----------------
When supports pinch or laterally deform the mirror, nodes shift in X and Y.
The standard Zernike method ignores this, potentially leading to:
- Optimized designs that "cheat" by distorting laterally
- False low WFE when the actual optical surface is degraded
- Optimizer convergence to non-optimal solutions
This insight visualizes:
1. The difference between methods (where they disagree)
2. Lateral displacement magnitude map (where pinching occurs)
3. Which method you should be using for your study
Applicable to: All mirror/optics studies, CRITICAL for lateral support optimization.
"""
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, Optional, Tuple
import numpy as np
from .base import StudyInsight, InsightConfig, InsightResult, register_insight
# Lazy imports
_plotly_loaded = False
_go = None
_make_subplots = None
_Triangulation = None
def _load_dependencies():
"""Lazy load heavy dependencies."""
global _plotly_loaded, _go, _make_subplots, _Triangulation
if not _plotly_loaded:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from matplotlib.tri import Triangulation
_go = go
_make_subplots = make_subplots
_Triangulation = Triangulation
_plotly_loaded = True
# Default configuration
DEFAULT_CONFIG = {
'amp': 0.5, # Visual deformation scale
'pancake': 3.0, # Z-axis range multiplier
'plot_downsample': 15000,
'colorscale_wfe': 'RdBu', # Diverging for WFE difference
'colorscale_lateral': 'Hot', # Sequential for lateral magnitude
'disp_unit': 'mm',
'n_modes': 50,
'filter_orders': 4,
}
@register_insight
class ZernikeOPDComparisonInsight(StudyInsight):
"""
Compare standard vs OPD-rigorous Zernike methods.
Generates visualizations showing:
- Lateral displacement magnitude map (where pinching occurs)
- WFE difference map (where methods disagree)
- Quantitative comparison tables
- Recommendation for which method to use
"""
insight_type = "zernike_opd_comparison"
name = "Zernike Method Comparison (OPD vs Standard)"
description = "Compare Z-only vs rigorous OPD Zernike methods - reveals lateral displacement impact"
category = "optical"
applicable_to = ["mirror", "optics", "wfe", "lateral"]
required_files = ["*.op2"]
def __init__(self, study_path: Path):
super().__init__(study_path)
self.op2_path: Optional[Path] = None
self.geo_path: Optional[Path] = None
self._node_geo: Optional[Dict] = None
self._displacements: Optional[Dict] = None
def can_generate(self) -> bool:
"""Check if OP2 and geometry files exist."""
search_paths = [
self.results_path,
self.study_path / "2_iterations",
self.setup_path / "model",
]
for search_path in search_paths:
if not search_path.exists():
continue
op2_files = list(search_path.glob("**/*solution*.op2"))
if not op2_files:
op2_files = list(search_path.glob("**/*.op2"))
if op2_files:
self.op2_path = max(op2_files, key=lambda p: p.stat().st_mtime)
break
if self.op2_path is None:
return False
try:
self.geo_path = self._find_geometry_file(self.op2_path)
return True
except FileNotFoundError:
return False
def _find_geometry_file(self, op2_path: Path) -> Path:
"""Find BDF/DAT geometry file for OP2."""
folder = op2_path.parent
base = op2_path.stem
for ext in ['.dat', '.bdf']:
cand = folder / (base + ext)
if cand.exists():
return cand
for f in folder.iterdir():
if f.suffix.lower() in ['.dat', '.bdf']:
return f
raise FileNotFoundError(f"No geometry file found for {op2_path}")
def _load_data(self):
"""Load geometry and displacement data."""
if self._node_geo is not None:
return
_load_dependencies()
from pyNastran.op2.op2 import OP2
from pyNastran.bdf.bdf import BDF
# Read geometry
bdf = BDF()
bdf.read_bdf(str(self.geo_path))
self._node_geo = {int(nid): node.get_position()
for nid, node in bdf.nodes.items()}
# Read displacements
op2 = OP2()
op2.read_op2(str(self.op2_path))
if not op2.displacements:
raise RuntimeError("No displacement data in OP2")
self._displacements = {}
for key, darr in op2.displacements.items():
data = darr.data
dmat = data[0] if data.ndim == 3 else (data if data.ndim == 2 else None)
if dmat is None:
continue
ngt = darr.node_gridtype.astype(int)
node_ids = ngt if ngt.ndim == 1 else ngt[:, 0]
isubcase = getattr(darr, 'isubcase', None)
label = str(isubcase) if isubcase else str(key)
self._displacements[label] = {
'node_ids': node_ids.astype(int),
'disp': dmat.copy()
}
def _build_full_data(
self,
label: str,
disp_unit: str = 'mm'
) -> Dict[str, np.ndarray]:
"""Build complete data arrays including X, Y, Z displacements."""
nm_per_unit = 1e6 if disp_unit == 'mm' else 1e9
um_per_unit = nm_per_unit / 1000.0
data = self._displacements[label]
node_ids = data['node_ids']
dmat = data['disp']
x0, y0, z0 = [], [], []
dx, dy, dz = [], [], []
valid_nids = []
for nid, vec in zip(node_ids, dmat):
geo = self._node_geo.get(int(nid))
if geo is None:
continue
x0.append(geo[0])
y0.append(geo[1])
z0.append(geo[2])
dx.append(vec[0])
dy.append(vec[1])
dz.append(vec[2])
valid_nids.append(nid)
return {
'x0': np.array(x0),
'y0': np.array(y0),
'z0': np.array(z0),
'dx': np.array(dx),
'dy': np.array(dy),
'dz': np.array(dz),
'nids': np.array(valid_nids),
'nm_scale': nm_per_unit,
'um_scale': um_per_unit,
}
def _estimate_focal_length(self, x: np.ndarray, y: np.ndarray, z: np.ndarray) -> float:
"""Estimate parabola focal length from geometry."""
r_squared = x**2 + y**2
mask = r_squared > 0
if not np.any(mask):
return 5000.0 # Default fallback
# Fit z = a * r² (assume centered parabola)
A = r_squared[mask].reshape(-1, 1)
b = z[mask]
try:
a, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
a = a[0]
except:
a = np.median(z[mask] / r_squared[mask])
if abs(a) < 1e-12:
return 5000.0
return 1.0 / (4.0 * abs(a))
def _compute_opd_comparison(
self,
data: Dict[str, np.ndarray],
n_modes: int,
filter_orders: int
) -> Dict[str, Any]:
"""Compute both methods and return comparison data."""
from ..extractors.extract_zernike import compute_zernike_coefficients
x0 = data['x0']
y0 = data['y0']
z0 = data['z0']
dx = data['dx']
dy = data['dy']
dz = data['dz']
nm_scale = data['nm_scale']
um_scale = data['um_scale']
# Estimate focal length
focal = self._estimate_focal_length(x0, y0, z0)
# === STANDARD METHOD (Z-only) ===
# Uses original (x0, y0) with Z-displacement
wfe_standard = dz * 2.0 * nm_scale # WFE = 2 * surface error
coeffs_std, R_std = compute_zernike_coefficients(x0, y0, wfe_standard, n_modes)
# Compute filtered RMS
x_c = x0 - np.mean(x0)
y_c = y0 - np.mean(y0)
r = np.hypot(x_c / R_std, y_c / R_std)
th = np.arctan2(y_c, x_c)
from ..extractors.extract_zernike import zernike_noll
Z_low = np.column_stack([zernike_noll(j, r, th) for j in range(1, filter_orders + 1)])
wfe_std_filtered = wfe_standard - Z_low @ coeffs_std[:filter_orders]
rms_std_filtered = float(np.sqrt(np.mean(wfe_std_filtered**2)))
rms_std_global = float(np.sqrt(np.mean(wfe_standard**2)))
# === RIGOROUS OPD METHOD ===
# Uses deformed (x0+dx, y0+dy) and computes true surface error
# The CORRECT formulation uses DIFFERENTIAL approach:
# surface_error = dz - delta_z_parabola
# where delta_z_parabola is the Z change needed to stay on ideal parabola
x_def = x0 + dx
y_def = y0 + dy
# Compute the CHANGE in parabola Z due to lateral displacement
# For parabola: z = -r²/(4f), so:
# Δz_parabola = z(x_def, y_def) - z(x0, y0)
# = -[(x0+dx)² + (y0+dy)² - x0² - y0²] / (4f)
r0_sq = x0**2 + y0**2
r_def_sq = x_def**2 + y_def**2
delta_r_sq = r_def_sq - r0_sq
# For concave mirror, z = -r²/(4f), so Δz = -Δr²/(4f)
delta_z_parabola = -delta_r_sq / (4.0 * focal)
# TRUE surface error = actual dz - expected dz (to stay on parabola)
surface_error = dz - delta_z_parabola
wfe_opd = surface_error * 2.0 * nm_scale
coeffs_opd, R_opd = compute_zernike_coefficients(x_def, y_def, wfe_opd, n_modes)
x_c_def = x_def - np.mean(x_def)
y_c_def = y_def - np.mean(y_def)
r_def = np.hypot(x_c_def / R_opd, y_c_def / R_opd)
th_def = np.arctan2(y_c_def, x_c_def)
Z_low_opd = np.column_stack([zernike_noll(j, r_def, th_def) for j in range(1, filter_orders + 1)])
wfe_opd_filtered = wfe_opd - Z_low_opd @ coeffs_opd[:filter_orders]
rms_opd_filtered = float(np.sqrt(np.mean(wfe_opd_filtered**2)))
rms_opd_global = float(np.sqrt(np.mean(wfe_opd**2)))
# === LATERAL DISPLACEMENT ===
lateral_disp = np.sqrt(dx**2 + dy**2) * um_scale # in µm
# === DIFFERENCE ANALYSIS ===
# Where methods disagree (use original coords for visualization)
wfe_difference = wfe_opd - wfe_standard # Could be different due to coordinate system
# More meaningful: compare the residuals
return {
# Coordinates for plotting
'x_plot': x0,
'y_plot': y0,
# Standard method
'wfe_standard': wfe_standard,
'wfe_std_filtered': wfe_std_filtered,
'rms_std_global': rms_std_global,
'rms_std_filtered': rms_std_filtered,
# OPD method
'wfe_opd': wfe_opd,
'wfe_opd_filtered': wfe_opd_filtered,
'rms_opd_global': rms_opd_global,
'rms_opd_filtered': rms_opd_filtered,
'x_deformed': x_def,
'y_deformed': y_def,
# Lateral displacement
'lateral_disp_um': lateral_disp,
'max_lateral_um': float(np.max(lateral_disp)),
'rms_lateral_um': float(np.sqrt(np.mean(lateral_disp**2))),
'p99_lateral_um': float(np.percentile(lateral_disp, 99)),
# Deltas
'delta_global': rms_opd_global - rms_std_global,
'delta_filtered': rms_opd_filtered - rms_std_filtered,
'pct_diff_filtered': 100.0 * (rms_opd_filtered - rms_std_filtered) / rms_std_filtered if rms_std_filtered > 0 else 0.0,
# Parabola info
'focal_length': focal,
}
def _get_recommendation(self, comparison: Dict[str, Any]) -> Tuple[str, str]:
"""Generate recommendation based on comparison results."""
max_lateral = comparison['max_lateral_um']
pct_diff = abs(comparison['pct_diff_filtered'])
if max_lateral > 10.0:
severity = "CRITICAL"
msg = (f"Large lateral displacements detected (max {max_lateral:.1f} µm). "
f"Methods differ by {pct_diff:.1f}%. "
f"You MUST use OPD method for accurate optimization.")
elif max_lateral > 1.0:
severity = "RECOMMENDED"
msg = (f"Significant lateral displacements (max {max_lateral:.2f} µm). "
f"Methods differ by {pct_diff:.1f}%. "
f"OPD method recommended for better accuracy.")
elif max_lateral > 0.1:
severity = "OPTIONAL"
msg = (f"Small lateral displacements (max {max_lateral:.3f} µm). "
f"Methods differ by {pct_diff:.2f}%. "
f"OPD method provides minor improvement.")
else:
severity = "EQUIVALENT"
msg = (f"Negligible lateral displacements (max {max_lateral:.4f} µm). "
f"Both methods give essentially identical results.")
return severity, msg
def _generate_html(
self,
title: str,
comparison: Dict[str, Any],
config: Dict[str, Any]
) -> str:
"""Generate comparison HTML with multiple views."""
_load_dependencies()
downsample = config.get('plot_downsample', 15000)
colorscale_wfe = config.get('colorscale_wfe', 'RdBu')
colorscale_lateral = config.get('colorscale_lateral', 'Hot')
amp = config.get('amp', 0.5)
x = comparison['x_plot']
y = comparison['y_plot']
lateral = comparison['lateral_disp_um']
wfe_std = comparison['wfe_std_filtered']
wfe_opd = comparison['wfe_opd_filtered']
# Downsample if needed
n = len(x)
if n > downsample:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=downsample, replace=False)
x, y = x[sel], y[sel]
lateral = lateral[sel]
wfe_std = wfe_std[sel]
wfe_opd = wfe_opd[sel]
# Get recommendation
severity, recommendation = self._get_recommendation(comparison)
severity_color = {
'CRITICAL': '#ef4444',
'RECOMMENDED': '#f59e0b',
'OPTIONAL': '#3b82f6',
'EQUIVALENT': '#10b981'
}.get(severity, '#6b7280')
# Create subplots: 2 rows, 2 cols
# Row 1: Lateral displacement map, WFE difference map
# Row 2: Comparison table, Recommendation
fig = _make_subplots(
rows=2, cols=2,
specs=[
[{"type": "scene"}, {"type": "scene"}],
[{"type": "table", "colspan": 2}, None]
],
row_heights=[0.65, 0.35],
column_widths=[0.5, 0.5],
vertical_spacing=0.08,
horizontal_spacing=0.05,
subplot_titles=[
"<b>Lateral Displacement Magnitude (µm)</b>",
"<b>Filtered WFE Comparison (nm)</b>",
"<b>Method Comparison</b>"
]
)
# === Plot 1: Lateral displacement map ===
try:
tri = _Triangulation(x, y)
if tri.triangles is not None and len(tri.triangles) > 0:
i, j, k = tri.triangles.T
fig.add_trace(_go.Mesh3d(
x=x, y=y, z=lateral * amp * 100, # Scale for visibility
i=i, j=j, k=k,
intensity=lateral,
colorscale=colorscale_lateral,
opacity=1.0,
flatshading=False,
lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3),
showscale=True,
colorbar=dict(
title=dict(text="Lateral (µm)", side="right"),
x=0.42, thickness=15, len=0.5
),
hovertemplate="X: %{x:.1f}<br>Y: %{y:.1f}<br>Lateral: %{intensity:.3f} µm<extra></extra>"
), row=1, col=1)
except:
fig.add_trace(_go.Scatter3d(
x=x, y=y, z=lateral * amp * 100,
mode='markers',
marker=dict(size=2, color=lateral, colorscale=colorscale_lateral, showscale=True)
), row=1, col=1)
# === Plot 2: WFE comparison (show OPD method) ===
try:
tri = _Triangulation(x, y)
if tri.triangles is not None and len(tri.triangles) > 0:
i, j, k = tri.triangles.T
fig.add_trace(_go.Mesh3d(
x=x, y=y, z=wfe_opd * amp,
i=i, j=j, k=k,
intensity=wfe_opd,
colorscale='Turbo',
opacity=1.0,
flatshading=False,
lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3),
showscale=True,
colorbar=dict(
title=dict(text="WFE (nm)", side="right"),
x=1.0, thickness=15, len=0.5
),
hovertemplate="X: %{x:.1f}<br>Y: %{y:.1f}<br>WFE: %{intensity:.2f} nm<extra></extra>"
), row=1, col=2)
except:
fig.add_trace(_go.Scatter3d(
x=x, y=y, z=wfe_opd * amp,
mode='markers',
marker=dict(size=2, color=wfe_opd, colorscale='Turbo', showscale=True)
), row=1, col=2)
# Configure 3D scenes
for scene_name in ['scene', 'scene2']:
fig.update_layout(**{
scene_name: dict(
camera=dict(eye=dict(x=1.2, y=1.2, z=0.8)),
xaxis=dict(title="X (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
yaxis=dict(title="Y (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
zaxis=dict(title="", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
aspectmode='data',
)
})
# === Comparison Table ===
delta_sign = "+" if comparison['delta_filtered'] > 0 else ""
fig.add_trace(_go.Table(
header=dict(
values=["<b>Metric</b>", "<b>Standard (Z-only)</b>", "<b>OPD (Rigorous)</b>", "<b>Difference</b>"],
align="left",
fill_color='#1f2937',
font=dict(color='white', size=12),
height=30
),
cells=dict(
values=[
# Metric names
["Global RMS (nm)", "Filtered RMS (nm)", "Max Lateral Disp (µm)",
"RMS Lateral Disp (µm)", "P99 Lateral Disp (µm)", "Focal Length (mm)",
f"<b>RECOMMENDATION: {severity}</b>"],
# Standard method
[f"{comparison['rms_std_global']:.2f}", f"{comparison['rms_std_filtered']:.2f}",
"N/A", "N/A", "N/A", f"{comparison['focal_length']:.1f}", ""],
# OPD method
[f"{comparison['rms_opd_global']:.2f}", f"{comparison['rms_opd_filtered']:.2f}",
f"{comparison['max_lateral_um']:.3f}", f"{comparison['rms_lateral_um']:.3f}",
f"{comparison['p99_lateral_um']:.3f}", f"{comparison['focal_length']:.1f}", ""],
# Difference
[f"{delta_sign}{comparison['delta_global']:.2f}",
f"{delta_sign}{comparison['delta_filtered']:.2f} ({delta_sign}{comparison['pct_diff_filtered']:.1f}%)",
"-", "-", "-", "-", recommendation],
],
align="left",
fill_color=[['#374151']*7, ['#374151']*7, ['#374151']*7,
['#374151']*6 + [severity_color]],
font=dict(color='white', size=11),
height=25
)
), row=2, col=1)
# Layout
fig.update_layout(
width=1600,
height=1000,
margin=dict(t=80, b=20, l=20, r=20),
paper_bgcolor='#111827',
plot_bgcolor='#1f2937',
font=dict(color='white'),
title=dict(
text=f"<b>Atomizer Zernike Method Comparison - {title}</b><br>"
f"<span style='font-size:14px;color:{severity_color}'>{severity}: {recommendation[:80]}...</span>",
x=0.5,
font=dict(size=18)
)
)
return fig.to_html(include_plotlyjs='cdn', full_html=True)
def _generate(self, config: InsightConfig) -> InsightResult:
"""Generate method comparison visualization."""
self._load_data()
cfg = {**DEFAULT_CONFIG, **config.extra}
n_modes = cfg['n_modes']
filter_orders = cfg['filter_orders']
disp_unit = cfg['disp_unit']
# Find subcases
disps = self._displacements
available = list(disps.keys())
if not available:
return InsightResult(success=False, error="No subcases found in OP2")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = config.output_dir or self.insights_path
output_dir.mkdir(parents=True, exist_ok=True)
html_files = []
all_comparisons = {}
# Process each subcase
for label in available:
data = self._build_full_data(label, disp_unit)
comparison = self._compute_opd_comparison(data, n_modes, filter_orders)
title = f"Subcase {label}"
html = self._generate_html(title, comparison, cfg)
path = output_dir / f"zernike_opd_comparison_{timestamp}_{label}.html"
path.write_text(html, encoding='utf-8')
html_files.append(path)
severity, _ = self._get_recommendation(comparison)
all_comparisons[label] = {
'rms_std_filtered': comparison['rms_std_filtered'],
'rms_opd_filtered': comparison['rms_opd_filtered'],
'pct_diff': comparison['pct_diff_filtered'],
'max_lateral_um': comparison['max_lateral_um'],
'severity': severity,
}
# Determine overall recommendation
severities = [c['severity'] for c in all_comparisons.values()]
if 'CRITICAL' in severities:
overall = "CRITICAL: Use OPD method for all optimization"
elif 'RECOMMENDED' in severities:
overall = "RECOMMENDED: Switch to OPD method for better accuracy"
elif 'OPTIONAL' in severities:
overall = "OPTIONAL: OPD method provides minor improvement"
else:
overall = "EQUIVALENT: Standard method is fine for this study"
return InsightResult(
success=True,
html_path=html_files[0],
summary={
'html_files': [str(p) for p in html_files],
'comparisons': all_comparisons,
'overall_recommendation': overall,
}
)

View File

@@ -7,6 +7,10 @@ Zernike polynomial decomposition. Generates three views:
- 60 deg vs 20 deg (operational tilt comparison)
- 90 deg Manufacturing (absolute with optician workload metrics)
Supports two WFE computation methods:
- Standard: Z-displacement only at original (x, y) coordinates
- OPD: Accounts for lateral (X, Y) displacement via interpolation (RECOMMENDED)
Applicable to: Mirror optimization studies with multi-subcase gravity loads.
"""
@@ -19,6 +23,9 @@ from numpy.linalg import LinAlgError
from .base import StudyInsight, InsightConfig, InsightResult, register_insight
# Lazy import for OPD extractor
_ZernikeOPDExtractor = None
# Lazy imports to avoid startup overhead
_plotly_loaded = False
_go = None
@@ -30,18 +37,20 @@ _BDF = None
def _load_dependencies():
"""Lazy load heavy dependencies."""
global _plotly_loaded, _go, _make_subplots, _Triangulation, _OP2, _BDF
global _plotly_loaded, _go, _make_subplots, _Triangulation, _OP2, _BDF, _ZernikeOPDExtractor
if not _plotly_loaded:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from matplotlib.tri import Triangulation
from pyNastran.op2.op2 import OP2
from pyNastran.bdf.bdf import BDF
from ..extractors.extract_zernike_figure import ZernikeOPDExtractor
_go = go
_make_subplots = make_subplots
_Triangulation = Triangulation
_OP2 = OP2
_BDF = BDF
_ZernikeOPDExtractor = ZernikeOPDExtractor
_plotly_loaded = True
@@ -144,10 +153,10 @@ def compute_zernike_coeffs(
# Configuration Defaults
# ============================================================================
DEFAULT_CONFIG = {
'n_modes': 50,
'n_modes': 36, # Reduced from 50 for faster computation (covers through 7th order)
'amp': 0.5, # Visual deformation scale
'pancake': 3.0, # Z-axis range multiplier
'plot_downsample': 10000,
'plot_downsample': 8000, # Reduced from 10000 for faster rendering
'filter_low_orders': 4, # Piston, tip, tilt, defocus
'colorscale': 'Turbo',
'disp_unit': 'mm',
@@ -170,6 +179,7 @@ class ZernikeWFEInsight(StudyInsight):
insight_type = "zernike_wfe"
name = "Zernike WFE Analysis"
description = "3D wavefront error surface with Zernike decomposition"
category = "optical"
applicable_to = ["mirror", "optics", "wfe"]
required_files = ["*.op2"]
@@ -179,29 +189,59 @@ class ZernikeWFEInsight(StudyInsight):
self.geo_path: Optional[Path] = None
self._node_geo: Optional[Dict] = None
self._displacements: Optional[Dict] = None
self._opd_extractor: Optional[Any] = None # Cached OPD extractor
def can_generate(self) -> bool:
"""Check if OP2 and geometry files exist."""
# Look for OP2 in results or iterations
search_paths = [
self.results_path,
self.study_path / "2_iterations",
self.setup_path / "model",
"""Check if OP2 and geometry files exist.
Uses fast non-recursive search to avoid slow glob operations.
"""
# Fast search order: best_design_archive first, then iterations
search_locations = [
(self.study_path / "3_results" / "best_design_archive", False), # (path, is_iter_parent)
(self.study_path / "2_iterations", True), # iterations have subdirs
(self.setup_path / "model", False),
]
for search_path in search_paths:
op2_candidates = []
for search_path, is_iter_parent in search_locations:
if not search_path.exists():
continue
op2_files = list(search_path.glob("**/*solution*.op2"))
if not op2_files:
op2_files = list(search_path.glob("**/*.op2"))
if op2_files:
self.op2_path = max(op2_files, key=lambda p: p.stat().st_mtime)
break
if self.op2_path is None:
if is_iter_parent:
# For iterations, check each subdir (one level only)
try:
for subdir in search_path.iterdir():
if subdir.is_dir():
for f in subdir.iterdir():
if f.suffix.lower() == '.op2':
op2_candidates.append(f)
except (PermissionError, OSError):
continue
else:
# Direct check (non-recursive)
try:
for f in search_path.iterdir():
if f.suffix.lower() == '.op2':
op2_candidates.append(f)
# Also check one level down for best_design_archive
elif f.is_dir():
for sub in f.iterdir():
if sub.suffix.lower() == '.op2':
op2_candidates.append(sub)
except (PermissionError, OSError):
continue
if op2_candidates:
break # Found some, stop searching
if not op2_candidates:
return False
# Pick newest OP2
self.op2_path = max(op2_candidates, key=lambda p: p.stat().st_mtime)
# Find geometry
try:
self.geo_path = self._find_geometry_file(self.op2_path)
@@ -262,12 +302,16 @@ class ZernikeWFEInsight(StudyInsight):
'disp': dmat.copy()
}
def _build_wfe_arrays(
def _build_wfe_arrays_standard(
self,
label: str,
disp_unit: str = 'mm'
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Build X, Y, WFE arrays for a subcase."""
) -> Dict[str, np.ndarray]:
"""Build X, Y, WFE arrays using standard Z-only method.
This is the original method that uses only Z-displacement
at the original (x, y) coordinates.
"""
nm_per_unit = 1e6 if disp_unit == 'mm' else 1e9
data = self._displacements[label]
@@ -276,6 +320,8 @@ class ZernikeWFEInsight(StudyInsight):
X, Y, WFE = [], [], []
valid_nids = []
dx_arr, dy_arr, dz_arr = [], [], []
for nid, vec in zip(node_ids, dmat):
geo = self._node_geo.get(int(nid))
if geo is None:
@@ -285,27 +331,148 @@ class ZernikeWFEInsight(StudyInsight):
wfe = vec[2] * 2.0 * nm_per_unit # Z-disp to WFE
WFE.append(wfe)
valid_nids.append(nid)
dx_arr.append(vec[0])
dy_arr.append(vec[1])
dz_arr.append(vec[2])
return (np.array(X), np.array(Y), np.array(WFE), np.array(valid_nids))
return {
'X': np.array(X),
'Y': np.array(Y),
'WFE': np.array(WFE),
'node_ids': np.array(valid_nids),
'dx': np.array(dx_arr),
'dy': np.array(dy_arr),
'dz': np.array(dz_arr),
'lateral_disp': np.sqrt(np.array(dx_arr)**2 + np.array(dy_arr)**2),
'method': 'standard',
}
def _build_wfe_arrays_opd(
self,
label: str,
disp_unit: str = 'mm'
) -> Dict[str, np.ndarray]:
"""Build X, Y, WFE arrays using OPD method (accounts for lateral displacement).
This is the RECOMMENDED method that:
1. Uses deformed (x+dx, y+dy) coordinates for Zernike fitting
2. Computes true surface error via interpolation of undeformed geometry
Uses cached OPD extractor to avoid re-reading OP2/BDF files for each subcase.
"""
_load_dependencies()
# Reuse cached extractor to avoid re-reading files
if self._opd_extractor is None:
self._opd_extractor = _ZernikeOPDExtractor(
self.op2_path,
bdf_path=self.geo_path,
displacement_unit=disp_unit
)
opd_data = self._opd_extractor._build_figure_opd_data(label)
return {
'X': opd_data['x_deformed'],
'Y': opd_data['y_deformed'],
'WFE': opd_data['wfe_nm'],
'node_ids': opd_data['node_ids'],
'dx': opd_data['dx'],
'dy': opd_data['dy'],
'dz': opd_data['dz'],
'lateral_disp': opd_data['lateral_disp'],
'x_original': opd_data['x_original'],
'y_original': opd_data['y_original'],
'method': 'opd',
}
def _build_wfe_arrays(
self,
label: str,
disp_unit: str = 'mm',
method: str = 'opd'
) -> Dict[str, np.ndarray]:
"""Build X, Y, WFE arrays for a subcase.
Args:
label: Subcase label
disp_unit: Displacement unit ('mm' or 'm')
method: 'opd' (recommended) or 'standard'
Returns:
Dict with X, Y, WFE, node_ids, dx, dy, dz, lateral_disp arrays
"""
if method == 'opd':
return self._build_wfe_arrays_opd(label, disp_unit)
else:
return self._build_wfe_arrays_standard(label, disp_unit)
def _compute_relative_wfe(
self,
X1, Y1, WFE1, nids1,
X2, Y2, WFE2, nids2
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Compute WFE1 - WFE2 for common nodes."""
data1: Dict[str, np.ndarray],
data2: Dict[str, np.ndarray]
) -> Dict[str, np.ndarray]:
"""Compute relative displacement and WFE (target - reference) for common nodes.
Args:
data1: Target subcase data dict
data2: Reference subcase data dict
Returns:
Dict with relative WFE arrays AND relative displacement components (dx, dy, dz)
"""
X1, Y1, WFE1, nids1 = data1['X'], data1['Y'], data1['WFE'], data1['node_ids']
X2, Y2, WFE2, nids2 = data2['X'], data2['Y'], data2['WFE'], data2['node_ids']
# Build reference maps for all data
ref_map = {int(nid): (x, y, w) for nid, x, y, w in zip(nids2, X2, Y2, WFE2)}
X_rel, Y_rel, WFE_rel = [], [], []
# Maps for displacement components
dx1_map = {int(nid): d for nid, d in zip(data1['node_ids'], data1['dx'])}
dy1_map = {int(nid): d for nid, d in zip(data1['node_ids'], data1['dy'])}
dz1_map = {int(nid): d for nid, d in zip(data1['node_ids'], data1['dz'])}
dx2_map = {int(nid): d for nid, d in zip(data2['node_ids'], data2['dx'])}
dy2_map = {int(nid): d for nid, d in zip(data2['node_ids'], data2['dy'])}
dz2_map = {int(nid): d for nid, d in zip(data2['node_ids'], data2['dz'])}
X_rel, Y_rel, WFE_rel, nids_rel = [], [], [], []
dx_rel, dy_rel, dz_rel = [], [], []
lateral_rel = []
for nid, x, y, w in zip(nids1, X1, Y1, WFE1):
nid = int(nid)
if nid in ref_map:
_, _, w_ref = ref_map[nid]
X_rel.append(x)
Y_rel.append(y)
WFE_rel.append(w - w_ref)
if nid not in ref_map:
continue
if nid not in dx2_map:
continue
return np.array(X_rel), np.array(Y_rel), np.array(WFE_rel)
_, _, w_ref = ref_map[nid]
X_rel.append(x)
Y_rel.append(y)
WFE_rel.append(w - w_ref)
nids_rel.append(nid)
# Compute relative displacements (target - reference)
dx_rel.append(dx1_map[nid] - dx2_map[nid])
dy_rel.append(dy1_map[nid] - dy2_map[nid])
dz_rel.append(dz1_map[nid] - dz2_map[nid])
# Relative lateral displacement magnitude
lat1 = np.sqrt(dx1_map[nid]**2 + dy1_map[nid]**2)
lat2 = np.sqrt(dx2_map[nid]**2 + dy2_map[nid]**2)
lateral_rel.append(lat1 - lat2)
return {
'X': np.array(X_rel),
'Y': np.array(Y_rel),
'WFE': np.array(WFE_rel),
'node_ids': np.array(nids_rel),
'dx': np.array(dx_rel), # Relative X displacement (mm)
'dy': np.array(dy_rel), # Relative Y displacement (mm)
'dz': np.array(dz_rel), # Relative Z displacement (mm)
'lateral_disp': np.array(lateral_rel) if lateral_rel else np.zeros(len(X_rel)),
'method': data1.get('method', 'unknown'),
}
def _compute_metrics(
self,
@@ -585,8 +752,430 @@ class ZernikeWFEInsight(StudyInsight):
return fig.to_html(include_plotlyjs='cdn', full_html=True)
def _generate_lateral_map_html(
self,
title: str,
data: Dict[str, np.ndarray],
config: Dict,
) -> str:
"""Generate HTML for lateral displacement visualization.
Shows a 3D surface colored by lateral displacement magnitude,
with metrics table showing max/RMS/mean lateral displacement.
"""
_load_dependencies()
X = data['X']
Y = data['Y']
lateral_disp = data['lateral_disp'] # in mm
downsample = config.get('plot_downsample', 10000)
# Convert to µm for display
lateral_um = lateral_disp * 1000.0 # mm to µm
# Downsample
n = len(X)
if n > downsample:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=downsample, replace=False)
Xp, Yp, Lp = X[sel], Y[sel], lateral_um[sel]
else:
Xp, Yp, Lp = X, Y, lateral_um
# Build mesh
mesh_traces = []
try:
tri = _Triangulation(Xp, Yp)
if tri.triangles is not None and len(tri.triangles) > 0:
i, j, k = tri.triangles.T
mesh_traces.append(_go.Mesh3d(
x=Xp, y=Yp, z=Lp,
i=i, j=j, k=k,
intensity=Lp,
colorscale='Viridis',
opacity=1.0,
flatshading=False,
lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3),
lightposition=dict(x=100, y=200, z=300),
showscale=True,
colorbar=dict(title=dict(text="Lateral (µm)", side="right"),
thickness=15, len=0.6, tickformat=".3f"),
hovertemplate="X: %{x:.1f}<br>Y: %{y:.1f}<br>Lateral: %{z:.4f} µm<extra></extra>"
))
except Exception:
pass
if not mesh_traces:
mesh_traces.append(_go.Scatter3d(
x=Xp, y=Yp, z=Lp,
mode='markers',
marker=dict(size=2, color=Lp, colorscale='Viridis', showscale=True),
showlegend=False
))
# Create figure with subplots
fig = _make_subplots(
rows=2, cols=1,
specs=[[{"type": "scene"}], [{"type": "table"}]],
row_heights=[0.75, 0.25],
vertical_spacing=0.03,
subplot_titles=[
f"<b>Lateral Displacement Map - {title}</b>",
"<b>Lateral Displacement Statistics</b>"
]
)
for tr in mesh_traces:
fig.add_trace(tr, row=1, col=1)
# Stats
max_lat = float(np.max(lateral_um))
rms_lat = float(np.sqrt(np.mean(lateral_um**2)))
mean_lat = float(np.mean(lateral_um))
min_lat = float(np.min(lateral_um))
fig.add_trace(_go.Table(
header=dict(values=["<b>Statistic</b>", "<b>Value (µm)</b>"],
align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[
["Max Lateral", "RMS Lateral", "Mean Lateral", "Min Lateral"],
[f"{max_lat:.4f}", f"{rms_lat:.4f}", f"{mean_lat:.4f}", f"{min_lat:.4f}"]
], align="left", fill_color='#374151', font=dict(color='white'))
), row=2, col=1)
# Configure 3D scene
max_z = float(np.max(Lp)) if Lp.size else 1.0
fig.update_scenes(
camera=dict(eye=dict(x=1.2, y=1.2, z=0.8), up=dict(x=0, y=0, z=1)),
xaxis=dict(title="X (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
yaxis=dict(title="Y (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
zaxis=dict(title="Lateral (µm)",
range=[0, max_z * 1.2],
showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
aspectmode='manual',
aspectratio=dict(x=1, y=1, z=0.4)
)
fig.update_layout(
width=1200, height=900,
margin=dict(t=60, b=20, l=20, r=20),
paper_bgcolor='#111827', plot_bgcolor='#1f2937',
font=dict(color='white'),
title=dict(text=f"<b>Lateral Displacement Analysis - {title}</b>",
x=0.5, font=dict(size=18))
)
return fig.to_html(include_plotlyjs='cdn', full_html=True)
def _generate_dual_method_view_html(
self,
title: str,
data_std: Dict[str, np.ndarray],
data_opd: Dict[str, np.ndarray],
rms_std: Dict,
rms_opd: Dict,
config: Dict,
is_relative: bool = False,
ref_title: str = "20 deg",
) -> str:
"""Generate HTML with toggle between Standard/OPD methods AND X/Y/Z displacement components.
For relative views, provides toggles to see:
- WFE (Z): The main wavefront error view (default)
- ΔX: Relative X displacement between subcases
- ΔY: Relative Y displacement between subcases
- ΔZ: Relative Z displacement between subcases
"""
_load_dependencies()
n_modes = config.get('n_modes', 50)
amp = config.get('amp', 0.5)
pancake = config.get('pancake', 3.0)
downsample = config.get('plot_downsample', 10000)
colorscale = config.get('colorscale', 'Turbo')
title_suffix = f" (relative to {ref_title})" if is_relative else " (absolute)"
# Build traces for both methods (WFE view)
traces_std_wfe = []
traces_opd_wfe = []
# Build displacement component traces (OPD method only, for relative views)
traces_dx = []
traces_dy = []
traces_dz = []
# Helper to build mesh trace
def build_mesh_trace(Xp, Yp, Zp, colorscale, label, unit, colorbar_title):
try:
tri = _Triangulation(Xp, Yp)
if tri.triangles is not None and len(tri.triangles) > 0:
i, j, k = tri.triangles.T
return _go.Mesh3d(
x=Xp, y=Yp, z=Zp,
i=i, j=j, k=k,
intensity=Zp,
colorscale=colorscale,
opacity=1.0,
flatshading=False,
lighting=dict(ambient=0.4, diffuse=0.8, specular=0.3),
lightposition=dict(x=100, y=200, z=300),
showscale=True,
colorbar=dict(title=dict(text=colorbar_title, side="right"),
thickness=15, len=0.6, tickformat=".2f" if 'µm' in unit else ".1f"),
hovertemplate=f"{label}<br>X: %{{x:.1f}}<br>Y: %{{y:.1f}}<br>Value: %{{z:.3f}} {unit}<extra></extra>"
)
except Exception:
pass
return _go.Scatter3d(
x=Xp, y=Yp, z=Zp,
mode='markers',
marker=dict(size=2, color=Zp, colorscale=colorscale, showscale=True),
showlegend=False
)
# Build WFE traces for both methods
for data, rms_data, traces, method_name in [
(data_std, rms_std, traces_std_wfe, 'Standard'),
(data_opd, rms_opd, traces_opd_wfe, 'OPD')
]:
X, Y = data['X'], data['Y']
W_res_filt = rms_data['W_res_filt']
# Downsample
n = len(X)
if n > downsample:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=downsample, replace=False)
Xp, Yp, Wp = X[sel], Y[sel], W_res_filt[sel]
else:
Xp, Yp, Wp = X, Y, W_res_filt
res_amp = amp * Wp
traces.append(build_mesh_trace(Xp, Yp, res_amp, colorscale, method_name, 'nm', f'{method_name} WFE (nm)'))
# Build displacement component traces (for relative views)
has_displacement_data = is_relative and 'dx' in data_opd and len(data_opd.get('dx', [])) > 0
if has_displacement_data:
X, Y = data_opd['X'], data_opd['Y']
dx_mm = data_opd['dx'] # mm
dy_mm = data_opd['dy'] # mm
dz_mm = data_opd['dz'] # mm
# Convert to µm for display
dx_um = dx_mm * 1000.0
dy_um = dy_mm * 1000.0
dz_um = dz_mm * 1000.0
n = len(X)
if n > downsample:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=downsample, replace=False)
Xp, Yp = X[sel], Y[sel]
dxp, dyp, dzp = dx_um[sel], dy_um[sel], dz_um[sel]
else:
Xp, Yp = X, Y
dxp, dyp, dzp = dx_um, dy_um, dz_um
# Apply visual amplification for displacement views
disp_amp = amp * 1000.0 # Scale factor for µm display
traces_dx.append(build_mesh_trace(Xp, Yp, dxp * amp, 'RdBu_r', 'ΔX Displacement', 'µm', 'ΔX (µm)'))
traces_dy.append(build_mesh_trace(Xp, Yp, dyp * amp, 'RdBu_r', 'ΔY Displacement', 'µm', 'ΔY (µm)'))
traces_dz.append(build_mesh_trace(Xp, Yp, dzp * amp, 'RdBu_r', 'ΔZ Displacement', 'µm', 'ΔZ (µm)'))
# Create figure
fig = _make_subplots(
rows=2, cols=1,
specs=[[{"type": "scene"}], [{"type": "table"}]],
row_heights=[0.65, 0.35],
vertical_spacing=0.05,
subplot_titles=[
f"<b>Surface Analysis - {title}{title_suffix}</b>",
"<b>Metrics Comparison</b>"
]
)
# Add all traces in order: [std_wfe, opd_wfe, dx, dy, dz, table]
# Start with OPD WFE visible (default view)
for tr in traces_std_wfe:
tr.visible = False
fig.add_trace(tr, row=1, col=1)
for tr in traces_opd_wfe:
tr.visible = True # Default view
fig.add_trace(tr, row=1, col=1)
for tr in traces_dx:
tr.visible = False
fig.add_trace(tr, row=1, col=1)
for tr in traces_dy:
tr.visible = False
fig.add_trace(tr, row=1, col=1)
for tr in traces_dz:
tr.visible = False
fig.add_trace(tr, row=1, col=1)
# Compute lateral stats (from OPD data)
lateral_um = data_opd.get('lateral_disp', np.zeros(1)) * 1000.0
max_lat = float(np.max(np.abs(lateral_um)))
rms_lat = float(np.sqrt(np.mean(lateral_um**2)))
# Compute % difference
std_filt = rms_std['filtered_rms']
opd_filt = rms_opd['filtered_rms']
pct_diff = 100.0 * (opd_filt - std_filt) / std_filt if std_filt > 0 else 0.0
# Displacement stats (for relative views)
disp_stats_rows = []
if has_displacement_data:
dx_um = data_opd['dx'] * 1000.0
dy_um = data_opd['dy'] * 1000.0
dz_um = data_opd['dz'] * 1000.0
disp_stats_rows = [
"ΔX RMS (µm)", "ΔY RMS (µm)", "ΔZ RMS (µm)"
]
disp_stats_values_std = ["", "", ""]
disp_stats_values_opd = [
f"{float(np.sqrt(np.mean(dx_um**2))):.4f}",
f"{float(np.sqrt(np.mean(dy_um**2))):.4f}",
f"{float(np.sqrt(np.mean(dz_um**2))):.4f}"
]
disp_stats_diff = ["", "", ""]
# Comparison table
table_headers = ["<b>Metric</b>", "<b>Standard (Z-only)</b>", "<b>OPD (X,Y,Z)</b>", "<b>Difference</b>"]
table_rows = ["Global RMS (nm)", "Filtered RMS (nm)", "Method", "Max Lateral (µm)", "RMS Lateral (µm)"]
table_std = [f"{rms_std['global_rms']:.2f}", f"{std_filt:.2f}", "Z-displacement only", "", ""]
table_opd = [f"{rms_opd['global_rms']:.2f}", f"{opd_filt:.2f}", "Deformed coords + OPD", f"{max_lat:.3f}", f"{rms_lat:.3f}"]
table_diff = ["", f"{pct_diff:+.2f}%", "← RECOMMENDED", "", ""]
if has_displacement_data:
table_rows.extend(disp_stats_rows)
table_std.extend(disp_stats_values_std)
table_opd.extend(disp_stats_values_opd)
table_diff.extend(disp_stats_diff)
fig.add_trace(_go.Table(
header=dict(values=table_headers, align="left", fill_color='#1f2937', font=dict(color='white')),
cells=dict(values=[table_rows, table_std, table_opd, table_diff],
align="left", fill_color='#374151', font=dict(color='white'))
), row=2, col=1)
# Build visibility arrays for toggles
n_std_wfe = len(traces_std_wfe)
n_opd_wfe = len(traces_opd_wfe)
n_dx = len(traces_dx)
n_dy = len(traces_dy)
n_dz = len(traces_dz)
# Total traces before table: n_std_wfe + n_opd_wfe + n_dx + n_dy + n_dz
# Then table is last
def make_visibility(show_std_wfe=False, show_opd_wfe=False, show_dx=False, show_dy=False, show_dz=False):
vis = []
vis.extend([show_std_wfe] * n_std_wfe)
vis.extend([show_opd_wfe] * n_opd_wfe)
vis.extend([show_dx] * n_dx)
vis.extend([show_dy] * n_dy)
vis.extend([show_dz] * n_dz)
vis.append(True) # Table always visible
return vis
# Build button definitions
buttons_method = [
dict(label="OPD Method (Recommended)",
method="update",
args=[{"visible": make_visibility(show_opd_wfe=True)}]),
dict(label="Standard Method (Z-only)",
method="update",
args=[{"visible": make_visibility(show_std_wfe=True)}]),
]
buttons_component = [
dict(label="WFE (Z)",
method="update",
args=[{"visible": make_visibility(show_opd_wfe=True)}]),
]
if has_displacement_data:
buttons_component.extend([
dict(label="ΔX Disp",
method="update",
args=[{"visible": make_visibility(show_dx=True)}]),
dict(label="ΔY Disp",
method="update",
args=[{"visible": make_visibility(show_dy=True)}]),
dict(label="ΔZ Disp",
method="update",
args=[{"visible": make_visibility(show_dz=True)}]),
])
# Create update menus
updatemenus = [
dict(
type="buttons",
direction="right",
x=0.0, y=1.15,
xanchor="left",
showactive=True,
buttons=buttons_method,
font=dict(size=11),
pad=dict(r=10, t=10),
),
]
if has_displacement_data:
updatemenus.append(
dict(
type="buttons",
direction="right",
x=0.55, y=1.15,
xanchor="left",
showactive=True,
buttons=buttons_component,
font=dict(size=11),
pad=dict(r=10, t=10),
)
)
fig.update_layout(updatemenus=updatemenus)
# Configure 3D scene
max_amp_opd = float(np.max(np.abs(amp * rms_opd['W_res_filt']))) if rms_opd['W_res_filt'].size else 1.0
fig.update_scenes(
camera=dict(eye=dict(x=1.2, y=1.2, z=0.8), up=dict(x=0, y=0, z=1)),
xaxis=dict(title="X (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
yaxis=dict(title="Y (mm)", showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
zaxis=dict(title="Value",
range=[-max_amp_opd * pancake, max_amp_opd * pancake],
showgrid=True, gridcolor='rgba(128,128,128,0.3)'),
aspectmode='manual',
aspectratio=dict(x=1, y=1, z=0.4)
)
fig.update_layout(
width=1400, height=1100,
margin=dict(t=100, b=20, l=20, r=20),
paper_bgcolor='#111827', plot_bgcolor='#1f2937',
font=dict(color='white'),
title=dict(text=f"<b>Atomizer Zernike Analysis - {title}</b>",
x=0.5, font=dict(size=18)),
annotations=[
dict(text="<b>Method:</b>", x=0.0, y=1.18, xref="paper", yref="paper",
showarrow=False, font=dict(size=12, color='white'), xanchor='left'),
dict(text="<b>View:</b>", x=0.55, y=1.18, xref="paper", yref="paper",
showarrow=False, font=dict(size=12, color='white'), xanchor='left') if has_displacement_data else {},
]
)
return fig.to_html(include_plotlyjs='cdn', full_html=True)
def _generate(self, config: InsightConfig) -> InsightResult:
"""Generate all Zernike WFE views."""
"""Generate all Zernike WFE views with Standard/OPD toggle and lateral maps.
Performance optimizations:
- Uses cached OPD extractor (reads OP2/BDF only once)
- Loads all subcase data upfront to minimize I/O
- Standard method reuses geometry from already-loaded data
"""
self._load_data()
# Merge config
@@ -626,66 +1215,110 @@ class ZernikeWFEInsight(StudyInsight):
html_files = []
summary = {}
# Load data for BOTH methods - OPD method shares cached extractor
# Standard method uses already-loaded _node_geo and _displacements
# Reference: 20 deg
X_ref, Y_ref, WFE_ref, nids_ref = self._build_wfe_arrays(sc_map['20'], disp_unit)
rms_ref = self._compute_metrics(X_ref, Y_ref, WFE_ref, n_modes, filter_orders)
data_ref_std = self._build_wfe_arrays_standard(sc_map['20'], disp_unit)
data_ref_opd = self._build_wfe_arrays_opd(sc_map['20'], disp_unit)
# 40 deg
data_40_std = self._build_wfe_arrays_standard(sc_map['40'], disp_unit)
data_40_opd = self._build_wfe_arrays_opd(sc_map['40'], disp_unit)
# 60 deg
data_60_std = self._build_wfe_arrays_standard(sc_map['60'], disp_unit)
data_60_opd = self._build_wfe_arrays_opd(sc_map['60'], disp_unit)
# 90 deg
X_90, Y_90, WFE_90, nids_90 = self._build_wfe_arrays(sc_map['90'], disp_unit)
rms_90 = self._compute_metrics(X_90, Y_90, WFE_90, n_modes, filter_orders)
mfg_metrics = self._compute_aberration_magnitudes(rms_90['coefficients'])
data_90_std = self._build_wfe_arrays_standard(sc_map['90'], disp_unit)
data_90_opd = self._build_wfe_arrays_opd(sc_map['90'], disp_unit)
# 40 deg vs 20 deg
X_40, Y_40, WFE_40, nids_40 = self._build_wfe_arrays(sc_map['40'], disp_unit)
X_40_rel, Y_40_rel, WFE_40_rel = self._compute_relative_wfe(
X_40, Y_40, WFE_40, nids_40, X_ref, Y_ref, WFE_ref, nids_ref)
rms_40_abs = self._compute_metrics(X_40, Y_40, WFE_40, n_modes, filter_orders)
rms_40_rel = self._compute_metrics(X_40_rel, Y_40_rel, WFE_40_rel, n_modes, filter_orders)
# =========================================
# 40 deg vs 20 deg (with dual method toggle)
# =========================================
rel_40_std = self._compute_relative_wfe(data_40_std, data_ref_std)
rel_40_opd = self._compute_relative_wfe(data_40_opd, data_ref_opd)
html_40 = self._generate_view_html(
"40 deg", X_40_rel, Y_40_rel, WFE_40_rel, rms_40_rel, cfg,
is_relative=True, ref_title="20 deg",
abs_pair=(rms_40_abs['global_rms'], rms_40_abs['filtered_rms']))
rms_40_std = self._compute_metrics(rel_40_std['X'], rel_40_std['Y'], rel_40_std['WFE'],
n_modes, filter_orders)
rms_40_opd = self._compute_metrics(rel_40_opd['X'], rel_40_opd['Y'], rel_40_opd['WFE'],
n_modes, filter_orders)
html_40 = self._generate_dual_method_view_html(
"40 deg", rel_40_std, rel_40_opd, rms_40_std, rms_40_opd, cfg,
is_relative=True, ref_title="20 deg")
path_40 = output_dir / f"zernike_{timestamp}_40_vs_20.html"
path_40.write_text(html_40, encoding='utf-8')
html_files.append(path_40)
summary['40_vs_20_filtered_rms'] = rms_40_rel['filtered_rms']
# 60 deg vs 20 deg
X_60, Y_60, WFE_60, nids_60 = self._build_wfe_arrays(sc_map['60'], disp_unit)
X_60_rel, Y_60_rel, WFE_60_rel = self._compute_relative_wfe(
X_60, Y_60, WFE_60, nids_60, X_ref, Y_ref, WFE_ref, nids_ref)
rms_60_abs = self._compute_metrics(X_60, Y_60, WFE_60, n_modes, filter_orders)
rms_60_rel = self._compute_metrics(X_60_rel, Y_60_rel, WFE_60_rel, n_modes, filter_orders)
# Lateral map for 40 deg
html_40_lat = self._generate_lateral_map_html("40 deg", data_40_opd, cfg)
path_40_lat = output_dir / f"zernike_{timestamp}_40_lateral.html"
path_40_lat.write_text(html_40_lat, encoding='utf-8')
html_files.append(path_40_lat)
html_60 = self._generate_view_html(
"60 deg", X_60_rel, Y_60_rel, WFE_60_rel, rms_60_rel, cfg,
is_relative=True, ref_title="20 deg",
abs_pair=(rms_60_abs['global_rms'], rms_60_abs['filtered_rms']))
summary['40_vs_20_filtered_rms_std'] = rms_40_std['filtered_rms']
summary['40_vs_20_filtered_rms_opd'] = rms_40_opd['filtered_rms']
# =========================================
# 60 deg vs 20 deg (with dual method toggle)
# =========================================
rel_60_std = self._compute_relative_wfe(data_60_std, data_ref_std)
rel_60_opd = self._compute_relative_wfe(data_60_opd, data_ref_opd)
rms_60_std = self._compute_metrics(rel_60_std['X'], rel_60_std['Y'], rel_60_std['WFE'],
n_modes, filter_orders)
rms_60_opd = self._compute_metrics(rel_60_opd['X'], rel_60_opd['Y'], rel_60_opd['WFE'],
n_modes, filter_orders)
html_60 = self._generate_dual_method_view_html(
"60 deg", rel_60_std, rel_60_opd, rms_60_std, rms_60_opd, cfg,
is_relative=True, ref_title="20 deg")
path_60 = output_dir / f"zernike_{timestamp}_60_vs_20.html"
path_60.write_text(html_60, encoding='utf-8')
html_files.append(path_60)
summary['60_vs_20_filtered_rms'] = rms_60_rel['filtered_rms']
# 90 deg Manufacturing
X_90_rel, Y_90_rel, WFE_90_rel = self._compute_relative_wfe(
X_90, Y_90, WFE_90, nids_90, X_ref, Y_ref, WFE_ref, nids_ref)
rms_90_rel = self._compute_metrics(X_90_rel, Y_90_rel, WFE_90_rel, n_modes, filter_orders)
corr_abr = self._compute_aberration_magnitudes(rms_90_rel['coefficients'])
correction_metrics = {
'rms_filter_j1to3': rms_90_rel['rms_filter_j1to3'],
**corr_abr
}
# Lateral map for 60 deg
html_60_lat = self._generate_lateral_map_html("60 deg", data_60_opd, cfg)
path_60_lat = output_dir / f"zernike_{timestamp}_60_lateral.html"
path_60_lat.write_text(html_60_lat, encoding='utf-8')
html_files.append(path_60_lat)
html_90 = self._generate_view_html(
"90 deg (Manufacturing)", X_90, Y_90, WFE_90, rms_90, cfg,
is_relative=False, is_manufacturing=True,
mfg_metrics=mfg_metrics, correction_metrics=correction_metrics)
summary['60_vs_20_filtered_rms_std'] = rms_60_std['filtered_rms']
summary['60_vs_20_filtered_rms_opd'] = rms_60_opd['filtered_rms']
# =========================================
# 90 deg Manufacturing (absolute, with dual method toggle)
# =========================================
rms_90_std = self._compute_metrics(data_90_std['X'], data_90_std['Y'], data_90_std['WFE'],
n_modes, filter_orders)
rms_90_opd = self._compute_metrics(data_90_opd['X'], data_90_opd['Y'], data_90_opd['WFE'],
n_modes, filter_orders)
html_90 = self._generate_dual_method_view_html(
"90 deg (Manufacturing)", data_90_std, data_90_opd, rms_90_std, rms_90_opd, cfg,
is_relative=False)
path_90 = output_dir / f"zernike_{timestamp}_90_mfg.html"
path_90.write_text(html_90, encoding='utf-8')
html_files.append(path_90)
summary['90_mfg_filtered_rms'] = rms_90['filtered_rms']
summary['90_optician_workload'] = rms_90['rms_filter_j1to3']
# Lateral map for 90 deg
html_90_lat = self._generate_lateral_map_html("90 deg (Manufacturing)", data_90_opd, cfg)
path_90_lat = output_dir / f"zernike_{timestamp}_90_mfg_lateral.html"
path_90_lat.write_text(html_90_lat, encoding='utf-8')
html_files.append(path_90_lat)
summary['90_mfg_filtered_rms_std'] = rms_90_std['filtered_rms']
summary['90_mfg_filtered_rms_opd'] = rms_90_opd['filtered_rms']
summary['90_optician_workload'] = rms_90_opd['rms_filter_j1to3']
# Lateral displacement summary
lateral_40 = data_40_opd.get('lateral_disp', np.zeros(1)) * 1000.0 # mm to µm
lateral_60 = data_60_opd.get('lateral_disp', np.zeros(1)) * 1000.0
lateral_90 = data_90_opd.get('lateral_disp', np.zeros(1)) * 1000.0
summary['lateral_40_max_um'] = float(np.max(lateral_40))
summary['lateral_60_max_um'] = float(np.max(lateral_60))
summary['lateral_90_max_um'] = float(np.max(lateral_90))
return InsightResult(
success=True,