From 1bb201e0b78fdda67b5ad1d4fa3710ce8dc5908e Mon Sep 17 00:00:00 2001 From: Antoine Date: Fri, 12 Dec 2025 10:28:35 -0500 Subject: [PATCH] feat: Add post-optimization tools and mandatory best design archiving MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New Tools (tools/): - analyze_study.py: Generate comprehensive optimization reports - find_best_iteration.py: Find best iteration folder, optionally copy it - archive_best_design.py: Archive best design to 3_results/best_design_archive// Protocol Updates: - OP_02_RUN_OPTIMIZATION.md v1.1: Add mandatory archive_best_design step in Post-Run Actions. This MUST be done after every optimization run. V14 Updates: - run_optimization.py: Auto-archive best design at end of optimization - optimization_config.json: Expand bounds for V14 continuation - lateral_outer_angle: min 13->11 deg (was at 4.7%) - lateral_inner_pivot: min 7->5 mm (was at 8.1%) - lateral_middle_pivot: max 23->27 mm (was at 99.4%) - whiffle_min: max 60->72 mm (was at 96.3%) Usage: python tools/analyze_study.py m1_mirror_adaptive_V14 python tools/find_best_iteration.py m1_mirror_adaptive_V14 python tools/archive_best_design.py m1_mirror_adaptive_V14 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../operations/OP_02_RUN_OPTIMIZATION.md | 36 +- .../1_setup/optimization_config.json | 22 +- .../run_optimization.py | 26 ++ tools/analyze_study.py | 405 ++++++++++++++++++ tools/archive_best_design.py | 247 +++++++++++ tools/find_best_iteration.py | 192 +++++++++ 6 files changed, 913 insertions(+), 15 deletions(-) create mode 100644 tools/analyze_study.py create mode 100644 tools/archive_best_design.py create mode 100644 tools/find_best_iteration.py diff --git a/docs/protocols/operations/OP_02_RUN_OPTIMIZATION.md b/docs/protocols/operations/OP_02_RUN_OPTIMIZATION.md index 5d97358d..20b7e157 100644 --- a/docs/protocols/operations/OP_02_RUN_OPTIMIZATION.md +++ b/docs/protocols/operations/OP_02_RUN_OPTIMIZATION.md @@ -5,7 +5,7 @@ PROTOCOL: Run Optimization LAYER: Operations VERSION: 1.0 STATUS: Active -LAST_UPDATED: 2025-12-05 +LAST_UPDATED: 2025-12-12 PRIVILEGE: user LOAD_WITH: [] --> @@ -237,16 +237,39 @@ Continues from last completed trial. Same study database used. After optimization completes: -1. **Check results**: +1. **Archive best design** (REQUIRED): ```bash - python -c "import optuna; s=optuna.load_study(...); print(s.best_params)" + python tools/archive_best_design.py {study_name} ``` + This copies the best iteration folder to `3_results/best_design_archive//` + with metadata. **Always do this** to preserve the winning design. -2. **View in dashboard**: `http://localhost:3000` +2. **Analyze results**: + ```bash + python tools/analyze_study.py {study_name} + ``` + Generates comprehensive report with statistics, parameter bounds analysis. -3. **Generate report**: See [OP_04_ANALYZE_RESULTS](./OP_04_ANALYZE_RESULTS.md) +3. **Find best iteration folder**: + ```bash + python tools/find_best_iteration.py {study_name} + ``` + Shows which `iter{N}` folder contains the best design. -4. **Update STUDY_REPORT.md**: Fill in results template +4. **View in dashboard**: `http://localhost:3000` + +5. **Generate detailed report**: See [OP_04_ANALYZE_RESULTS](./OP_04_ANALYZE_RESULTS.md) + +### Automated Archiving + +The `run_optimization.py` script should call `archive_best_design()` automatically +at the end of each run. If implementing a new study, add this at the end: + +```python +# At end of optimization +from tools.archive_best_design import archive_best_design +archive_best_design(study_name) +``` --- @@ -294,4 +317,5 @@ If `--neural` flag, uses trained surrogate for fast evaluation. | Version | Date | Changes | |---------|------|---------| +| 1.1 | 2025-12-12 | Added mandatory archive_best_design step, analyze_study and find_best_iteration tools | | 1.0 | 2025-12-05 | Initial release | diff --git a/studies/m1_mirror_adaptive_V14/1_setup/optimization_config.json b/studies/m1_mirror_adaptive_V14/1_setup/optimization_config.json index 14b451eb..1c133069 100644 --- a/studies/m1_mirror_adaptive_V14/1_setup/optimization_config.json +++ b/studies/m1_mirror_adaptive_V14/1_setup/optimization_config.json @@ -1,7 +1,7 @@ { "$schema": "Atomizer M1 Mirror TPE Optimization V14", "study_name": "m1_mirror_adaptive_V14", - "description": "V14 - TPE single-objective optimization seeded from V11+V12+V13 FEA trials. Weighted-sum objective for efficient convergence.", + "description": "V14 continuation - TPE with expanded bounds based on V14 analysis. 4 params were at bounds: lateral_middle_pivot (99.4%), whiffle_min (96.3%), lateral_outer_angle (4.7%), lateral_inner_pivot (8.1%).", "source_studies": { "v11": { @@ -31,11 +31,12 @@ { "name": "lateral_outer_angle", "expression_name": "lateral_outer_angle", - "min": 13.0, + "min": 11.0, "max": 17.0, "baseline": 14.64, "units": "degrees", - "enabled": true + "enabled": true, + "_note": "Expanded min from 13 to 11 (was at 4.7% of range)" }, { "name": "lateral_outer_pivot", @@ -49,20 +50,22 @@ { "name": "lateral_inner_pivot", "expression_name": "lateral_inner_pivot", - "min": 7.0, + "min": 5.0, "max": 12.0, "baseline": 10.07, "units": "mm", - "enabled": true + "enabled": true, + "_note": "Expanded min from 7 to 5 (was at 8.1% of range)" }, { "name": "lateral_middle_pivot", "expression_name": "lateral_middle_pivot", "min": 15.0, - "max": 23.0, + "max": 27.0, "baseline": 20.73, "units": "mm", - "enabled": true + "enabled": true, + "_note": "Expanded max from 23 to 27 (was at 99.4% of range)" }, { "name": "lateral_closeness", @@ -77,10 +80,11 @@ "name": "whiffle_min", "expression_name": "whiffle_min", "min": 30.0, - "max": 60.0, + "max": 72.0, "baseline": 40.55, "units": "mm", - "enabled": true + "enabled": true, + "_note": "Expanded max from 60 to 72 (was at 96.3% of range)" }, { "name": "whiffle_outer_to_vertical", diff --git a/studies/m1_mirror_adaptive_V14/run_optimization.py b/studies/m1_mirror_adaptive_V14/run_optimization.py index 41fba4f0..edf12c69 100644 --- a/studies/m1_mirror_adaptive_V14/run_optimization.py +++ b/studies/m1_mirror_adaptive_V14/run_optimization.py @@ -600,6 +600,32 @@ class TPEOptimizer: logger.info(f"\nResults saved to {RESULTS_DIR / 'final_results.json'}") + # Archive best design + self._archive_best_design() + + def _archive_best_design(self): + """Archive the best design iteration folder.""" + try: + # Import archive tool + tools_dir = Path(__file__).parent.parent.parent / "tools" + sys.path.insert(0, str(tools_dir)) + from archive_best_design import archive_best_design + + logger.info("\n" + "-" * 70) + logger.info("ARCHIVING BEST DESIGN") + logger.info("-" * 70) + + result = archive_best_design(str(Path(__file__).parent)) + + if result.get('success'): + logger.info(f"[OK] Best design archived to: {result['archive_path']}") + logger.info(f" Trial #{result['trial_number']}, WS={result['weighted_sum']:.2f}") + else: + logger.warning(f"[WARN] Archive skipped: {result.get('reason', 'Unknown')}") + + except Exception as e: + logger.error(f"[ERROR] Failed to archive best design: {e}") + # ============================================================================ # Main diff --git a/tools/analyze_study.py b/tools/analyze_study.py new file mode 100644 index 00000000..c0837fb0 --- /dev/null +++ b/tools/analyze_study.py @@ -0,0 +1,405 @@ +#!/usr/bin/env python +""" +Atomizer Study Analysis Tool + +Generates comprehensive optimization reports for any Atomizer study. +Detects study type (single-objective TPE, multi-objective NSGA-II) automatically. + +Usage: + python tools/analyze_study.py + python tools/analyze_study.py m1_mirror_adaptive_V14 + python tools/analyze_study.py m1_mirror_adaptive_V14 --export report.md + +Author: Atomizer +Created: 2025-12-12 +""" + +import argparse +import json +import sqlite3 +from pathlib import Path +from typing import Dict, List, Optional, Tuple +import sys + +# Add parent to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +try: + import numpy as np + HAS_NUMPY = True +except ImportError: + HAS_NUMPY = False + + +def find_study_path(study_name: str) -> Path: + """Find study directory by name.""" + studies_dir = Path(__file__).parent.parent / "studies" + study_path = studies_dir / study_name + + if not study_path.exists(): + raise FileNotFoundError(f"Study not found: {study_path}") + + return study_path + + +def load_config(study_path: Path) -> Dict: + """Load optimization config.""" + config_path = study_path / "1_setup" / "optimization_config.json" + if not config_path.exists(): + raise FileNotFoundError(f"Config not found: {config_path}") + + with open(config_path) as f: + return json.load(f) + + +def get_db_connection(study_path: Path) -> sqlite3.Connection: + """Get database connection.""" + db_path = study_path / "3_results" / "study.db" + if not db_path.exists(): + raise FileNotFoundError(f"Database not found: {db_path}") + + return sqlite3.connect(str(db_path)) + + +def detect_study_type(conn: sqlite3.Connection) -> str: + """Detect if study is single or multi-objective.""" + cursor = conn.cursor() + cursor.execute("SELECT DISTINCT objective FROM trial_values") + objectives = [r[0] for r in cursor.fetchall()] + + if len(objectives) == 1: + return "single_objective" + else: + return "multi_objective" + + +def get_trial_counts(conn: sqlite3.Connection) -> Dict[str, int]: + """Get trial counts by source.""" + cursor = conn.cursor() + + # Total completed + cursor.execute("SELECT COUNT(*) FROM trials WHERE state = 'COMPLETE'") + total = cursor.fetchone()[0] + + # By source + cursor.execute(""" + SELECT tua.value_json, COUNT(*) as cnt + FROM trials t + JOIN trial_user_attributes tua ON t.trial_id = tua.trial_id + WHERE t.state = 'COMPLETE' AND tua.key = 'source' + GROUP BY tua.value_json + """) + sources = {json.loads(r[0]): r[1] for r in cursor.fetchall()} + + fea_count = sources.get("FEA", 0) + seeded_count = total - fea_count + + return { + "total": total, + "fea": fea_count, + "seeded": seeded_count, + "sources": sources + } + + +def get_all_trials_with_objectives(conn: sqlite3.Connection) -> List[Dict]: + """Get all trials with their objective values from user attributes.""" + cursor = conn.cursor() + + # Get all user attribute keys that look like objectives + cursor.execute("SELECT DISTINCT key FROM trial_user_attributes") + all_keys = [r[0] for r in cursor.fetchall()] + + # Common objective-related keys + obj_keys = [k for k in all_keys if k not in ['source', 'solve_time', 'iter_num']] + + # Build query dynamically + select_parts = ["t.number", "t.trial_id"] + join_parts = [] + + for i, key in enumerate(obj_keys): + alias = f"tua_{i}" + select_parts.append(f"{alias}.value_json as {key}") + join_parts.append( + f"LEFT JOIN trial_user_attributes {alias} ON t.trial_id = {alias}.trial_id AND {alias}.key = '{key}'" + ) + + # Add source + select_parts.append("tua_src.value_json as source") + join_parts.append( + "LEFT JOIN trial_user_attributes tua_src ON t.trial_id = tua_src.trial_id AND tua_src.key = 'source'" + ) + + query = f""" + SELECT {', '.join(select_parts)} + FROM trials t + {' '.join(join_parts)} + WHERE t.state = 'COMPLETE' + """ + + cursor.execute(query) + rows = cursor.fetchall() + + # Parse results + trials = [] + for row in rows: + trial = { + "number": row[0], + "trial_id": row[1], + } + + # Parse objective values + for i, key in enumerate(obj_keys): + val = row[2 + i] + if val is not None: + try: + trial[key] = float(val) + except (ValueError, TypeError): + trial[key] = json.loads(val) if val else None + + # Parse source + source_val = row[-1] + trial["source"] = json.loads(source_val) if source_val else "unknown" + + trials.append(trial) + + return trials, obj_keys + + +def get_trial_params(conn: sqlite3.Connection, trial_number: int) -> Dict[str, float]: + """Get parameters for a specific trial.""" + cursor = conn.cursor() + cursor.execute("SELECT trial_id FROM trials WHERE number = ?", (trial_number,)) + result = cursor.fetchone() + if not result: + return {} + + trial_id = result[0] + cursor.execute( + "SELECT param_name, param_value FROM trial_params WHERE trial_id = ?", + (trial_id,) + ) + return {name: float(val) for name, val in cursor.fetchall()} + + +def find_best_iteration_folder(study_path: Path, trial_number: int, conn: sqlite3.Connection) -> Optional[str]: + """Map trial number to iteration folder.""" + cursor = conn.cursor() + + # Get all FEA trial numbers in order + cursor.execute(""" + SELECT t.number + FROM trials t + JOIN trial_user_attributes tua ON t.trial_id = tua.trial_id + WHERE t.state = 'COMPLETE' AND tua.key = 'source' AND tua.value_json = '"FEA"' + ORDER BY t.number + """) + fea_trials = [r[0] for r in cursor.fetchall()] + + if trial_number in fea_trials: + iter_num = fea_trials.index(trial_number) + 1 + return f"iter{iter_num}" + + return None + + +def analyze_parameter_bounds(params: Dict[str, float], config: Dict) -> List[Dict]: + """Check which parameters are near bounds.""" + near_bounds = [] + + for var in config.get("design_variables", []): + name = var["name"] + if name not in params: + continue + + val = params[name] + vmin, vmax = var["min"], var["max"] + position = (val - vmin) / (vmax - vmin) * 100 + + if position < 10: + near_bounds.append({ + "name": name, + "bound": "lower", + "position": position, + "value": val, + "min": vmin, + "max": vmax + }) + elif position > 90: + near_bounds.append({ + "name": name, + "bound": "upper", + "position": position, + "value": val, + "min": vmin, + "max": vmax + }) + + return near_bounds + + +def generate_report(study_name: str) -> str: + """Generate comprehensive study report.""" + study_path = find_study_path(study_name) + config = load_config(study_path) + conn = get_db_connection(study_path) + + # Gather data + study_type = detect_study_type(conn) + counts = get_trial_counts(conn) + trials, obj_keys = get_all_trials_with_objectives(conn) + + # Filter valid trials (exclude failed with WS > 1000) + if "weighted_sum" in obj_keys: + valid_trials = [t for t in trials if t.get("weighted_sum", 0) < 1000] + failed_count = len(trials) - len(valid_trials) + else: + valid_trials = trials + failed_count = 0 + + # Sort by weighted_sum if available, else by first objective + sort_key = "weighted_sum" if "weighted_sum" in obj_keys else obj_keys[0] if obj_keys else None + if sort_key: + valid_trials.sort(key=lambda x: x.get(sort_key, float('inf'))) + + # Separate V14 FEA trials + fea_trials = [t for t in valid_trials if t.get("source") == "FEA"] + + # Get best trial + best_trial = valid_trials[0] if valid_trials else None + best_fea = fea_trials[0] if fea_trials else None + + # Get best params and check bounds + best_params = get_trial_params(conn, best_trial["number"]) if best_trial else {} + near_bounds = analyze_parameter_bounds(best_params, config) if best_params else [] + + # Find iteration folder + iter_folder = None + if best_trial and best_trial.get("source") == "FEA": + iter_folder = find_best_iteration_folder(study_path, best_trial["number"], conn) + + conn.close() + + # Build report + lines = [] + lines.append("=" * 80) + lines.append(f" {study_name.upper()} - OPTIMIZATION REPORT") + lines.append("=" * 80) + lines.append("") + lines.append(f" Study Type: {study_type.replace('_', ' ').title()}") + lines.append(f" Design Variables: {len(config.get('design_variables', []))}") + lines.append(f" Objectives: {len(config.get('objectives', []))}") + lines.append("") + + # Counts + lines.append("=" * 80) + lines.append("1. STUDY SUMMARY") + lines.append("=" * 80) + lines.append("") + lines.append(f" Total trials: {counts['total']}") + lines.append(f" - Seeded (prior data): {counts['seeded']}") + lines.append(f" - New FEA evaluations: {counts['fea']}") + if failed_count: + lines.append(f" - Failed: {failed_count}") + lines.append("") + + # Best design + if best_trial: + lines.append("=" * 80) + lines.append("2. BEST DESIGN FOUND") + lines.append("=" * 80) + lines.append("") + lines.append(f" Trial #{best_trial['number']} (Source: {best_trial.get('source', 'unknown')})") + if iter_folder: + lines.append(f" Iteration folder: {iter_folder}") + lines.append("") + lines.append(" Objectives:") + lines.append(" " + "-" * 45) + + for obj in config.get("objectives", []): + name = obj["name"] + if name in best_trial: + target = obj.get("target", "N/A") + lines.append(f" {name}: {best_trial[name]:.2f} (target: {target})") + + if "weighted_sum" in best_trial: + lines.append(f" Weighted Sum: {best_trial['weighted_sum']:.2f}") + + # Parameters near bounds + if near_bounds: + lines.append("") + lines.append("=" * 80) + lines.append("3. PARAMETERS NEAR BOUNDS") + lines.append("=" * 80) + lines.append("") + lines.append(f" {'Parameter':<25} | {'Bound':>8} | {'Position':>8} | {'Value':>10}") + lines.append(" " + "-" * 60) + for nb in near_bounds: + lines.append(f" {nb['name']:<25} | {nb['bound']:>8} | {nb['position']:>7.1f}% | {nb['value']:>10.3f}") + + # Top 10 + lines.append("") + lines.append("=" * 80) + lines.append("4. TOP 10 DESIGNS") + lines.append("=" * 80) + lines.append("") + + if sort_key: + lines.append(f" {'Rank':>4} | {'Trial':>6} | {sort_key:>15} | Source") + lines.append(" " + "-" * 50) + for i, t in enumerate(valid_trials[:10], 1): + src = t.get("source", "unknown")[:12] + val = t.get(sort_key, 0) + lines.append(f" {i:>4} | {t['number']:>6} | {val:>15.2f} | {src}") + + # Statistics + if HAS_NUMPY and sort_key: + lines.append("") + lines.append("=" * 80) + lines.append("5. STATISTICS") + lines.append("=" * 80) + lines.append("") + + all_vals = [t[sort_key] for t in valid_trials if sort_key in t] + if all_vals: + lines.append(f" All trials (n={len(all_vals)}):") + lines.append(f" min={min(all_vals):.2f}, median={np.median(all_vals):.2f}, mean={np.mean(all_vals):.2f}") + + fea_vals = [t[sort_key] for t in fea_trials if sort_key in t] + if fea_vals: + lines.append(f" FEA trials (n={len(fea_vals)}):") + lines.append(f" min={min(fea_vals):.2f}, median={np.median(fea_vals):.2f}, mean={np.mean(fea_vals):.2f}") + + lines.append("") + lines.append("=" * 80) + + return "\n".join(lines) + + +def main(): + parser = argparse.ArgumentParser(description="Analyze Atomizer optimization study") + parser.add_argument("study_name", help="Name of the study to analyze") + parser.add_argument("--export", "-e", help="Export report to file") + parser.add_argument("--json", "-j", action="store_true", help="Output as JSON") + + args = parser.parse_args() + + try: + report = generate_report(args.study_name) + + if args.export: + with open(args.export, "w") as f: + f.write(report) + print(f"Report exported to: {args.export}") + else: + print(report) + + except Exception as e: + print(f"Error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tools/archive_best_design.py b/tools/archive_best_design.py new file mode 100644 index 00000000..23b03f45 --- /dev/null +++ b/tools/archive_best_design.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python +""" +Atomizer Best Design Archiver + +Archives the best design iteration folder to 3_results/best_design_archive/ +with timestamp. Should be called at the end of every optimization run. + +Usage: + python tools/archive_best_design.py + python tools/archive_best_design.py m1_mirror_adaptive_V14 + + # Or call from run_optimization.py at the end: + from tools.archive_best_design import archive_best_design + archive_best_design("my_study") + +Author: Atomizer +Created: 2025-12-12 +""" + +import argparse +import json +import shutil +import sqlite3 +from datetime import datetime +from pathlib import Path +import sys + +# Add parent to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +def find_study_path(study_name: str) -> Path: + """Find study directory by name.""" + # Check if already a path + if Path(study_name).exists(): + return Path(study_name) + + # Check relative to tools dir + studies_dir = Path(__file__).parent.parent / "studies" + study_path = studies_dir / study_name + + if not study_path.exists(): + raise FileNotFoundError(f"Study not found: {study_path}") + + return study_path + + +def get_best_trial_info(study_path: Path) -> dict: + """Get the best trial information from the database.""" + db_path = study_path / "3_results" / "study.db" + if not db_path.exists(): + raise FileNotFoundError(f"Database not found: {db_path}") + + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Get trial with best weighted_sum + cursor.execute(""" + SELECT t.number, + tua_ws.value_json as weighted_sum, + tua_40.value_json as obj_40, + tua_60.value_json as obj_60, + tua_mfg.value_json as obj_mfg, + tua_src.value_json as source + FROM trials t + LEFT JOIN trial_user_attributes tua_ws ON t.trial_id = tua_ws.trial_id AND tua_ws.key = 'weighted_sum' + LEFT JOIN trial_user_attributes tua_40 ON t.trial_id = tua_40.trial_id AND tua_40.key = 'rel_filtered_rms_40_vs_20' + LEFT JOIN trial_user_attributes tua_60 ON t.trial_id = tua_60.trial_id AND tua_60.key = 'rel_filtered_rms_60_vs_20' + LEFT JOIN trial_user_attributes tua_mfg ON t.trial_id = tua_mfg.trial_id AND tua_mfg.key = 'mfg_90_optician_workload' + LEFT JOIN trial_user_attributes tua_src ON t.trial_id = tua_src.trial_id AND tua_src.key = 'source' + WHERE t.state = 'COMPLETE' + AND CAST(tua_ws.value_json AS REAL) < 1000 + ORDER BY CAST(tua_ws.value_json AS REAL) ASC + LIMIT 1 + """) + + result = cursor.fetchone() + if not result: + # Fallback to trial_values objective 0 + cursor.execute(""" + SELECT t.number, tv.value + FROM trials t + JOIN trial_values tv ON t.trial_id = tv.trial_id AND tv.objective = 0 + WHERE t.state = 'COMPLETE' AND tv.value < 1000 + ORDER BY tv.value ASC + LIMIT 1 + """) + result = cursor.fetchone() + if result: + conn.close() + return { + "trial_number": result[0], + "weighted_sum": result[1], + "objectives": {}, + "source": "unknown" + } + raise ValueError("No valid trials found") + + conn.close() + + return { + "trial_number": result[0], + "weighted_sum": float(result[1]) if result[1] else None, + "objectives": { + "rel_filtered_rms_40_vs_20": float(result[2]) if result[2] else None, + "rel_filtered_rms_60_vs_20": float(result[3]) if result[3] else None, + "mfg_90_optician_workload": float(result[4]) if result[4] else None, + }, + "source": json.loads(result[5]) if result[5] else "unknown" + } + + +def trial_to_iteration(study_path: Path, trial_number: int) -> int: + """Convert trial number to iteration number.""" + db_path = study_path / "3_results" / "study.db" + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + cursor.execute(""" + SELECT t.number + FROM trials t + JOIN trial_user_attributes tua ON t.trial_id = tua.trial_id + WHERE t.state = 'COMPLETE' AND tua.key = 'source' AND tua.value_json = '"FEA"' + ORDER BY t.number + """) + fea_trials = [r[0] for r in cursor.fetchall()] + conn.close() + + if trial_number not in fea_trials: + raise ValueError(f"Trial {trial_number} is not an FEA trial (seeded data has no iteration folder)") + + return fea_trials.index(trial_number) + 1 + + +def archive_best_design(study_name: str, verbose: bool = True) -> dict: + """ + Archive the best design to 3_results/best_design_archive// + + Args: + study_name: Name of the study or path to study directory + verbose: Print progress messages + + Returns: + Dictionary with archive info + """ + study_path = find_study_path(study_name) + + # Get best trial info + best_trial = get_best_trial_info(study_path) + + if best_trial["source"] != "FEA": + if verbose: + print(f"[WARN] Best trial #{best_trial['trial_number']} is from seeded data ({best_trial['source']})") + print("[WARN] No iteration folder to archive. Skipping.") + return { + "success": False, + "reason": "Best trial is from seeded data, no iteration folder", + "trial_number": best_trial["trial_number"], + "source": best_trial["source"] + } + + # Find iteration folder + iter_num = trial_to_iteration(study_path, best_trial["trial_number"]) + iter_folder = f"iter{iter_num}" + iter_path = study_path / "2_iterations" / iter_folder + + if not iter_path.exists(): + raise FileNotFoundError(f"Iteration folder not found: {iter_path}") + + # Create archive directory with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + archive_dir = study_path / "3_results" / "best_design_archive" / timestamp + archive_dir.mkdir(parents=True, exist_ok=True) + + if verbose: + print(f"[INFO] Archiving best design...") + print(f" Trial: #{best_trial['trial_number']}") + print(f" Iteration: {iter_folder}") + print(f" Weighted Sum: {best_trial['weighted_sum']:.2f}") + print(f" Archive: {archive_dir}") + + # Copy all files from iteration folder + files_copied = 0 + for src_file in iter_path.iterdir(): + dst_file = archive_dir / src_file.name + if src_file.is_file(): + shutil.copy2(src_file, dst_file) + files_copied += 1 + elif src_file.is_dir(): + shutil.copytree(src_file, dst_file) + files_copied += 1 + + # Write metadata file + metadata = { + "study_name": study_path.name, + "trial_number": best_trial["trial_number"], + "iteration_folder": iter_folder, + "weighted_sum": best_trial["weighted_sum"], + "objectives": best_trial["objectives"], + "source": best_trial["source"], + "archived_at": datetime.now().isoformat(), + "files_copied": files_copied + } + + metadata_path = archive_dir / "_archive_info.json" + with open(metadata_path, "w") as f: + json.dump(metadata, f, indent=2) + + if verbose: + print(f"[OK] Archived {files_copied} files to {archive_dir.name}/") + + return { + "success": True, + "archive_path": str(archive_dir), + "trial_number": best_trial["trial_number"], + "iteration_folder": iter_folder, + "weighted_sum": best_trial["weighted_sum"], + "files_copied": files_copied + } + + +def main(): + parser = argparse.ArgumentParser( + description="Archive best design iteration folder", + epilog="This should be run at the end of every optimization." + ) + parser.add_argument("study_name", help="Name of the study to archive") + parser.add_argument("--quiet", "-q", action="store_true", help="Suppress output") + parser.add_argument("--json", "-j", action="store_true", help="Output as JSON") + + args = parser.parse_args() + + try: + result = archive_best_design(args.study_name, verbose=not args.quiet) + + if args.json: + print(json.dumps(result, indent=2)) + elif result["success"]: + print(f"\n[SUCCESS] Best design archived to: {result['archive_path']}") + + except Exception as e: + print(f"[ERROR] {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tools/find_best_iteration.py b/tools/find_best_iteration.py new file mode 100644 index 00000000..30d7acf6 --- /dev/null +++ b/tools/find_best_iteration.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python +""" +Atomizer Best Iteration Finder + +Finds the iteration folder containing the best design for a study. +Useful for extracting the best model files after optimization. + +Usage: + python tools/find_best_iteration.py + python tools/find_best_iteration.py m1_mirror_adaptive_V14 + python tools/find_best_iteration.py m1_mirror_adaptive_V14 --copy-to ./best_design + +Author: Atomizer +Created: 2025-12-12 +""" + +import argparse +import json +import shutil +import sqlite3 +from pathlib import Path +import sys + + +def find_study_path(study_name: str) -> Path: + """Find study directory by name.""" + studies_dir = Path(__file__).parent.parent / "studies" + study_path = studies_dir / study_name + + if not study_path.exists(): + raise FileNotFoundError(f"Study not found: {study_path}") + + return study_path + + +def get_best_trial(study_path: Path) -> dict: + """Get the best trial from the database.""" + db_path = study_path / "3_results" / "study.db" + if not db_path.exists(): + raise FileNotFoundError(f"Database not found: {db_path}") + + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Get trial with best weighted_sum (or lowest objective 0 value) + cursor.execute(""" + SELECT t.number, tua_ws.value_json, tua_src.value_json + FROM trials t + LEFT JOIN trial_user_attributes tua_ws ON t.trial_id = tua_ws.trial_id AND tua_ws.key = 'weighted_sum' + LEFT JOIN trial_user_attributes tua_src ON t.trial_id = tua_src.trial_id AND tua_src.key = 'source' + WHERE t.state = 'COMPLETE' + AND CAST(tua_ws.value_json AS REAL) < 1000 + ORDER BY CAST(tua_ws.value_json AS REAL) ASC + LIMIT 1 + """) + + result = cursor.fetchone() + if not result: + # Fallback to trial_values + cursor.execute(""" + SELECT t.number, tv.value + FROM trials t + JOIN trial_values tv ON t.trial_id = tv.trial_id AND tv.objective = 0 + WHERE t.state = 'COMPLETE' + AND tv.value < 1000 + ORDER BY tv.value ASC + LIMIT 1 + """) + result = cursor.fetchone() + if result: + conn.close() + return { + "number": result[0], + "weighted_sum": result[1], + "source": "unknown" + } + raise ValueError("No valid trials found") + + conn.close() + return { + "number": result[0], + "weighted_sum": float(result[1]) if result[1] else None, + "source": json.loads(result[2]) if result[2] else "unknown" + } + + +def trial_to_iteration(study_path: Path, trial_number: int) -> int: + """Convert trial number to iteration number.""" + db_path = study_path / "3_results" / "study.db" + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Get all FEA trial numbers in order + cursor.execute(""" + SELECT t.number + FROM trials t + JOIN trial_user_attributes tua ON t.trial_id = tua.trial_id + WHERE t.state = 'COMPLETE' AND tua.key = 'source' AND tua.value_json = '"FEA"' + ORDER BY t.number + """) + fea_trials = [r[0] for r in cursor.fetchall()] + conn.close() + + if trial_number not in fea_trials: + raise ValueError(f"Trial {trial_number} is not an FEA trial (seeded data has no iteration folder)") + + return fea_trials.index(trial_number) + 1 + + +def find_best_iteration(study_name: str) -> dict: + """Find the best iteration folder for a study.""" + study_path = find_study_path(study_name) + best_trial = get_best_trial(study_path) + + result = { + "study_name": study_name, + "trial_number": best_trial["number"], + "weighted_sum": best_trial["weighted_sum"], + "source": best_trial["source"], + "iteration_folder": None, + "iteration_path": None + } + + # Only FEA trials have iteration folders + if best_trial["source"] == "FEA": + iter_num = trial_to_iteration(study_path, best_trial["number"]) + iter_folder = f"iter{iter_num}" + iter_path = study_path / "2_iterations" / iter_folder + + if iter_path.exists(): + result["iteration_folder"] = iter_folder + result["iteration_path"] = str(iter_path) + + return result + + +def copy_best_iteration(study_name: str, dest_path: str): + """Copy the best iteration folder to a destination.""" + result = find_best_iteration(study_name) + + if not result["iteration_path"]: + raise ValueError(f"Best trial ({result['trial_number']}) is from seeded data, no iteration folder") + + src = Path(result["iteration_path"]) + dst = Path(dest_path) + + if dst.exists(): + print(f"Removing existing destination: {dst}") + shutil.rmtree(dst) + + print(f"Copying {src} to {dst}") + shutil.copytree(src, dst) + + return result + + +def main(): + parser = argparse.ArgumentParser(description="Find best iteration folder for Atomizer study") + parser.add_argument("study_name", help="Name of the study") + parser.add_argument("--copy-to", "-c", help="Copy best iteration folder to this path") + parser.add_argument("--json", "-j", action="store_true", help="Output as JSON") + + args = parser.parse_args() + + try: + if args.copy_to: + result = copy_best_iteration(args.study_name, args.copy_to) + print(f"\nCopied best iteration to: {args.copy_to}") + else: + result = find_best_iteration(args.study_name) + + if args.json: + print(json.dumps(result, indent=2)) + else: + print(f"\n{'='*60}") + print(f"BEST ITERATION FOR: {result['study_name']}") + print(f"{'='*60}") + print(f" Trial Number: {result['trial_number']}") + print(f" Weighted Sum: {result['weighted_sum']:.2f}" if result['weighted_sum'] else " Weighted Sum: N/A") + print(f" Source: {result['source']}") + print(f" Iteration Folder: {result['iteration_folder'] or 'N/A (seeded data)'}") + if result['iteration_path']: + print(f" Full Path: {result['iteration_path']}") + print(f"{'='*60}") + + except Exception as e: + print(f"Error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main()