404 lines
13 KiB
Python
404 lines
13 KiB
Python
|
|
"""
|
||
|
|
Zernike Helper Functions for Atomizer Optimization
|
||
|
|
===================================================
|
||
|
|
|
||
|
|
Convenience wrappers and utilities for using Zernike analysis
|
||
|
|
in optimization studies. These helpers simplify integration with
|
||
|
|
the standard Atomizer optimization patterns.
|
||
|
|
|
||
|
|
Usage in run_optimization.py:
|
||
|
|
from optimization_engine.extractors.zernike_helpers import (
|
||
|
|
create_zernike_objective,
|
||
|
|
ZernikeObjectiveBuilder
|
||
|
|
)
|
||
|
|
|
||
|
|
# Simple: create objective function
|
||
|
|
zernike_obj = create_zernike_objective(
|
||
|
|
op2_finder=lambda: sim_dir / "model-solution_1.op2",
|
||
|
|
subcase="20",
|
||
|
|
metric="filtered_rms_nm"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Use in Optuna trial
|
||
|
|
rms = zernike_obj()
|
||
|
|
"""
|
||
|
|
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Callable, Dict, Any, Optional, Union, List
|
||
|
|
import logging
|
||
|
|
|
||
|
|
from optimization_engine.extractors.extract_zernike import (
|
||
|
|
ZernikeExtractor,
|
||
|
|
extract_zernike_from_op2,
|
||
|
|
extract_zernike_filtered_rms,
|
||
|
|
)
|
||
|
|
|
||
|
|
# Module-level logger named after this module; the helpers below use it to
# report extraction failures as warnings.
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
def create_zernike_objective(
    op2_finder: Callable[[], Path],
    bdf_finder: Optional[Callable[[], Path]] = None,
    subcase: Union[int, str] = "20",
    metric: str = "filtered_rms_nm",
    displacement_unit: str = "mm",
    **kwargs
) -> Callable[[], float]:
    """
    Build a zero-argument objective that reads one Zernike metric from an OP2 file.

    Nothing is extracted when the factory is called. Each call of the
    returned function:
        1. Resolves the OP2 path with ``op2_finder`` (and the BDF path with
           ``bdf_finder`` when one was supplied, otherwise ``None``).
        2. Runs ``extract_zernike_from_op2`` for the configured subcase.
        3. Returns the entry named ``metric`` from the extraction result.

    Args:
        op2_finder: Callable that returns path to current OP2 file
        bdf_finder: Callable that returns path to BDF file (auto-detect if None)
        subcase: Subcase to analyze (e.g., "20" for 20 deg elevation)
        metric: Metric to return (see available metrics below)
        displacement_unit: Unit of displacement in OP2 file
        **kwargs: Additional arguments forwarded to ``extract_zernike_from_op2``

    Returns:
        Callable that returns the metric value

    Raises:
        ValueError: Raised by the returned callable (not by this factory) when
            ``metric`` is not present in the extraction result. The message
            lists the result keys that hold ``int``/``float`` values.

    Available metrics:
        - global_rms_nm: Global RMS wavefront error
        - filtered_rms_nm: Filtered RMS (low orders removed)
        - defocus_nm: Defocus aberration
        - astigmatism_rms_nm: Combined astigmatism
        - coma_rms_nm: Combined coma
        - trefoil_rms_nm: Combined trefoil
        - spherical_nm: Primary spherical aberration

    Example:
        op2_finder = lambda: Path("sim_dir") / "model-solution_1.op2"
        objective = create_zernike_objective(op2_finder, subcase="20")

        # In optimization loop
        rms_value = objective()  # Returns filtered RMS in nm
    """
    def evaluate() -> float:
        # Paths are resolved on every call so the objective always reads the
        # file the finders currently point at.
        op2_path = op2_finder()
        if bdf_finder:
            bdf_path = bdf_finder()
        else:
            bdf_path = None

        extracted = extract_zernike_from_op2(
            op2_path,
            bdf_path,
            subcase=subcase,
            displacement_unit=displacement_unit,
            **kwargs
        )

        if metric in extracted:
            return extracted[metric]

        # Unknown metric: report only the scalar entries as candidates.
        available = [
            name for name in extracted.keys()
            if isinstance(extracted[name], (int, float))
        ]
        raise ValueError(f"Metric '{metric}' not found. Available: {available}")

    return evaluate
