Documentation: - Add docs/06_PHYSICS/ with Zernike fundamentals and OPD method docs - Add docs/guides/CMA-ES_EXPLAINED.md optimization guide - Update CLAUDE.md and ATOMIZER_CONTEXT.md with current architecture - Update OP_01_CREATE_STUDY protocol Planning: - Add DYNAMIC_RESPONSE plans for random vibration/PSD support - Add OPTIMIZATION_ENGINE_MIGRATION_PLAN for code reorganization Insights System: - Update design_space, modal_analysis, stress_field, thermal_field insights - Improve error handling and data validation NX Journals: - Add analyze_wfe_zernike.py for Zernike WFE analysis - Add capture_study_images.py for automated screenshots - Add extract_expressions.py and introspect_part.py utilities - Add user_generated_journals/journal_top_view_image_taking.py Tests & Tools: - Add comprehensive Zernike OPD test suite - Add audit_v10 tests for WFE validation - Add tools for Pareto graphs and mirror data extraction - Add migrate_studies_to_topics.py utility Knowledge Base: - Initialize LAC (Learning Atomizer Core) with failure/success patterns Dashboard: - Update Setup.tsx and launch_dashboard.py - Add restart-dev.bat helper script 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
193 lines
5.8 KiB
Python
193 lines
5.8 KiB
Python
#!/usr/bin/env python
|
|
"""
|
|
Extract all M1 mirror optimization trial data from Optuna study databases.
|
|
Outputs a consolidated CSV file with all parameters and objectives.
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
import csv
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
|
|
# Names of the Optuna studies to export. The list is iterated in order, so
# this is also the order in which studies appear in the output.
STUDIES = [
    "m1_mirror_zernike_optimization",
    "m1_mirror_adaptive_V11", "m1_mirror_adaptive_V13",
    "m1_mirror_adaptive_V14", "m1_mirror_adaptive_V15",
    "m1_mirror_cost_reduction", "m1_mirror_cost_reduction_V2",
]
|
|
|
|
# Union of the design-variable names used by any of the studies. A study
# that lacks one of these simply has no value for that name.
DESIGN_VARS = [
    # lateral_* parameters
    "lateral_inner_angle", "lateral_outer_angle",
    "lateral_outer_pivot", "lateral_inner_pivot", "lateral_middle_pivot",
    "lateral_closeness",
    # whiffle_* parameters
    "whiffle_min", "whiffle_outer_to_vertical", "whiffle_triangle_closeness",
    # remaining parameters
    "blank_backface_angle", "inner_circular_rib_dia", "center_thickness",
]
|
|
|
|
# Names of the objective values to export, in output column order.
OBJECTIVES = [
    "rel_filtered_rms_40_vs_20", "rel_filtered_rms_60_vs_20",
    "mfg_90_optician_workload", "mass_kg",
]
|
|
|
|
|
|
def get_db_path(study_name: str, studies_dir: "Path | None" = None) -> "Path | None":
    """Locate the ``study.db`` file of a study.

    Two directory layouts are searched, in this order:

    1. ``<studies_dir>/M1_Mirror/<study_name>/`` (topic-folder layout)
    2. ``<studies_dir>/<study_name>/`` (flat layout, kept for backwards
       compatibility)

    Inside each layout, ``3_results/study.db`` is preferred over
    ``2_results/study.db``.

    Args:
        study_name: Folder name of the study.
        studies_dir: Root folder containing the studies. Defaults to the
            ``studies`` directory next to this script.

    Returns:
        The first existing database path, or ``None`` when no candidate
        exists.
    """
    if studies_dir is None:
        root = Path(__file__).parent / "studies"
    else:
        root = Path(studies_dir)

    # Topic-folder layout first, flat layout as fallback.
    for base in (root / "M1_Mirror" / study_name, root / study_name):
        for subdir in ("3_results", "2_results"):
            db_path = base / subdir / "study.db"
            if db_path.exists():
                return db_path
    return None
|
|
|
|
|
|
def get_config_path(study_name: str) -> Path:
    """Return the expected location of a study's ``optimization_config.json``.

    The topic-folder location (``studies/M1_Mirror/<study_name>/1_setup/``)
    is returned when that file exists. Otherwise the flat-layout location
    (``studies/<study_name>/1_setup/``) is returned without checking that it
    exists, so callers must verify existence themselves.
    """
    studies_root = Path(__file__).parent / "studies"
    tail = (study_name, "1_setup", "optimization_config.json")

    topic_candidate = studies_root.joinpath("M1_Mirror", *tail)
    if topic_candidate.exists():
        return topic_candidate
    return studies_root.joinpath(*tail)
|
|
|
|
|
|
def load_objective_mapping(config_path: Path) -> dict:
    """Map objective index to objective name as listed in a study config.

    Args:
        config_path: Path to a JSON config file. Its optional top-level
            ``"objectives"`` entry is expected to be a list of objects that
            each carry a ``"name"`` key.

    Returns:
        ``{0: first_name, 1: second_name, ...}`` following the order of the
        ``"objectives"`` list; an empty dict when the entry is missing,
        empty or ``null``.

    Raises:
        OSError: The file cannot be opened.
        json.JSONDecodeError: The file is not valid JSON.
        KeyError: An objective entry has no ``"name"`` key.
    """
    # JSON is UTF-8 by specification; do not depend on the platform's locale
    # encoding (e.g. cp1252 on Windows), which would mis-decode non-ASCII text.
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)

    # `or []` also covers an explicit `"objectives": null`.
    objectives = config.get("objectives") or []
    return {index: objective["name"] for index, objective in enumerate(objectives)}
|
|
|
|
|
|
def _decode_user_attr(raw):
    """Convert one ``value_json`` cell into a float or a plain string."""
    try:
        # Numeric attributes are exported as floats.
        return float(raw)
    except (TypeError, ValueError):
        pass
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError):
        decoded = None
    if isinstance(decoded, str):
        # Proper JSON string: escapes such as \u00b5 or \" are resolved.
        return decoded
    # Anything else (booleans, lists, malformed text) keeps its raw text
    # with surrounding double quotes removed.
    return raw.strip('"') if isinstance(raw, str) else raw


def extract_trials_from_db(db_path: Path, obj_mapping: dict) -> list:
    """Extract all completed trials from an Optuna study database.

    Reads the ``trials``, ``trial_params`` and ``trial_user_attributes``
    tables of the SQLite file at ``db_path``.

    Args:
        db_path: Path to the SQLite study database.
        obj_mapping: Objective index -> name mapping. Currently not used by
            this function; kept so existing callers keep working.

    Returns:
        One dict per trial whose state is ``'COMPLETE'``, ordered by
        ``trial_id``. Each dict holds ``"trial_id"``, every row of
        ``trial_params`` as ``param_name -> param_value`` and every row of
        ``trial_user_attributes`` as ``key -> value`` (a float when the
        stored text parses as a number, otherwise a string). Attributes
        whose stored value is NULL are omitted.

    Raises:
        sqlite3.Error: The database cannot be read or lacks the expected
            tables.
    """
    conn = sqlite3.connect(str(db_path))
    # try/finally because sqlite3's own context manager only commits or rolls
    # back; it does not close the connection.
    try:
        cursor = conn.cursor()

        # Explicit ordering keeps the export deterministic.
        cursor.execute(
            "SELECT trial_id FROM trials WHERE state = 'COMPLETE' ORDER BY trial_id"
        )
        trial_ids = [row[0] for row in cursor.fetchall()]

        trials = []
        for trial_id in trial_ids:
            trial_data = {"trial_id": trial_id}

            # Parameters
            cursor.execute(
                "SELECT param_name, param_value FROM trial_params WHERE trial_id = ?",
                (trial_id,),
            )
            for param_name, param_value in cursor.fetchall():
                trial_data[param_name] = param_value

            # Individual objective values live in the user attributes
            # (Atomizer stores individual objectives here, weighted_sum in
            # trial_values).
            cursor.execute(
                "SELECT key, value_json FROM trial_user_attributes WHERE trial_id = ?",
                (trial_id,),
            )
            for key, raw_value in cursor.fetchall():
                if raw_value is None:
                    # Nothing stored for this attribute; skip instead of crashing.
                    continue
                trial_data[key] = _decode_user_attr(raw_value)

            trials.append(trial_data)

        return trials
    finally:
        conn.close()
|
|
|
|
|
|
def main():
    """Export the completed trials of every study in STUDIES to one CSV.

    For each study the database and config are located; a study missing
    either one is reported as skipped and counted as 0 trials. All other
    studies contribute one CSV row per completed trial, with columns
    ``study``, ``trial``, then DESIGN_VARS and OBJECTIVES (blank where a
    trial has no value for a column). The file is written to
    ``studies/m1_mirror_all_trials_export.csv`` next to this script and a
    per-study summary is printed.
    """
    output_path = Path(__file__).parent / "studies" / "m1_mirror_all_trials_export.csv"

    # Column layout: two identifying columns followed by the value columns.
    value_columns = DESIGN_VARS + OBJECTIVES
    header = ["study", "trial"] + value_columns

    all_rows = []
    stats = {}

    for study_name in STUDIES:
        db_path = get_db_path(study_name)
        config_path = get_config_path(study_name)

        if not (db_path and db_path.exists()):
            print(f"[SKIP] {study_name}: No database found")
            stats[study_name] = 0
            continue

        if not config_path.exists():
            print(f"[SKIP] {study_name}: No config found")
            stats[study_name] = 0
            continue

        print(f"[LOAD] {study_name}...")

        obj_mapping = load_objective_mapping(config_path)
        trials = extract_trials_from_db(db_path, obj_mapping)
        stats[study_name] = len(trials)

        # One row per trial; columns the trial lacks are left blank.
        all_rows.extend(
            {
                "study": study_name,
                "trial": trial["trial_id"],
                **{column: trial.get(column, "") for column in value_columns},
            }
            for trial in trials
        )

    with open(output_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=header)
        writer.writeheader()
        writer.writerows(all_rows)

    print(f"\n{'='*60}")
    print(f"EXPORT COMPLETE: {output_path}")
    print(f"{'='*60}")
    print(f"\nTotal trials exported: {len(all_rows)}")
    print(f"\nTrials per study:")
    for study, count in stats.items():
        print(f" {study}: {count}")
|
|
|
|
|
|
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|