Files
Atomizer/optimization_engine/extractors/zernike_helpers.py

404 lines
13 KiB
Python
Raw Normal View History

feat: Add M1 mirror Zernike optimization with correct RMS calculation Major improvements to telescope mirror optimization workflow: Assembly FEM Workflow (solve_simulation.py): - Fixed multi-part assembly FEM update sequence - Use ImportFromFile() for reliable expression updates - Add DuplicateNodesCheckBuilder with MergeOccurrenceNodes=True - Switch to Foreground solve mode for multi-subcase solutions - Add detailed logging and diagnostics for node merge operations Zernike RMS Calculation: - CRITICAL FIX: Use correct surface-based RMS formula - Global RMS = sqrt(mean(W^2)) from actual WFE values - Filtered RMS = sqrt(mean(W_residual^2)) after removing low-order fit - This matches zernike_Post_Script_NX.py (optical standard) - Previous WRONG formula was: sqrt(sum(coeffs^2)) - Add compute_rms_filter_j1to3() for optician workload metric Subcase Mapping: - Fix subcase mapping to match NX model: - Subcase 1 = 90 deg (polishing orientation) - Subcase 2 = 20 deg (reference) - Subcase 3 = 40 deg - Subcase 4 = 60 deg New Study: M1 Mirror Zernike Optimization - Full optimization config with 11 design variables - 3 objectives: rel_filtered_rms_40_vs_20, rel_filtered_rms_60_vs_20, mfg_90_optician_workload - Neural surrogate support for accelerated optimization Documentation: - Update ZERNIKE_INTEGRATION.md with correct RMS formula - Update ASSEMBLY_FEM_WORKFLOW.md with expression import and node merge details - Add reference scripts from original zernike_Post_Script_NX.py 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 16:30:15 -05:00
"""
Zernike Helper Functions for Atomizer Optimization
===================================================
Convenience wrappers and utilities for using Zernike analysis
in optimization studies. These helpers simplify integration with
the standard Atomizer optimization patterns.
Usage in run_optimization.py:
from optimization_engine.extractors.zernike_helpers import (
create_zernike_objective,
ZernikeObjectiveBuilder
)
# Simple: create objective function
zernike_obj = create_zernike_objective(
op2_finder=lambda: sim_dir / "model-solution_1.op2",
subcase="20",
metric="filtered_rms_nm"
)
# Use in Optuna trial
rms = zernike_obj()
"""
from pathlib import Path
from typing import Callable, Dict, Any, Optional, Union, List
import logging
from optimization_engine.extractors.extract_zernike import (
ZernikeExtractor,
extract_zernike_from_op2,
extract_zernike_filtered_rms,
)
logger = logging.getLogger(__name__)
def create_zernike_objective(
    op2_finder: Callable[[], Path],
    bdf_finder: Optional[Callable[[], Path]] = None,
    subcase: Union[int, str] = "20",
    metric: str = "filtered_rms_nm",
    displacement_unit: str = "mm",
    **kwargs
) -> Callable[[], float]:
    """
    Build a zero-argument objective that reports one Zernike metric.

    Nothing is read when the factory is called. Each call of the returned
    function:
        1. asks ``op2_finder`` (and ``bdf_finder``, if given) for the paths,
        2. runs ``extract_zernike_from_op2`` on them,
        3. returns the entry stored under ``metric``.

    Args:
        op2_finder: Callable that returns path to current OP2 file
        bdf_finder: Callable that returns path to BDF file; when omitted,
            ``None`` is handed to the extraction routine
        subcase: Subcase to analyze (e.g., "20" for 20 deg elevation)
        metric: Key of the value to return (see "Available metrics")
        displacement_unit: Unit of displacement in OP2 file
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``extract_zernike_from_op2``

    Returns:
        Callable that returns the metric value

    Raises:
        ValueError: Raised by the returned callable (not by the factory)
            when ``metric`` is absent from the extraction result; the
            message lists the numeric keys that were present.

    Available metrics:
        - global_rms_nm: Global RMS wavefront error
        - filtered_rms_nm: Filtered RMS (low orders removed)
        - defocus_nm: Defocus aberration
        - astigmatism_rms_nm: Combined astigmatism
        - coma_rms_nm: Combined coma
        - trefoil_rms_nm: Combined trefoil
        - spherical_nm: Primary spherical aberration

    Example:
        op2_finder = lambda: Path("sim_dir") / "model-solution_1.op2"
        objective = create_zernike_objective(op2_finder, subcase="20")

        # In optimization loop
        rms_value = objective()  # Returns filtered RMS in nm
    """

    def evaluate() -> float:
        # Paths are looked up on every call, OP2 first, so the objective
        # always follows whatever the finders currently point at.
        op2_file = op2_finder()
        bdf_file = None
        if bdf_finder:
            bdf_file = bdf_finder()

        extracted = extract_zernike_from_op2(
            op2_file,
            bdf_file,
            subcase=subcase,
            displacement_unit=displacement_unit,
            **kwargs
        )

        if metric in extracted:
            return extracted[metric]

        # Only numeric entries are offered as alternatives in the error.
        numeric_keys = [
            key for key, value in extracted.items()
            if isinstance(value, (int, float))
        ]
        raise ValueError(f"Metric '{metric}' not found. Available: {numeric_keys}")

    return evaluate