|
||
|
|
|
||
|
|
|
||
|
|
def create_relative_zernike_objective(
    op2_finder: Callable[[], Path],
    target_subcase: Union[int, str],
    reference_subcase: Union[int, str],
    bdf_finder: Optional[Callable[[], Path]] = None,
    metric: str = "relative_filtered_rms_nm",
    **kwargs
) -> Callable[[], float]:
    """
    Build a zero-argument objective for a metric of one subcase relative to another.

    Useful for minimizing gravity-induced deformation relative to
    a reference orientation (e.g., polishing position at 90 deg).

    Each call of the returned function resolves the file paths, constructs a
    new ``ZernikeExtractor`` and calls its ``extract_relative`` with both
    subcase identifiers converted to strings.

    Args:
        op2_finder: Callable returning OP2 path
        target_subcase: Subcase to analyze
        reference_subcase: Reference subcase to subtract
        bdf_finder: Optional BDF path finder
        metric: Relative metric to return
        **kwargs: Additional ZernikeExtractor arguments

    Returns:
        Callable that returns relative metric value

    Raises:
        ValueError: Raised by the returned callable (not by this factory) when
            ``metric`` is not present in the relative result. The message
            lists the result keys that hold ``int``/``float`` values.
    """
    def evaluate() -> float:
        op2_path = op2_finder()
        if bdf_finder:
            bdf_path = bdf_finder()
        else:
            bdf_path = None

        relative = ZernikeExtractor(op2_path, bdf_path, **kwargs).extract_relative(
            str(target_subcase),
            str(reference_subcase),
        )

        if metric in relative:
            return relative[metric]

        # Unknown metric: report only the scalar entries as candidates.
        available = [
            name for name in relative.keys()
            if isinstance(relative[name], (int, float))
        ]
        raise ValueError(f"Metric '{metric}' not found. Available: {available}")

    return evaluate
