#!/usr/bin/env python
"""
Extract all M1 mirror optimization trial data from Optuna study databases.
Outputs a consolidated CSV file with all parameters and objectives.
"""

import csv
import json
import sqlite3
from collections import defaultdict
from contextlib import closing
from pathlib import Path
from typing import Optional

# Studies to extract (in order)
STUDIES = [
    "m1_mirror_zernike_optimization",
    "m1_mirror_adaptive_V11",
    "m1_mirror_adaptive_V13",
    "m1_mirror_adaptive_V14",
    "m1_mirror_adaptive_V15",
    "m1_mirror_cost_reduction",
    "m1_mirror_cost_reduction_V2",
]

# All possible design variables (superset across all studies)
DESIGN_VARS = [
    "lateral_inner_angle",
    "lateral_outer_angle",
    "lateral_outer_pivot",
    "lateral_inner_pivot",
    "lateral_middle_pivot",
    "lateral_closeness",
    "whiffle_min",
    "whiffle_outer_to_vertical",
    "whiffle_triangle_closeness",
    "blank_backface_angle",
    "inner_circular_rib_dia",
    "center_thickness",
]

# All objectives
OBJECTIVES = [
    "rel_filtered_rms_40_vs_20",
    "rel_filtered_rms_60_vs_20",
    "mfg_90_optician_workload",
    "mass_kg",
]

# Result sub-folders probed for "study.db", in priority order.
_RESULT_SUBDIRS = ("3_results", "2_results")


def _candidate_study_dirs(study_name: str) -> tuple:
    """Return the folders a study may live in, preferred location first."""
    studies_root = Path(__file__).parent / "studies"
    return (
        studies_root / "M1_Mirror" / study_name,  # new topic-folder structure
        studies_root / study_name,  # flat structure (backwards compatibility)
    )


def _decode_attribute(raw):
    """Convert a stored ``value_json`` cell into a float or a string.

    Numeric text becomes a float. A JSON-encoded string is decoded with
    ``json.loads`` so escape sequences are handled correctly. Anything else
    (JSON lists, objects, booleans, null, or unparsable text) is returned as
    its raw text with surrounding double quotes stripped.
    """
    try:
        return float(raw)
    except (TypeError, ValueError):
        pass

    if not isinstance(raw, str):
        return raw

    try:
        decoded = json.loads(raw)
    except ValueError:
        return raw.strip('"')
    return decoded if isinstance(decoded, str) else raw.strip('"')


def get_db_path(study_name: str) -> Optional[Path]:
    """Get the database path for a study.

    Looks for ``<results>/study.db`` under the M1_Mirror topic folder first,
    then under the flat ``studies/<study_name>`` layout.

    Returns:
        The first existing database path, or ``None`` if no database exists.
    """
    for study_dir in _candidate_study_dirs(study_name):
        for subdir in _RESULT_SUBDIRS:
            db_path = study_dir / subdir / "study.db"
            if db_path.exists():
                return db_path
    return None


def get_config_path(study_name: str) -> Path:
    """Get the config path for a study.

    Returns the config under the M1_Mirror topic folder when that file exists;
    otherwise returns the flat-structure path, which may itself not exist, so
    callers must check.
    """
    nested, flat = (
        study_dir / "1_setup" / "optimization_config.json"
        for study_dir in _candidate_study_dirs(study_name)
    )
    return nested if nested.exists() else flat


def load_objective_mapping(config_path: Path) -> dict:
    """Load objective names from config to map objective_id to name.

    Returns:
        ``{index: name}`` for the entries of the config's ``"objectives"``
        list, in order; an empty dict when the key is absent.
    """
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)

    objectives = config.get("objectives", [])
    # objective_id 0, 1, 2, ... maps to objectives in order
    return {i: obj["name"] for i, obj in enumerate(objectives)}


def extract_trials_from_db(db_path: Path, obj_mapping: dict) -> list:
    """Extract all completed trials from an Optuna study database.

    Args:
        db_path: Path to the SQLite ``study.db`` file.
        obj_mapping: Objective index-to-name mapping. Currently unused; kept
            so existing callers continue to work.

    Returns:
        One dict per trial whose state is ``'COMPLETE'``, containing
        ``"trial_id"``, every row of ``trial_params`` for that trial, and
        every row of ``trial_user_attributes`` (numeric values as floats,
        everything else as strings).
    """
    trials = []

    # closing() guarantees the connection is released even if a query fails.
    with closing(sqlite3.connect(str(db_path))) as conn:
        cursor = conn.cursor()

        # Get all completed trials (ordered so the export is deterministic)
        cursor.execute(
            "SELECT trial_id FROM trials WHERE state = 'COMPLETE' ORDER BY trial_id"
        )
        trial_ids = [row[0] for row in cursor.fetchall()]

        for trial_id in trial_ids:
            trial_data = {"trial_id": trial_id}

            # Get parameters
            cursor.execute(
                "SELECT param_name, param_value FROM trial_params WHERE trial_id = ?",
                (trial_id,),
            )
            for param_name, param_value in cursor.fetchall():
                trial_data[param_name] = param_value

            # Get individual objective values from user attributes
            # (Atomizer stores individual objectives here, weighted_sum in trial_values)
            cursor.execute(
                "SELECT key, value_json FROM trial_user_attributes WHERE trial_id = ?",
                (trial_id,),
            )
            for key, value in cursor.fetchall():
                trial_data[key] = _decode_attribute(value)

            trials.append(trial_data)

    return trials


def main():
    """Export completed trials of every study in STUDIES to one CSV file.

    Studies without a database or config file are skipped and reported with a
    count of zero. The CSV is written next to the study folders as
    ``studies/m1_mirror_all_trials_export.csv``; missing variables and
    objectives are left as empty cells.
    """
    studies_dir = Path(__file__).parent / "studies"
    output_path = studies_dir / "m1_mirror_all_trials_export.csv"

    # CSV header
    header = ["study", "trial"] + DESIGN_VARS + OBJECTIVES

    all_rows = []
    stats = {}

    for study_name in STUDIES:
        db_path = get_db_path(study_name)
        config_path = get_config_path(study_name)

        # get_db_path only ever returns an existing path or None.
        if db_path is None:
            print(f"[SKIP] {study_name}: No database found")
            stats[study_name] = 0
            continue

        if not config_path.exists():
            print(f"[SKIP] {study_name}: No config found")
            stats[study_name] = 0
            continue

        print(f"[LOAD] {study_name}...")

        # Load objective mapping from config
        obj_mapping = load_objective_mapping(config_path)

        # Extract trials
        trials = extract_trials_from_db(db_path, obj_mapping)
        stats[study_name] = len(trials)

        # Convert to rows
        for trial in trials:
            row = {
                "study": study_name,
                "trial": trial["trial_id"],
            }
            # Add design variables
            for var in DESIGN_VARS:
                row[var] = trial.get(var, "")
            # Add objectives
            for obj in OBJECTIVES:
                row[obj] = trial.get(obj, "")
            all_rows.append(row)

    # Write CSV
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=header)
        writer.writeheader()
        writer.writerows(all_rows)

    print(f"\n{'='*60}")
    print(f"EXPORT COMPLETE: {output_path}")
    print(f"{'='*60}")
    print(f"\nTotal trials exported: {len(all_rows)}")
    print("\nTrials per study:")
    for study, count in stats.items():
        print(f"  {study}: {count}")


if __name__ == "__main__":
    main()