def create_relative_zernike_objective(
    op2_finder: Callable[[], Path],
    target_subcase: Union[int, str],
    reference_subcase: Union[int, str],
    bdf_finder: Optional[Callable[[], Path]] = None,
    metric: str = "relative_filtered_rms_nm",
    **kwargs
) -> Callable[[], float]:
    """
    Build a zero-argument objective for a metric of one subcase relative
    to another.

    Useful for minimizing gravity-induced deformation relative to
    a reference orientation (e.g., polishing position at 90 deg).

    Each call of the returned function constructs a new ``ZernikeExtractor``
    from the freshly resolved paths and calls its ``extract_relative``.

    Args:
        op2_finder: Callable returning OP2 path
        target_subcase: Subcase to analyze (converted to ``str``)
        reference_subcase: Reference subcase to subtract (converted to ``str``)
        bdf_finder: Optional BDF path finder; ``None`` is passed to the
            extractor when it is omitted
        metric: Key of the relative metric to return
        **kwargs: Additional ZernikeExtractor arguments

    Returns:
        Callable that returns relative metric value

    Raises:
        ValueError: Raised by the returned callable when ``metric`` is
            absent from the relative result; the message lists the numeric
            keys that were present.
    """

    def evaluate() -> float:
        # Resolve the OP2 path first, then the optional BDF path.
        op2_file = op2_finder()
        bdf_file = None
        if bdf_finder:
            bdf_file = bdf_finder()

        relative = ZernikeExtractor(op2_file, bdf_file, **kwargs).extract_relative(
            str(target_subcase),
            str(reference_subcase)
        )

        if metric in relative:
            return relative[metric]

        # Only numeric entries are offered as alternatives in the error.
        numeric_keys = [
            key for key, value in relative.items()
            if isinstance(value, (int, float))
        ]
        raise ValueError(f"Metric '{metric}' not found. Available: {numeric_keys}")

    return evaluate
