#!/usr/bin/env python3 """ Cross-Validation Against prysm (Phase 2) ========================================== Compares the Atomizer Zernike implementation against prysm, an established open-source physical optics library used by the astronomical optics community. Tests: 1. Noll index → (n,m) mapping agreement 2. Zernike polynomial evaluation agreement (pointwise) 3. Coefficient recovery agreement on same synthetic data 4. RMS wavefront error agreement Author: Mario (Atomizer V&V) Created: 2026-03-09 Project: P-Zernike-Validation (GigaBIT) """ import sys import json import csv from pathlib import Path from math import factorial from typing import Tuple, Dict import numpy as np from numpy.linalg import lstsq # prysm imports from prysm.polynomials import noll_to_nm, zernike_nm from prysm.coordinates import make_xy_grid, cart_to_polar # ============================================================================ # Atomizer Zernike (copy from extract_zernike.py for self-contained comparison) # ============================================================================ def atomizer_noll_indices(j): if j < 1: raise ValueError count, n = 0, 0 while True: if n == 0: ms = [0] elif n % 2 == 0: ms = [0] + [m for k in range(1, n // 2 + 1) for m in (-2 * k, 2 * k)] else: ms = [m for k in range(0, (n + 1) // 2) for m in (-(2 * k + 1), (2 * k + 1))] for m in ms: count += 1 if count == j: return n, m n += 1 def atomizer_zernike_radial(n, m, r): R = np.zeros_like(r) m_abs = abs(m) for s in range((n - m_abs) // 2 + 1): coef = ((-1)**s * factorial(n - s) / (factorial(s) * factorial((n + m_abs)//2 - s) * factorial((n - m_abs)//2 - s))) R += coef * r**(n - 2*s) return R def atomizer_zernike_noll(j, r, theta): n, m = atomizer_noll_indices(j) R = atomizer_zernike_radial(n, m, r) if m == 0: return R elif m > 0: return R * np.cos(m * theta) else: return R * np.sin(-m * theta) def zernike_name(j): n, m = atomizer_noll_indices(j) names = { (0,0):"Piston",(1,-1):"Tilt X",(1,1):"Tilt Y", (2,0):"Defocus",(2,-2):"Astig 45°",(2,2):"Astig 0°", (3,-1):"Coma X",(3,1):"Coma Y",(3,-3):"Trefoil X",(3,3):"Trefoil Y", (4,0):"Spherical",(4,-2):"2nd Astig X",(4,2):"2nd Astig Y", (6,0):"2nd Sph", } return names.get((n, m), f"Z{j}") # ============================================================================ # Test 1: Noll Index Mapping Agreement # ============================================================================ def test_noll_mapping(n_modes=50): """Compare Noll index → (n, m) between Atomizer and prysm.""" print("=" * 70) print("TEST 1: Noll Index Mapping Agreement") print("=" * 70) mismatches = [] for j in range(1, n_modes + 1): # Atomizer n_ato, m_ato = atomizer_noll_indices(j) # prysm n_pry, m_pry = noll_to_nm(j) match = (n_ato == n_pry and m_ato == m_pry) if not match: mismatches.append((j, (n_ato, m_ato), (n_pry, m_pry))) print(f" ❌ j={j}: Atomizer=({n_ato},{m_ato}) vs prysm=({n_pry},{m_pry})") if not mismatches: print(f" ✅ All {n_modes} Noll indices match perfectly") else: print(f" ❌ {len(mismatches)} mismatches found!") return len(mismatches) == 0, mismatches # ============================================================================ # Test 2: Polynomial Evaluation Agreement # ============================================================================ def test_polynomial_evaluation(n_modes=50, n_points=5000): """Compare Zernike polynomial values at same (r, θ) points.""" print("\n" + "=" * 70) print("TEST 2: Polynomial Evaluation Agreement") print("=" * 70) # Generate test points on unit disk rng = np.random.default_rng(42) r = rng.uniform(0, 1, n_points) theta = rng.uniform(0, 2 * np.pi, n_points) max_diffs = [] rms_diffs = [] all_pass = True print(f"\n {'j':>4} {'Name':>20} {'Max Diff':>12} {'RMS Diff':>12} {'Status':>8}") print(f" {'-'*4} {'-'*20} {'-'*12} {'-'*12} {'-'*8}") for j in range(1, n_modes + 1): # Atomizer evaluation z_ato = atomizer_zernike_noll(j, r, theta) # prysm evaluation n, m = noll_to_nm(j) z_pry = zernike_nm(n, m, r, theta, norm=False) diff = np.abs(z_ato - z_pry) max_diff = np.max(diff) rms_diff = np.sqrt(np.mean(diff**2)) max_diffs.append(max_diff) rms_diffs.append(rms_diff) passed = max_diff < 1e-10 if not passed: all_pass = False status = "✅" if passed else "❌" if max_diff > 1e-14 or j <= 11: print(f" Z{j:>3} {zernike_name(j):>20} {max_diff:>12.2e} {rms_diff:>12.2e} {status:>8}") print(f"\n Overall max diff: {max(max_diffs):.2e}") print(f" Overall RMS diff: {np.sqrt(np.mean(np.array(rms_diffs)**2)):.2e}") print(f" Result: {'✅ ALL MATCH' if all_pass else '❌ DISCREPANCIES FOUND'}") return all_pass, max_diffs, rms_diffs # ============================================================================ # Test 3: Coefficient Recovery Agreement # ============================================================================ def test_coefficient_recovery(n_modes=50, n_points=2000): """ Generate surface with known coefficients, fit with both implementations, compare recovered coefficients. """ print("\n" + "=" * 70) print("TEST 3: Coefficient Recovery Agreement") print("=" * 70) # Generate random test points on unit disk rng = np.random.default_rng(42) x = rng.uniform(-1, 1, n_points * 2) y = rng.uniform(-1, 1, n_points * 2) mask = x**2 + y**2 <= 1.0 x = x[mask][:n_points] y = y[mask][:n_points] r = np.sqrt(x**2 + y**2) theta = np.arctan2(y, x) # Known coefficients (realistic mirror) input_coeffs = {5: 80, 6: 45, 7: 30, 8: 20, 9: 15, 11: 10, 13: 5, 16: 3, 22: 2} # Generate surface using Atomizer math surface = np.zeros(len(x)) for j, amp in input_coeffs.items(): surface += amp * atomizer_zernike_noll(j, r, theta) # Fit with Atomizer basis Z_ato = np.zeros((len(x), n_modes)) for j in range(1, n_modes + 1): Z_ato[:, j-1] = atomizer_zernike_noll(j, r, theta) coeffs_ato, _, _, _ = lstsq(Z_ato, surface, rcond=None) # Fit with prysm basis Z_pry = np.zeros((len(x), n_modes)) for j in range(1, n_modes + 1): n, m = noll_to_nm(j) Z_pry[:, j-1] = zernike_nm(n, m, r, theta, norm=False) coeffs_pry, _, _, _ = lstsq(Z_pry, surface, rcond=None) # Compare print(f"\n {'j':>4} {'Name':>20} {'Input':>10} {'Atomizer':>12} {'prysm':>12} {'Ato-prysm':>12} {'Status':>8}") print(f" {'-'*4} {'-'*20} {'-'*10} {'-'*12} {'-'*12} {'-'*12} {'-'*8}") max_diff = 0 all_pass = True for j in range(1, n_modes + 1): inp = input_coeffs.get(j, 0.0) ato = coeffs_ato[j-1] pry = coeffs_pry[j-1] diff = abs(ato - pry) max_diff = max(max_diff, diff) passed = diff < 1e-8 if not passed: all_pass = False if abs(inp) > 0.01 or diff > 1e-10: status = "✅" if passed else "❌" print(f" Z{j:>3} {zernike_name(j):>20} {inp:>10.3f} {ato:>12.6f} {pry:>12.6f} {diff:>12.2e} {status:>8}") print(f"\n Max Atomizer-prysm difference: {max_diff:.2e}") print(f" Result: {'✅ IMPLEMENTATIONS AGREE' if all_pass else '❌ DISCREPANCIES FOUND'}") return all_pass, coeffs_ato, coeffs_pry # ============================================================================ # Test 4: RMS Wavefront Error Agreement # ============================================================================ def test_rms_agreement(n_modes=50, n_points=5000): """Compare RMS WFE computation between implementations.""" print("\n" + "=" * 70) print("TEST 4: RMS Wavefront Error Agreement") print("=" * 70) rng = np.random.default_rng(42) x = rng.uniform(-1, 1, n_points * 2) y = rng.uniform(-1, 1, n_points * 2) mask = x**2 + y**2 <= 1.0 x = x[mask][:n_points] y = y[mask][:n_points] r = np.sqrt(x**2 + y**2) theta = np.arctan2(y, x) # Test multiple coefficient sets test_sets = { "Single astig": {5: 100}, "Gravity-like": {5: 80, 6: 45, 7: 30, 8: 20, 9: 15, 11: 10}, "High-order": {22: 50, 37: 30}, "All modes": {j: 100/j for j in range(5, 51)}, } all_pass = True print(f"\n {'Test':>20} {'RMS Atomizer':>14} {'RMS prysm':>14} {'Diff':>12} {'Status':>8}") print(f" {'-'*20} {'-'*14} {'-'*14} {'-'*12} {'-'*8}") for name, coeffs in test_sets.items(): # Build surface with Atomizer surf_ato = np.zeros(len(x)) for j, amp in coeffs.items(): surf_ato += amp * atomizer_zernike_noll(j, r, theta) # Build surface with prysm surf_pry = np.zeros(len(x)) for j, amp in coeffs.items(): n, m = noll_to_nm(j) surf_pry += amp * zernike_nm(n, m, r, theta, norm=False) rms_ato = np.sqrt(np.mean(surf_ato**2)) rms_pry = np.sqrt(np.mean(surf_pry**2)) diff = abs(rms_ato - rms_pry) passed = diff < 1e-8 if not passed: all_pass = False status = "✅" if passed else "❌" print(f" {name:>20} {rms_ato:>14.6f} {rms_pry:>14.6f} {diff:>12.2e} {status:>8}") print(f"\n Result: {'✅ RMS VALUES AGREE' if all_pass else '❌ DISCREPANCIES FOUND'}") return all_pass # ============================================================================ # Generate Phase 2 Report Figures # ============================================================================ def generate_phase2_figures(output_dir, n_modes=50): """Generate figures for Phase 2 report.""" import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) # --- Figure: Polynomial evaluation comparison --- rng = np.random.default_rng(42) r = rng.uniform(0, 1, 5000) theta = rng.uniform(0, 2*np.pi, 5000) max_diffs = [] for j in range(1, n_modes + 1): z_ato = atomizer_zernike_noll(j, r, theta) n, m = noll_to_nm(j) z_pry = zernike_nm(n, m, r, theta, norm=False) max_diffs.append(np.max(np.abs(z_ato - z_pry))) fig, ax = plt.subplots(figsize=(14, 5)) colors = ['#4CAF50' if d < 1e-10 else '#F44336' for d in max_diffs] ax.bar(range(1, 51), max_diffs, color=colors, alpha=0.85) ax.set_xlabel('Noll Index j', fontsize=11) ax.set_ylabel('Max |Atomizer - prysm|', fontsize=11) ax.set_title('Phase 2: Polynomial Evaluation Agreement — Atomizer vs prysm\n(5000 random points on unit disk, 50 modes)', fontsize=13, fontweight='bold') ax.set_yscale('symlog', linthresh=1e-16) ax.axhline(y=1e-10, color='red', linestyle='--', alpha=0.5, label='Threshold (1e-10)') ax.legend() ax.grid(axis='y', alpha=0.3) plt.tight_layout() fig.savefig(output_dir / "fig7_prysm_polynomial_agreement.png", dpi=150, bbox_inches='tight') plt.close() print(f" ✅ fig7_prysm_polynomial_agreement.png") # --- Figure: Coefficient recovery comparison --- rng = np.random.default_rng(42) x = rng.uniform(-1, 1, 4000) y = rng.uniform(-1, 1, 4000) mask = x**2 + y**2 <= 1.0 x, y = x[mask][:2000], y[mask][:2000] r2 = np.sqrt(x**2 + y**2) th2 = np.arctan2(y, x) input_coeffs = {5: 80, 6: 45, 7: 30, 8: 20, 9: 15, 11: 10, 13: 5, 16: 3, 22: 2} surface = sum(amp * atomizer_zernike_noll(j, r2, th2) for j, amp in input_coeffs.items()) # Fit with both Z_ato = np.column_stack([atomizer_zernike_noll(j, r2, th2) for j in range(1, 51)]) Z_pry = np.column_stack([zernike_nm(*noll_to_nm(j), r2, th2, norm=False) for j in range(1, 51)]) c_ato, _, _, _ = lstsq(Z_ato, surface, rcond=None) c_pry, _, _, _ = lstsq(Z_pry, surface, rcond=None) fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6)) modes = np.arange(1, 51) input_arr = np.array([input_coeffs.get(j, 0) for j in modes]) ax1.bar(modes - 0.3, input_arr, 0.3, label='Input (Truth)', color='#2196F3', alpha=0.85) ax1.bar(modes, c_ato, 0.3, label='Atomizer', color='#FF9800', alpha=0.85) ax1.bar(modes + 0.3, c_pry, 0.3, label='prysm', color='#4CAF50', alpha=0.85) ax1.set_xlabel('Noll Index j', fontsize=11) ax1.set_ylabel('Coefficient (nm)', fontsize=11) ax1.set_title('Coefficient Recovery: Truth vs Atomizer vs prysm', fontsize=12, fontweight='bold') ax1.legend() ax1.grid(axis='y', alpha=0.3) diff = np.abs(c_ato - c_pry) ax2.bar(modes, diff, color='#9C27B0', alpha=0.85) ax2.set_xlabel('Noll Index j', fontsize=11) ax2.set_ylabel('|Atomizer - prysm| (nm)', fontsize=11) ax2.set_title('Inter-Implementation Difference\n(should be ~machine epsilon)', fontsize=12, fontweight='bold') ax2.set_yscale('symlog', linthresh=1e-14) ax2.grid(axis='y', alpha=0.3) plt.tight_layout() fig.savefig(output_dir / "fig8_prysm_coefficient_agreement.png", dpi=150, bbox_inches='tight') plt.close() print(f" ✅ fig8_prysm_coefficient_agreement.png") # --- Figure: Side-by-side surface evaluation --- n_grid = 200 x_g = np.linspace(-1, 1, n_grid) y_g = np.linspace(-1, 1, n_grid) xx, yy = np.meshgrid(x_g, y_g) rr = np.sqrt(xx**2 + yy**2) tt = np.arctan2(yy, xx) mask_g = rr <= 1.0 # Evaluate Z7 (coma) with both j_test = 7 z_ato_g = np.full_like(xx, np.nan) z_pry_g = np.full_like(xx, np.nan) z_ato_g[mask_g] = atomizer_zernike_noll(j_test, rr[mask_g], tt[mask_g]) n7, m7 = noll_to_nm(j_test) z_pry_g[mask_g] = zernike_nm(n7, m7, rr[mask_g], tt[mask_g], norm=False) diff_g = np.full_like(xx, np.nan) diff_g[mask_g] = z_ato_g[mask_g] - z_pry_g[mask_g] fig, axes = plt.subplots(1, 3, figsize=(18, 5.5)) for ax, data, title, cmap in zip( axes, [z_ato_g, z_pry_g, diff_g], [f'Atomizer Z{j_test} (Coma X)', f'prysm Z{j_test} (Coma X)', 'Difference'], ['RdBu_r', 'RdBu_r', 'RdBu_r'] ): im = ax.imshow(data, extent=[-1, 1, -1, 1], cmap=cmap, origin='lower') ax.set_title(title, fontsize=12, fontweight='bold') ax.set_xlabel('x (normalized)') ax.set_ylabel('y (normalized)') plt.colorbar(im, ax=ax, shrink=0.8) fig.suptitle('Phase 2: Zernike Polynomial Visual Comparison — Atomizer vs prysm', fontsize=13, fontweight='bold', y=1.02) plt.tight_layout() fig.savefig(output_dir / "fig9_prysm_surface_comparison.png", dpi=150, bbox_inches='tight') plt.close() print(f" ✅ fig9_prysm_surface_comparison.png") # ============================================================================ # Main # ============================================================================ def main(): output_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("/home/papa/obsidian-vault/2-Projects/P-Zernike-Validation/figures") print("=" * 70) print("ZERNIKE CROSS-VALIDATION: Atomizer vs prysm") print("=" * 70) print(f"prysm version: 0.21.1") print() results = {} # Test 1 passed, mismatches = test_noll_mapping() results["noll_mapping"] = {"passed": passed, "mismatches": len(mismatches)} # Test 2 passed, max_diffs, rms_diffs = test_polynomial_evaluation() results["polynomial_eval"] = {"passed": passed, "max_diff": float(max(max_diffs))} # Test 3 passed, c_ato, c_pry = test_coefficient_recovery() results["coefficient_recovery"] = {"passed": passed, "max_diff": float(max(abs(c_ato - c_pry)))} # Test 4 passed = test_rms_agreement() results["rms_agreement"] = {"passed": passed} # Generate figures print("\n" + "=" * 70) print("GENERATING PHASE 2 FIGURES") print("=" * 70) generate_phase2_figures(output_dir) # Summary all_passed = all(v["passed"] for v in results.values()) print("\n" + "=" * 70) print("CROSS-VALIDATION SUMMARY") print("=" * 70) for test, result in results.items(): status = "✅ PASS" if result["passed"] else "❌ FAIL" print(f" {test:>25}: {status}") print(f"\n Overall: {'✅ ATOMIZER MATCHES PRYSM' if all_passed else '❌ DISCREPANCIES FOUND'}") # Save results results_path = output_dir / "phase2_cross_validation_results.json" with open(results_path, 'w') as f: json.dump(results, f, indent=2) print(f"\n Results saved: {results_path}") return 0 if all_passed else 1 if __name__ == "__main__": sys.exit(main())