feat: Add Zernike wavefront viewer and V14 TPE optimization study

Dashboard Zernike Analysis:
- Add ZernikeViewer component with tabbed UI (40°, 60°, 90° vs 20°)
- Generate 3D surface mesh plots with Mesh3d triangulation
- Full 50-mode Zernike coefficient tables with mode names
- Manufacturing metrics for 90_vs_20 (optician workload analysis)
- OP2 availability filter for FEA trials only
- Fix duplicate trial display with unique React keys
- Tab switching with proper event propagation

Backend API Enhancements:
- GET /studies/{id}/trials/{num}/zernike - Generate Zernike HTML on-demand
- GET /studies/{id}/zernike-available - List trials with OP2 files
- compute_manufacturing_metrics() for aberration analysis
- compute_rms_filter_j1to3() for optician workload metric

M1 Mirror V14 Study:
- TPE (Tree-structured Parzen Estimator) optimization
- Seeds from 496 prior FEA trials (V11+V12+V13)
- Weighted-sum objective: 5*obj_40 + 5*obj_60 + 1*obj_mfg
- Multivariate TPE with constant_liar for efficient exploration
- Ready for 8-hour overnight runs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Antoine
2025-12-10 21:34:07 -05:00
parent 96b196de58
commit 48404fd743
5 changed files with 1785 additions and 13 deletions

View File