class ZernikeObjectiveBuilder:
    """
    Builder for complex Zernike objectives with multiple subcases.

    This is useful for multi-subcase optimization where you want
    to combine metrics from different gravity orientations.

    Objectives are registered with ``add_subcase_objective`` /
    ``add_relative_objective`` (both return the builder, so calls can be
    chained) and then turned into a callable with one of the ``build_*``
    methods. Every evaluation creates a fresh extractor, so the callable
    always reads whatever OP2 file ``op2_finder`` currently returns.

    An objective that cannot be evaluated (the extractor raises, or the
    requested metric is missing from the result) is logged as a warning and
    reported as ``float("inf")``.

    Example:
        builder = ZernikeObjectiveBuilder(
            op2_finder=lambda: sim_dir / "model.op2"
        )

        # Add objectives for different subcases
        builder.add_subcase_objective("20", "filtered_rms_nm", weight=1.0)
        builder.add_subcase_objective("40", "filtered_rms_nm", weight=0.5)
        builder.add_subcase_objective("60", "filtered_rms_nm", weight=0.5)

        # Create combined objective
        objective = builder.build_weighted_sum()
        combined_rms = objective()  # Returns weighted sum
    """

    def __init__(
        self,
        op2_finder: Callable[[], Path],
        bdf_finder: Optional[Callable[[], Path]] = None,
        displacement_unit: str = "mm",
        **kwargs
    ):
        """
        Store the configuration; no file is read here.

        Args:
            op2_finder: Callable returning the path of the current OP2 file
            bdf_finder: Optional callable returning the BDF path; ``None``
                is passed to the extractor when it is omitted
            displacement_unit: Passed to ``ZernikeExtractor`` as
                ``displacement_unit``
            **kwargs: Additional keyword arguments for ``ZernikeExtractor``
        """
        self.op2_finder = op2_finder
        self.bdf_finder = bdf_finder
        self.displacement_unit = displacement_unit
        self.kwargs = kwargs
        # One dict per registered objective, in registration order.
        self.objectives: List[Dict[str, Any]] = []
        # Created lazily by _get_extractor(); cleared by _reset_extractor().
        self._extractor = None

    def add_subcase_objective(
        self,
        subcase: Union[int, str],
        metric: str = "filtered_rms_nm",
        weight: float = 1.0,
        name: Optional[str] = None
    ) -> "ZernikeObjectiveBuilder":
        """
        Add a subcase objective to the builder.

        Args:
            subcase: Subcase to evaluate (stored as ``str``)
            metric: Key to read from the subcase result
            weight: Multiplier used by ``build_weighted_sum``/``build_max``
            name: Result key; defaults to ``"{metric}_{subcase}"``

        Returns:
            The builder itself, to allow chaining.
        """
        self.objectives.append({
            "subcase": str(subcase),
            "metric": metric,
            "weight": weight,
            "name": name or f"{metric}_{subcase}"
        })
        return self

    def add_relative_objective(
        self,
        target_subcase: Union[int, str],
        reference_subcase: Union[int, str],
        metric: str = "relative_filtered_rms_nm",
        weight: float = 1.0,
        name: Optional[str] = None
    ) -> "ZernikeObjectiveBuilder":
        """
        Add a relative objective between subcases.

        Args:
            target_subcase: Subcase to analyze (stored as ``str``)
            reference_subcase: Reference subcase (stored as ``str``)
            metric: Key to read from the relative result
            weight: Multiplier used by ``build_weighted_sum``/``build_max``
            name: Result key; defaults to
                ``"rel_{target_subcase}_vs_{reference_subcase}"``

        Returns:
            The builder itself, to allow chaining.
        """
        self.objectives.append({
            "target_subcase": str(target_subcase),
            "reference_subcase": str(reference_subcase),
            "metric": metric,
            "weight": weight,
            "name": name or f"rel_{target_subcase}_vs_{reference_subcase}",
            "is_relative": True
        })
        return self

    def _get_extractor(self) -> "ZernikeExtractor":
        """Lazy-create extractor (reused for all objectives)."""
        if self._extractor is None:
            op2_path = self.op2_finder()
            bdf_path = self.bdf_finder() if self.bdf_finder else None
            self._extractor = ZernikeExtractor(
                op2_path, bdf_path,
                displacement_unit=self.displacement_unit,
                **self.kwargs
            )
        return self._extractor

    def _reset_extractor(self):
        """Reset extractor (call after OP2 changes)."""
        self._extractor = None

    @staticmethod
    def _extract_metric(extractor, obj: Dict[str, Any]) -> float:
        """Run the extraction for one objective and return its metric.

        Raises:
            ValueError: If the requested metric is not in the result.
        """
        if obj.get("is_relative"):
            metrics = extractor.extract_relative(
                obj["target_subcase"],
                obj["reference_subcase"]
            )
        else:
            metrics = extractor.extract_subcase(obj["subcase"])

        metric = obj["metric"]
        if metric not in metrics:
            # A missing metric must never look like a perfect score of 0.0;
            # raise so the caller reports it like any other failure.
            available = [
                key for key, value in metrics.items()
                if isinstance(value, (int, float))
            ]
            raise ValueError(f"Metric '{metric}' not found. Available: {available}")
        return metrics[metric]

    def evaluate_all(self) -> Dict[str, float]:
        """
        Evaluate all objectives and return dict of values.

        A fresh extractor is created on every call. Errors raised while
        creating it propagate to the caller; errors while evaluating an
        individual objective are logged and that objective is set to
        ``float("inf")``.

        Returns:
            Dict mapping objective name to value
        """
        self._reset_extractor()
        extractor = self._get_extractor()

        results = {}
        for obj in self.objectives:
            name = obj["name"]
            try:
                results[name] = self._extract_metric(extractor, obj)
            except Exception as e:
                # Deliberate best-effort: one failing objective must not
                # abort the whole evaluation, so it is reported as inf.
                logger.warning("Failed to evaluate %s: %s", name, e)
                results[name] = float("inf")
        return results

    def build_weighted_sum(self) -> Callable[[], float]:
        """
        Build a callable that returns weighted sum of all objectives.

        Returns:
            Callable returning combined objective value (``0.0`` when no
            objectives are registered)
        """
        def evaluate() -> float:
            values = self.evaluate_all()
            return sum(
                (obj["weight"] * values.get(obj["name"], 0.0)
                 for obj in self.objectives),
                0.0,
            )

        return evaluate

    def build_max(self) -> Callable[[], float]:
        """
        Build a callable that returns maximum of all objectives.

        Useful for worst-case optimization across subcases. Each value is
        multiplied by its weight before the maximum is taken; ``0.0`` is
        returned when no objectives are registered.
        """
        def evaluate() -> float:
            values = self.evaluate_all()
            weighted = [
                obj["weight"] * values.get(obj["name"], 0.0)
                for obj in self.objectives
            ]
            return max(weighted) if weighted else 0.0

        return evaluate

    def build_individual(self) -> Callable[[], Dict[str, float]]:
        """
        Build a callable that returns dict of individual objective values.

        Useful for multi-objective optimization (NSGA-II).
        """
        return self.evaluate_all