|
||
|
|
|
||
|
|
|
||
|
|
class ZernikeObjectiveBuilder:
    """
    Builder for complex Zernike objectives with multiple subcases.

    This is useful for multi-subcase optimization where you want
    to combine metrics from different gravity orientations.

    Objectives are registered with ``add_subcase_objective`` and
    ``add_relative_objective`` (both return ``self`` so calls can be chained)
    and combined with one of the ``build_*`` methods.

    Failure handling: an objective whose extraction raises, or whose metric is
    missing from the extraction result, is logged as a warning and evaluates
    to ``float("inf")``. A missing metric is deliberately NOT scored as 0.0,
    because 0.0 would look like a perfect result to a minimizer.

    Example:
        builder = ZernikeObjectiveBuilder(
            op2_finder=lambda: sim_dir / "model.op2"
        )

        # Add objectives for different subcases
        builder.add_subcase_objective("20", "filtered_rms_nm", weight=1.0)
        builder.add_subcase_objective("40", "filtered_rms_nm", weight=0.5)
        builder.add_subcase_objective("60", "filtered_rms_nm", weight=0.5)

        # Create combined objective
        objective = builder.build_weighted_sum()
        combined_rms = objective()  # Returns weighted sum
    """

    def __init__(
        self,
        op2_finder: Callable[[], Path],
        bdf_finder: Optional[Callable[[], Path]] = None,
        displacement_unit: str = "mm",
        **kwargs
    ):
        """
        Store the configuration; no file is read until an objective is evaluated.

        Args:
            op2_finder: Callable returning the path of the current OP2 file
            bdf_finder: Optional callable returning the BDF path (None is
                passed to the extractor when omitted)
            displacement_unit: Forwarded to ZernikeExtractor
            **kwargs: Additional ZernikeExtractor arguments
        """
        self.op2_finder = op2_finder
        self.bdf_finder = bdf_finder
        self.displacement_unit = displacement_unit
        self.kwargs = kwargs
        # One dict per registered objective, in registration order.
        self.objectives: List[Dict[str, Any]] = []
        self._extractor = None

    def add_subcase_objective(
        self,
        subcase: Union[int, str],
        metric: str = "filtered_rms_nm",
        weight: float = 1.0,
        name: Optional[str] = None
    ) -> "ZernikeObjectiveBuilder":
        """
        Add a subcase objective to the builder.

        Args:
            subcase: Subcase identifier (stored as a string)
            metric: Key to read from the subcase extraction result
            weight: Multiplier applied by the ``build_*`` combinators
            name: Result key; defaults to ``"{metric}_{subcase}"``

        Returns:
            The builder itself, to allow chaining.
        """
        self.objectives.append({
            "subcase": str(subcase),
            "metric": metric,
            "weight": weight,
            "name": name or f"{metric}_{subcase}"
        })
        return self

    def add_relative_objective(
        self,
        target_subcase: Union[int, str],
        reference_subcase: Union[int, str],
        metric: str = "relative_filtered_rms_nm",
        weight: float = 1.0,
        name: Optional[str] = None
    ) -> "ZernikeObjectiveBuilder":
        """
        Add a relative objective between subcases.

        Args:
            target_subcase: Subcase to analyze (stored as a string)
            reference_subcase: Reference subcase (stored as a string)
            metric: Key to read from the relative extraction result
            weight: Multiplier applied by the ``build_*`` combinators
            name: Result key; defaults to
                ``"rel_{target_subcase}_vs_{reference_subcase}"``

        Returns:
            The builder itself, to allow chaining.
        """
        self.objectives.append({
            "target_subcase": str(target_subcase),
            "reference_subcase": str(reference_subcase),
            "metric": metric,
            "weight": weight,
            "name": name or f"rel_{target_subcase}_vs_{reference_subcase}",
            "is_relative": True
        })
        return self

    def _get_extractor(self) -> "ZernikeExtractor":
        """Lazy-create extractor (reused for all objectives)."""
        if self._extractor is None:
            op2_path = self.op2_finder()
            bdf_path = self.bdf_finder() if self.bdf_finder else None
            self._extractor = ZernikeExtractor(
                op2_path, bdf_path,
                displacement_unit=self.displacement_unit,
                **self.kwargs
            )
        return self._extractor

    def _reset_extractor(self):
        """Reset extractor (call after OP2 changes)."""
        self._extractor = None

    def evaluate_all(self) -> Dict[str, float]:
        """
        Evaluate all objectives and return dict of values.

        The cached extractor is discarded first, so every call builds a new
        extractor from the paths the finders currently return. Errors raised
        while building the extractor propagate to the caller; errors in an
        individual objective do not (see below).

        Returns:
            Dict mapping objective name to value. An objective that fails to
            evaluate, or whose metric is absent from the extraction result,
            maps to ``float("inf")`` and a warning is logged.
        """
        self._reset_extractor()
        extractor = self._get_extractor()
        results: Dict[str, float] = {}

        for obj in self.objectives:
            name = obj["name"]
            metric = obj["metric"]
            try:
                if obj.get("is_relative"):
                    extracted = extractor.extract_relative(
                        obj["target_subcase"],
                        obj["reference_subcase"]
                    )
                else:
                    extracted = extractor.extract_subcase(obj["subcase"])

                # A missing metric is treated as a failure. Silently scoring
                # it 0.0 would hand the optimizer a fake perfect value.
                if metric not in extracted:
                    raise ValueError(f"Metric '{metric}' not found")
                results[name] = extracted[metric]
            except Exception as e:
                # Deliberate best-effort: one bad objective must not abort the
                # whole evaluation, so it is logged and penalised with inf.
                logger.warning("Failed to evaluate %s: %s", name, e)
                results[name] = float("inf")

        return results

    def _weighted_values(self) -> List[float]:
        """Evaluate every objective and return ``weight * value`` in registration order."""
        values = self.evaluate_all()
        weighted = []
        for obj in self.objectives:
            weight = obj["weight"]
            if weight == 0:
                # A zero weight disables the objective. Short-circuit so a
                # failed evaluation (inf) does not become NaN via 0 * inf.
                weighted.append(0.0)
            else:
                weighted.append(weight * values.get(obj["name"], 0.0))
        return weighted

    def build_weighted_sum(self) -> Callable[[], float]:
        """
        Build a callable that returns weighted sum of all objectives.

        Returns:
            Callable returning combined objective value (0.0 when no
            objectives are registered)
        """
        def evaluate() -> float:
            return sum(self._weighted_values(), 0.0)

        return evaluate

    def build_max(self) -> Callable[[], float]:
        """
        Build a callable that returns maximum of all objectives.

        Useful for worst-case optimization across subcases.

        Returns:
            Callable returning the largest weighted objective value (0.0 when
            no objectives are registered)
        """
        def evaluate() -> float:
            weighted = self._weighted_values()
            return max(weighted) if weighted else 0.0

        return evaluate

    def build_individual(self) -> Callable[[], Dict[str, float]]:
        """
        Build a callable that returns dict of individual objective values.

        Useful for multi-objective optimization (NSGA-II).

        Returns:
            The bound ``evaluate_all`` method; weights are not applied.
        """
        return self.evaluate_all