@@ -446,12 +446,17 @@ async def get_optimization_history(study_id: str, limit: Optional[int] = None):
# Extract source for FEA vs NN differentiation
source = user_attrs.get("source", "FEA") # Default to FEA for legacy studies
# Use trial_id as unique identifier when multiple Optuna studies exist
# This avoids trial number collisions between studies
unique_trial_num = trial_id if study_name else trial_num
# Get iter_num from user_attrs if available (this is the actual iteration folder number)
iter_num = user_attrs.get("iter_num", None)
# Use iter_num if available, otherwise use trial_id as unique identifier
# trial_id is unique across all studies in the database
unique_trial_num = iter_num if iter_num is not None else trial_id
trials.append({
"trial_number": unique_trial_num,
"trial_id": trial_id, # Keep original for debugging
"optuna_trial_num": trial_num, # Keep original Optuna trial number
"objective": values[0] if len(values) > 0 else None, # Primary objective
"objectives": values if len(values) > 1 else None, # All objectives for multi-objective
"design_variables": final_design_vars, # Use merged design vars
@@ -2227,6 +2232,575 @@ async def update_study_config(study_id: str, request: UpdateConfigRequest):
raise HTTPException(status_code=500, detail=f"Failed to update config: {str(e)}")
# ============================================================================
# Zernike Analysis Endpoints
# ============================================================================
@router.get("/studies/{study_id}/zernike-available")
async def get_zernike_available_trials(study_id: str):
"""
Get list of trial numbers that have Zernike analysis available (OP2 files).
Returns:
JSON with list of trial numbers that have iteration folders with OP2 files
"""
try:
study_dir = STUDIES_DIR / study_id
if not study_dir.exists():
raise HTTPException(status_code=404, detail=f"Study '{study_id}' not found")
iter_base = study_dir / "2_iterations"
if not iter_base.exists():
return {"study_id": study_id, "available_trials": [], "count": 0}
available_trials = []
for d in iter_base.iterdir():
if d.is_dir() and d.name.startswith('iter'):
# Check for OP2 file
op2_files = list(d.glob("*.op2"))
if op2_files:
iter_num_str = d.name.replace('iter', '')
try:
iter_num = int(iter_num_str)
# Map iter number to trial number (iter1 -> trial 0, etc.)
# But also keep iter_num as possibility
if iter_num != 9999:
available_trials.append(iter_num - 1) # 0-indexed trial
else:
available_trials.append(9999) # Special test iteration
except ValueError:
pass
available_trials.sort()
return {
"study_id": study_id,
"available_trials": available_trials,
"count": len(available_trials)
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get available trials: {str(e)}")
@router.get("/studies/{study_id}/trials/{trial_number}/zernike")
async def get_trial_zernike(study_id: str, trial_number: int):
"""
Generate or retrieve Zernike analysis HTML for a specific trial.
This endpoint generates interactive Zernike wavefront analysis for mirror
optimization trials. It produces 3D surface residual plots, RMS metrics,
and coefficient bar charts for each angle comparison (40_vs_20, 60_vs_20, 90_vs_20).
Args:
study_id: Study identifier
trial_number: Trial/iteration number
Returns:
JSON with HTML content for each comparison, or error if OP2 not found
"""
try:
study_dir = STUDIES_DIR / study_id
if not study_dir.exists():
raise HTTPException(status_code=404, detail=f"Study '{study_id}' not found")
# Find iteration directory
# Trial numbers in Optuna DB may differ from iteration folder numbers
# Common patterns:
# 1. iter{trial_number} - direct mapping
# 2. iter{trial_number + 1} - 0-indexed trials vs 1-indexed folders
# 3. Check for actual folder existence
iter_dir = None
possible_iter_nums = [trial_number, trial_number + 1]
for iter_num in possible_iter_nums:
candidate = study_dir / "2_iterations" / f"iter{iter_num}"
if candidate.exists():
iter_dir = candidate
break
if iter_dir is None:
raise HTTPException(
status_code=404,
detail=f"No FEA results for trial {trial_number}. This trial may have used surrogate model (NN) prediction instead of full FEA simulation. Zernike analysis requires OP2 results from actual FEA runs."
)
# Check for OP2 file BEFORE doing expensive imports
op2_files = list(iter_dir.glob("*.op2"))
if not op2_files:
raise HTTPException(
status_code=404,
detail=f"No OP2 results file found in {iter_dir.name}. FEA may not have completed."
)
# Only import heavy dependencies after we know we have an OP2 file
sys.path.append(str(Path(__file__).parent.parent.parent.parent.parent))
from optimization_engine.extractors import ZernikeExtractor
import numpy as np
from math import factorial
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from matplotlib.tri import Triangulation
# Configuration
N_MODES = 50
AMP = 50.0 # Increased from 2.0 for better visibility
PANCAKE = 5.0 # Reduced from 10.0 for more Z range
PLOT_DOWNSAMPLE = 5000 # Reduced for faster loading
FILTER_LOW_ORDERS = 4
SUBCASE_MAP = {
'1': '90', '2': '20', '3': '40', '4': '60',
}
REF_SUBCASE = '2'
def noll_indices(j: int):
if j < 1:
raise ValueError("Noll index j must be >= 1")
count = 0
n = 0
while True:
if n == 0:
ms = [0]
elif n % 2 == 0:
ms = [0] + [m for k in range(1, n//2 + 1) for m in (-2*k, 2*k)]
else:
ms = [m for k in range(0, (n+1)//2) for m in (-(2*k+1), (2*k+1))]
for m in ms:
count += 1
if count == j:
return n, m
n += 1
def zernike_noll(j: int, r: np.ndarray, th: np.ndarray) -> np.ndarray:
n, m = noll_indices(j)
R = np.zeros_like(r)
for s in range((n-abs(m))//2 + 1):
c = ((-1)**s * factorial(n-s) /
(factorial(s) *
factorial((n+abs(m))//2 - s) *
factorial((n-abs(m))//2 - s)))
R += c * r**(n-2*s)
if m == 0:
return R
return R * (np.cos(m*th) if m > 0 else np.sin(-m*th))
def zernike_common_name(n: int, m: int) -> str:
names = {
(0, 0): "Piston", (1, -1): "Tilt X", (1, 1): "Tilt Y",
(2, 0): "Defocus", (2, -2): "Astig 45°", (2, 2): "Astig 0°",
(3, -1): "Coma X", (3, 1): "Coma Y", (3, -3): "Trefoil X", (3, 3): "Trefoil Y",
(4, 0): "Primary Spherical", (4, -2): "Sec Astig X", (4, 2): "Sec Astig Y",
(4, -4): "Quadrafoil X", (4, 4): "Quadrafoil Y",
(5, -1): "Sec Coma X", (5, 1): "Sec Coma Y",
(5, -3): "Sec Trefoil X", (5, 3): "Sec Trefoil Y",
(5, -5): "Pentafoil X", (5, 5): "Pentafoil Y",
(6, 0): "Sec Spherical",
}
return names.get((n, m), f"Z(n={n}, m={m})")
def zernike_label(j: int) -> str:
n, m = noll_indices(j)
return f"J{j:02d} - {zernike_common_name(n, m)}"
def compute_manufacturing_metrics(coefficients: np.ndarray) -> dict:
"""Compute manufacturing-related aberration metrics."""
return {
'defocus_nm': float(abs(coefficients[3])), # J4
'astigmatism_rms': float(np.sqrt(coefficients[4]**2 + coefficients[5]**2)), # J5+J6
'coma_rms': float(np.sqrt(coefficients[6]**2 + coefficients[7]**2)), # J7+J8
'trefoil_rms': float(np.sqrt(coefficients[8]**2 + coefficients[9]**2)), # J9+J10
'spherical_nm': float(abs(coefficients[10])) if len(coefficients) > 10 else 0.0, # J11
}
def compute_rms_filter_j1to3(X, Y, W_nm, coefficients, R):
"""Compute RMS with J1-J3 filtered (keeping defocus for optician workload)."""
Xc = X - np.mean(X)
Yc = Y - np.mean(Y)
r = np.hypot(Xc/R, Yc/R)
th = np.arctan2(Yc, Xc)
Z_j1to3 = np.column_stack([zernike_noll(j, r, th) for j in range(1, 4)])
W_filter_j1to3 = W_nm - Z_j1to3 @ coefficients[:3]
return float(np.sqrt(np.mean(W_filter_j1to3**2)))
def generate_zernike_html(
title: str,
X: np.ndarray,
Y: np.ndarray,
W_nm: np.ndarray,
coefficients: np.ndarray,
rms_global: float,
rms_filtered: float,
ref_title: str = "20 deg",
abs_pair = None,
is_manufacturing: bool = False,
mfg_metrics: dict = None,
correction_metrics: dict = None
) -> str:
"""Generate HTML string for Zernike visualization with full tables."""
# Compute residual surface (filtered)
Xc = X - np.mean(X)
Yc = Y - np.mean(Y)
R = float(np.max(np.hypot(Xc, Yc)))
r = np.hypot(Xc/R, Yc/R)
th = np.arctan2(Yc, Xc)
Z = np.column_stack([zernike_noll(j, r, th) for j in range(1, N_MODES+1)])
W_res_filt = W_nm - Z[:, :FILTER_LOW_ORDERS].dot(coefficients[:FILTER_LOW_ORDERS])
# Compute J1-J3 filtered RMS (optician workload metric)
rms_filter_j1to3 = compute_rms_filter_j1to3(X, Y, W_nm, coefficients, R)
# Downsample for display
n = len(X)
if n > PLOT_DOWNSAMPLE:
rng = np.random.default_rng(42)
sel = rng.choice(n, size=PLOT_DOWNSAMPLE, replace=False)
Xp, Yp, Wp = X[sel], Y[sel], W_res_filt[sel]
else:
Xp, Yp, Wp = X, Y, W_res_filt
res_amp = AMP * Wp
max_amp = float(np.max(np.abs(res_amp))) if res_amp.size else 1.0
# Create SURFACE mesh (not just points)
surface_trace = None
try:
tri = Triangulation(Xp, Yp)
if tri.triangles is not None and len(tri.triangles) > 0:
i_idx, j_idx, k_idx = tri.triangles.T
surface_trace = go.Mesh3d(
x=Xp.tolist(), y=Yp.tolist(), z=res_amp.tolist(),
i=i_idx.tolist(), j=j_idx.tolist(), k=k_idx.tolist(),
intensity=res_amp.tolist(),
colorscale='RdBu',
opacity=0.95,
flatshading=True,
showscale=True,
colorbar=dict(title="Residual (nm)", titleside='right', len=0.6)
)
except Exception as e:
print(f"Triangulation failed: {e}")
labels = [zernike_label(j) for j in range(1, N_MODES+1)]
coeff_abs = np.abs(coefficients)
mfg = compute_manufacturing_metrics(coefficients)
# Determine layout based on whether this is manufacturing (90 deg) view
if is_manufacturing and mfg_metrics and correction_metrics:
# Manufacturing view: 5 rows
fig = make_subplots(
rows=5, cols=1,
specs=[[{"type": "scene"}],
[{"type": "table"}],
[{"type": "table"}],
[{"type": "table"}],
[{"type": "xy"}]],
row_heights=[0.35, 0.10, 0.15, 0.15, 0.25],
vertical_spacing=0.025,
subplot_titles=[
f"<b>Surface Residual (relative to {ref_title})</b>",
"<b>RMS Metrics</b>",
"<b>Mode Magnitudes (Absolute 90 deg)</b>",
"<b>Pre-Correction (90 deg - 20 deg)</b>",
f"<b>Zernike Coefficients ({N_MODES} modes)</b>"
]
)
else:
# Standard relative view: 4 rows with full coefficient table
fig = make_subplots(
rows=4, cols=1,
specs=[[{"type": "scene"}],
[{"type": "table"}],
[{"type": "table"}],
[{"type": "xy"}]],
row_heights=[0.40, 0.12, 0.28, 0.20],
vertical_spacing=0.03,
subplot_titles=[
f"<b>Surface Residual (relative to {ref_title})</b>",
"<b>RMS Metrics</b>",
f"<b>Zernike Coefficients ({N_MODES} modes)</b>",
"<b>Top 20 |Zernike Coefficients| (nm)</b>"
]
)
# Add surface mesh (or fallback to scatter)
if surface_trace is not None:
fig.add_trace(surface_trace, row=1, col=1)
else:
# Fallback to scatter if triangulation failed
fig.add_trace(go.Scatter3d(
x=Xp.tolist(), y=Yp.tolist(), z=res_amp.tolist(),
mode='markers',
marker=dict(size=2, color=res_amp.tolist(), colorscale='RdBu', showscale=True),
showlegend=False
), row=1, col=1)
fig.update_scenes(
camera=dict(eye=dict(x=0.8, y=0.8, z=0.6)),
zaxis=dict(range=[-max_amp * PANCAKE, max_amp * PANCAKE]),
aspectmode='data'
)
# Row 2: RMS table with all metrics
if abs_pair is not None:
abs_global, abs_filtered = abs_pair
fig.add_trace(go.Table(
header=dict(
values=["<b>Metric</b>", "<b>Relative (nm)</b>", "<b>Absolute (nm)</b>"],
align="left",
fill_color='rgb(55, 83, 109)',
font=dict(color='white', size=12)
),
cells=dict(
values=[
["Global RMS", "Filtered RMS (J1-J4)", "Filtered RMS (J1-J3, w/ defocus)"],
[f"{rms_global:.2f}", f"{rms_filtered:.2f}", f"{rms_filter_j1to3:.2f}"],
[f"{abs_global:.2f}", f"{abs_filtered:.2f}", "-"],
],
align="left",
fill_color='rgb(243, 243, 243)'
)
), row=2, col=1)
else:
fig.add_trace(go.Table(
header=dict(
values=["<b>Metric</b>", "<b>Value (nm)</b>"],
align="left",
fill_color='rgb(55, 83, 109)',
font=dict(color='white', size=12)
),
cells=dict(
values=[
["Global RMS", "Filtered RMS (J1-J4)", "Filtered RMS (J1-J3, w/ defocus)"],
[f"{rms_global:.2f}", f"{rms_filtered:.2f}", f"{rms_filter_j1to3:.2f}"]
],
align="left",
fill_color='rgb(243, 243, 243)'
)
), row=2, col=1)
if is_manufacturing and mfg_metrics and correction_metrics:
# Row 3: Mode magnitudes at 90 deg (absolute)
fig.add_trace(go.Table(
header=dict(
values=["<b>Mode</b>", "<b>Value (nm)</b>"],
align="left",
fill_color='rgb(55, 83, 109)',
font=dict(color='white', size=11)
),
cells=dict(
values=[
["Defocus (J4)", "Astigmatism (J5+J6)", "Coma (J7+J8)", "Trefoil (J9+J10)", "Spherical (J11)"],
[f"{mfg_metrics['defocus_nm']:.2f}", f"{mfg_metrics['astigmatism_rms']:.2f}",
f"{mfg_metrics['coma_rms']:.2f}", f"{mfg_metrics['trefoil_rms']:.2f}",
f"{mfg_metrics['spherical_nm']:.2f}"]
],
align="left",
fill_color='rgb(243, 243, 243)'
)
), row=3, col=1)
# Row 4: Pre-correction (90 deg - 20 deg)
fig.add_trace(go.Table(
header=dict(
values=["<b>Correction Mode</b>", "<b>Value (nm)</b>"],
align="left",
fill_color='rgb(55, 83, 109)',
font=dict(color='white', size=11)
),
cells=dict(
values=[
["Total RMS (J1-J3 filter)", "Defocus (J4)", "Astigmatism (J5+J6)", "Coma (J7+J8)"],
[f"{correction_metrics.get('rms_filter_j1to3', 0):.2f}",
f"{correction_metrics['defocus_nm']:.2f}",
f"{correction_metrics['astigmatism_rms']:.2f}",
f"{correction_metrics['coma_rms']:.2f}"]
],
align="left",
fill_color='rgb(243, 243, 243)'
)
), row=4, col=1)
# Row 5: Bar chart
sorted_idx = np.argsort(coeff_abs)[::-1][:20]
fig.add_trace(
go.Bar(
x=[float(coeff_abs[i]) for i in sorted_idx],
y=[labels[i] for i in sorted_idx],
orientation='h',
marker_color='rgb(55, 83, 109)',
hovertemplate="%{y}<br>|Coeff| = %{x:.3f} nm<extra></extra>",
showlegend=False
),
row=5, col=1
)
else:
# Row 3: Full coefficient table
fig.add_trace(go.Table(
header=dict(
values=["<b>Noll j</b>", "<b>Mode Name</b>", "<b>Coeff (nm)</b>", "<b>|Coeff| (nm)</b>"],
align="left",
fill_color='rgb(55, 83, 109)',
font=dict(color='white', size=11)
),
cells=dict(
values=[
list(range(1, N_MODES+1)),
labels,
[f"{c:+.3f}" for c in coefficients],
[f"{abs(c):.3f}" for c in coefficients]
],
align="left",
fill_color='rgb(243, 243, 243)',
font=dict(size=10),
height=22
)
), row=3, col=1)
# Row 4: Bar chart - top 20 modes by magnitude
sorted_idx = np.argsort(coeff_abs)[::-1][:20]
fig.add_trace(
go.Bar(
x=[float(coeff_abs[i]) for i in sorted_idx],
y=[labels[i] for i in sorted_idx],
orientation='h',
marker_color='rgb(55, 83, 109)',
hovertemplate="%{y}<br>|Coeff| = %{x:.3f} nm<extra></extra>",
showlegend=False
),
row=4, col=1
)
fig.update_layout(
width=1400,
height=1800 if is_manufacturing else 1600,
margin=dict(t=80, b=20, l=20, r=20),
title=dict(
text=f"<b>{title}</b>",
font=dict(size=20),
x=0.5
),
paper_bgcolor='white',
plot_bgcolor='white'
)
return fig.to_html(include_plotlyjs='cdn', full_html=True)
# Load OP2 and generate reports
op2_path = op2_files[0]
extractor = ZernikeExtractor(str(op2_path), displacement_unit='mm', n_modes=N_MODES)
results = {}
comparisons = [
('3', '2', '40_vs_20', '40 deg vs 20 deg'),
('4', '2', '60_vs_20', '60 deg vs 20 deg'),
('1', '2', '90_vs_20', '90 deg vs 20 deg (manufacturing)'),
]
# Pre-compute absolute 90 deg metrics for manufacturing view
abs_90_data = None
abs_90_metrics = None
if '1' in extractor.displacements:
abs_90_data = extractor.extract_subcase('1', include_coefficients=True)
abs_90_metrics = compute_manufacturing_metrics(np.array(abs_90_data['coefficients']))
for target_sc, ref_sc, key, title_suffix in comparisons:
if target_sc not in extractor.displacements:
continue
# Get relative data with coefficients
rel_data = extractor.extract_relative(target_sc, ref_sc, include_coefficients=True)
# Get absolute data for this subcase
abs_data = extractor.extract_subcase(target_sc, include_coefficients=True)
# Build coordinate arrays
target_disp = extractor.displacements[target_sc]
ref_disp = extractor.displacements[ref_sc]
ref_node_to_idx = {int(nid): i for i, nid in enumerate(ref_disp['node_ids'])}
X_list, Y_list, W_list = [], [], []
for i, nid in enumerate(target_disp['node_ids']):
nid = int(nid)
if nid not in ref_node_to_idx:
continue
geo = extractor.node_geometry.get(nid)
if geo is None:
continue
ref_idx = ref_node_to_idx[nid]
target_wfe = target_disp['disp'][i, 2] * extractor.wfe_factor
ref_wfe = ref_disp['disp'][ref_idx, 2] * extractor.wfe_factor
X_list.append(geo[0])
Y_list.append(geo[1])
W_list.append(target_wfe - ref_wfe)
X = np.array(X_list)
Y = np.array(Y_list)
W = np.array(W_list)
target_angle = SUBCASE_MAP.get(target_sc, target_sc)
ref_angle = SUBCASE_MAP.get(ref_sc, ref_sc)
# Check if this is the manufacturing (90 deg) comparison
is_mfg = (key == '90_vs_20')
# Compute correction metrics (relative coefficients) for manufacturing view
correction_metrics = None
if is_mfg and 'coefficients' in rel_data:
correction_metrics = compute_manufacturing_metrics(np.array(rel_data['coefficients']))
# Also compute rms_filter_j1to3 for the relative data
R = float(np.max(np.hypot(X - np.mean(X), Y - np.mean(Y))))
correction_metrics['rms_filter_j1to3'] = compute_rms_filter_j1to3(
X, Y, W, np.array(rel_data['coefficients']), R
)
html_content = generate_zernike_html(
title=f"iter{trial_number}: {target_angle} deg vs {ref_angle} deg",
X=X, Y=Y, W_nm=W,
coefficients=np.array(rel_data['coefficients']),
rms_global=rel_data['relative_global_rms_nm'],
rms_filtered=rel_data['relative_filtered_rms_nm'],
ref_title=f"{ref_angle} deg",
abs_pair=(abs_data['global_rms_nm'], abs_data['filtered_rms_nm']),
is_manufacturing=is_mfg,
mfg_metrics=abs_90_metrics if is_mfg else None,
correction_metrics=correction_metrics
)
results[key] = {
"html": html_content,
"rms_global": rel_data['relative_global_rms_nm'],
"rms_filtered": rel_data['relative_filtered_rms_nm'],
"title": f"{target_angle}° vs {ref_angle}°"
}
if not results:
raise HTTPException(
status_code=500,
detail="Failed to generate Zernike analysis. Check if subcases are available."
)
return {
"study_id": study_id,
"trial_number": trial_number,
"comparisons": results,
"available_comparisons": list(results.keys())
}
except HTTPException:
raise
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Failed to generate Zernike analysis: {str(e)}")
@router.get("/studies/{study_id}/export/{format}")
async def export_study_data(study_id: str, format: str):
"""Export study data in various formats: csv, json, excel"""