def extract_zernike_for_trial(
    op2_path: Path,
    bdf_path: Optional[Path] = None,
    subcases: Optional[List[str]] = None,
    reference_subcase: str = "20",
    metrics: Optional[List[str]] = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Extract comprehensive Zernike data for a trial.

    This is a high-level function for logging/exporting trial data.
    It extracts all metrics for specified subcases and computes
    relative metrics vs the reference.

    Extraction is best-effort: a subcase (or relative pair) whose extraction
    raises is logged as a warning and left out of the result.

    Args:
        op2_path: Path to OP2 file
        bdf_path: Path to BDF file (auto-detect if None)
        subcases: List of subcases to extract (None or empty = all
            available). Requested subcases that the extractor does not
            list are dropped. Entries are matched by their string form,
            so ``20`` and ``"20"`` are treated alike.
        reference_subcase: Reference for relative calculations. Relative
            data is only produced when the reference is among the
            extracted subcases.
        metrics: Specific metrics to extract (None or empty = all). When
            given, the per-subcase and relative dicts only keep these keys;
            the summary is still computed from the unfiltered data.
        **kwargs: Additional ZernikeExtractor arguments

    Returns:
        Dict with complete trial Zernike data:
        {
            'subcases': {
                '20': {'global_rms_nm': ..., 'filtered_rms_nm': ..., ...},
                '40': {...},
                ...
            },
            'relative': {
                '40_vs_20': {'relative_filtered_rms_nm': ..., ...},
                ...
            },
            'summary': {
                'best_filtered_rms': ...,
                'worst_filtered_rms': ...,
                'mean_filtered_rms': ...,
                'n_subcases': ...,
                'reference_subcase': ...,
            }
        }
    """
    extractor = ZernikeExtractor(op2_path, bdf_path, **kwargs)

    # Get available subcases
    available = list(extractor.displacements.keys())
    if subcases:
        subcases = [s for s in subcases if str(s) in available]
    else:
        subcases = available

    # Extract per-subcase data (kept unfiltered for the summary below)
    raw_subcase_data = {}
    for sc in subcases:
        try:
            raw_subcase_data[sc] = extractor.extract_subcase(str(sc))
        except Exception as e:
            logger.warning("Failed to extract subcase %s: %s", sc, e)

    # Extract relative data. Subcases are compared by string form so that
    # integer subcases still match the (string) reference.
    reference_key = str(reference_subcase)
    raw_relative_data = {}
    if any(str(sc) == reference_key for sc in subcases):
        for sc in subcases:
            if str(sc) == reference_key:
                continue
            key = f"{sc}_vs_{reference_subcase}"
            try:
                raw_relative_data[key] = extractor.extract_relative(
                    str(sc), reference_key
                )
            except Exception as e:
                logger.warning("Failed to extract relative %s: %s", key, e)

    # Summary statistics; a subcase without 'filtered_rms_nm' counts as inf
    filtered_rms_values = [
        d.get('filtered_rms_nm', float('inf'))
        for d in raw_subcase_data.values()
    ]
    if filtered_rms_values:
        best = min(filtered_rms_values)
        worst = max(filtered_rms_values)
        mean = sum(filtered_rms_values) / len(filtered_rms_values)
    else:
        best = worst = mean = None

    summary = {
        'best_filtered_rms': best,
        'worst_filtered_rms': worst,
        'mean_filtered_rms': mean,
        'n_subcases': len(subcases),
        'reference_subcase': reference_subcase,
    }

    # Apply the optional metric filter to the exported dicts only.
    if metrics:
        wanted = set(metrics)

        def _select(data):
            return {k: v for k, v in data.items() if k in wanted}

        subcase_data = {sc: _select(d) for sc, d in raw_subcase_data.items()}
        relative_data = {k: _select(d) for k, d in raw_relative_data.items()}
    else:
        subcase_data = raw_subcase_data
        relative_data = raw_relative_data

    return {
        'subcases': subcase_data,
        'relative': relative_data,
        'summary': summary,
    }
# Public API of this module: the names exported by a star-import.
__all__ = [
'create_zernike_objective',
'create_relative_zernike_objective',
'ZernikeObjectiveBuilder',
'extract_zernike_for_trial',
]