|
||
|
|
|
||
|
|
|
||
|
|
def extract_zernike_for_trial(
    op2_path: Path,
    bdf_path: Optional[Path] = None,
    subcases: Optional[List[str]] = None,
    reference_subcase: str = "20",
    metrics: Optional[List[str]] = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Extract comprehensive Zernike data for a trial.

    This is a high-level function for logging/exporting trial data.
    It extracts all metrics for specified subcases and computes
    relative metrics vs the reference.

    Subcase identifiers (requested, available and reference) are compared as
    strings, so integer identifiers such as ``20`` behave like ``"20"``.
    Extraction failures for an individual subcase or relative pair are logged
    as warnings and that entry is omitted from the result.

    Args:
        op2_path: Path to OP2 file
        bdf_path: Path to BDF file (auto-detect if None)
        subcases: List of subcases to extract (None or empty = all available).
            Requested subcases that the extractor does not provide are
            skipped with a warning.
        reference_subcase: Reference for relative calculations. Relative data
            is only produced when the reference is among the selected subcases.
        metrics: Specific metrics to keep in the per-subcase and relative
            dicts (None or empty = all). A relative entry is kept when its key
            is listed directly or when the key without its ``relative_``
            prefix is listed. The summary is always computed from the
            unfiltered data.
        **kwargs: Additional ZernikeExtractor arguments

    Returns:
        Dict with complete trial Zernike data:
        {
            'subcases': {
                '20': {'global_rms_nm': ..., 'filtered_rms_nm': ..., ...},
                '40': {...},
                ...
            },
            'relative': {
                '40_vs_20': {'relative_filtered_rms_nm': ..., ...},
                ...
            },
            'summary': {
                'best_filtered_rms': ...,
                'worst_filtered_rms': ...,
                'mean_filtered_rms': ...,
                'n_subcases': ...,
                'reference_subcase': ...,
            }
        }
    """
    extractor = ZernikeExtractor(op2_path, bdf_path, **kwargs)

    # Normalise every identifier to str. Mixed int/str identifiers previously
    # made the reference check below fail silently.
    available = [str(key) for key in extractor.displacements.keys()]
    reference_subcase = str(reference_subcase)

    if subcases:
        requested = [str(sc) for sc in subcases]
        missing = [sc for sc in requested if sc not in available]
        if missing:
            logger.warning(
                "Requested subcases not available and skipped: %s", missing
            )
        subcases = [sc for sc in requested if sc in available]
    else:
        subcases = available

    # Extract per-subcase data
    subcase_data = {}
    for sc in subcases:
        try:
            subcase_data[sc] = extractor.extract_subcase(sc)
        except Exception as e:
            # Best-effort: keep going so one bad subcase does not lose the rest.
            logger.warning("Failed to extract subcase %s: %s", sc, e)

    # Extract relative data
    relative_data = {}
    if reference_subcase in subcases:
        for sc in subcases:
            if sc == reference_subcase:
                continue
            # Built before the try so the except handler can always use it.
            key = f"{sc}_vs_{reference_subcase}"
            try:
                relative_data[key] = extractor.extract_relative(
                    sc, reference_subcase
                )
            except Exception as e:
                logger.warning("Failed to extract relative %s: %s", key, e)

    # Summary statistics are taken from the unfiltered data so that a
    # `metrics` selection without filtered_rms_nm does not distort them.
    filtered_rms_values = [
        data.get('filtered_rms_nm', float('inf'))
        for data in subcase_data.values()
    ]
    has_values = bool(filtered_rms_values)

    summary = {
        'best_filtered_rms': min(filtered_rms_values) if has_values else None,
        'worst_filtered_rms': max(filtered_rms_values) if has_values else None,
        'mean_filtered_rms': (
            sum(filtered_rms_values) / len(filtered_rms_values)
            if has_values else None
        ),
        # Counts the selected subcases, including any whose extraction failed.
        'n_subcases': len(subcases),
        'reference_subcase': reference_subcase,
    }

    return {
        'subcases': {
            sc: _select_metrics(data, metrics)
            for sc, data in subcase_data.items()
        },
        'relative': {
            key: _select_metrics(data, metrics)
            for key, data in relative_data.items()
        },
        'summary': summary,
    }


def _select_metrics(data: Dict[str, Any], metrics: Optional[List[str]]) -> Dict[str, Any]:
    """Return the entries of *data* selected by *metrics* (everything when *metrics* is empty or None)."""
    if not metrics:
        return data

    wanted = set(metrics)
    prefix = "relative_"
    return {
        name: value
        for name, value in data.items()
        # Accept "relative_<metric>" when "<metric>" was requested, so one
        # metrics list can select both per-subcase and relative entries.
        if name in wanted
        or (name.startswith(prefix) and name[len(prefix):] in wanted)
    }
|
||
|
|
|
||
|
|
|
||
|
|
# Public API of this module (the names exported by `from ... import *`).
__all__ = [
    "create_zernike_objective",
    "create_relative_zernike_objective",
    "ZernikeObjectiveBuilder",
    "extract_zernike_for_trial",
]
|