#!/usr/bin/env python
"""
Atomizer Study Analysis Tool

Generates comprehensive optimization reports for any Atomizer study.
Detects study type (single-objective TPE, multi-objective NSGA-II)
automatically.

Usage:
    python tools/analyze_study.py
    python tools/analyze_study.py m1_mirror_adaptive_V14
    python tools/analyze_study.py m1_mirror_adaptive_V14 --export report.md

Author: Atomizer
Created: 2025-12-12
"""

import argparse
import json
import sqlite3
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import sys

# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

# numpy is optional: only the statistics section of the report needs it.
try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False


def find_study_path(study_name: str) -> Path:
    """Find study directory by name.

    Looks under ``<repo_root>/studies/<study_name>``.

    Raises:
        FileNotFoundError: if the study directory does not exist.
    """
    studies_dir = Path(__file__).parent.parent / "studies"
    study_path = studies_dir / study_name
    if not study_path.exists():
        raise FileNotFoundError(f"Study not found: {study_path}")
    return study_path


def load_config(study_path: Path) -> Dict:
    """Load the study's optimization config (``1_setup/optimization_config.json``).

    Raises:
        FileNotFoundError: if the config file does not exist.
    """
    config_path = study_path / "1_setup" / "optimization_config.json"
    if not config_path.exists():
        raise FileNotFoundError(f"Config not found: {config_path}")
    with open(config_path) as f:
        return json.load(f)


def get_db_connection(study_path: Path) -> sqlite3.Connection:
    """Open the study's Optuna SQLite database (``3_results/study.db``).

    Raises:
        FileNotFoundError: if the database file does not exist.
    """
    db_path = study_path / "3_results" / "study.db"
    if not db_path.exists():
        raise FileNotFoundError(f"Database not found: {db_path}")
    return sqlite3.connect(str(db_path))


def detect_study_type(conn: sqlite3.Connection) -> str:
    """Detect if study is single or multi-objective.

    Counts the distinct objective indices recorded in ``trial_values``;
    exactly one distinct index means single-objective.
    """
    cursor = conn.cursor()
    cursor.execute("SELECT DISTINCT objective FROM trial_values")
    objectives = [r[0] for r in cursor.fetchall()]
    if len(objectives) == 1:
        return "single_objective"
    else:
        return "multi_objective"


def get_trial_counts(conn: sqlite3.Connection) -> Dict[str, int]:
    """Get completed-trial counts, split by the 'source' user attribute.

    Returns a dict with keys ``total``, ``fea``, ``seeded`` and ``sources``
    (a mapping of source label -> count). Trials without a 'source'
    attribute are implicitly counted as seeded (total minus FEA).
    """
    cursor = conn.cursor()

    # Total completed
    cursor.execute("SELECT COUNT(*) FROM trials WHERE state = 'COMPLETE'")
    total = cursor.fetchone()[0]

    # By source (value_json holds a JSON-encoded string, e.g. '"FEA"')
    cursor.execute("""
        SELECT tua.value_json, COUNT(*) as cnt
        FROM trials t
        JOIN trial_user_attributes tua ON t.trial_id = tua.trial_id
        WHERE t.state = 'COMPLETE' AND tua.key = 'source'
        GROUP BY tua.value_json
    """)
    sources = {json.loads(r[0]): r[1] for r in cursor.fetchall()}

    fea_count = sources.get("FEA", 0)
    seeded_count = total - fea_count

    return {
        "total": total,
        "fea": fea_count,
        "seeded": seeded_count,
        "sources": sources
    }


def get_all_trials_with_objectives(conn: sqlite3.Connection) -> Tuple[List[Dict], List[str]]:
    """Get all completed trials with their objective values from user attributes.

    Returns:
        A tuple ``(trials, obj_keys)`` where ``trials`` is a list of dicts
        (``number``, ``trial_id``, ``source`` plus one entry per objective
        key) and ``obj_keys`` is the list of objective-like attribute keys.

    Note: the original annotation claimed ``List[Dict]`` but the function
    has always returned this tuple; the annotation is now correct.
    """
    cursor = conn.cursor()

    # Get all user attribute keys that look like objectives
    cursor.execute("SELECT DISTINCT key FROM trial_user_attributes")
    all_keys = [r[0] for r in cursor.fetchall()]

    # Common objective-related keys (everything except known metadata keys)
    obj_keys = [k for k in all_keys if k not in ['source', 'solve_time', 'iter_num']]

    # Build query dynamically: one LEFT JOIN per objective key.
    # Keys come from the database, so they are bound as ? parameters
    # instead of being interpolated into the SQL text (a key containing
    # a quote would otherwise break the query). Rows are consumed
    # positionally, so no column aliases are needed.
    select_parts = ["t.number", "t.trial_id"]
    join_parts = []
    query_args: List[str] = []
    for i, key in enumerate(obj_keys):
        alias = f"tua_{i}"
        select_parts.append(f"{alias}.value_json")
        join_parts.append(
            f"LEFT JOIN trial_user_attributes {alias} "
            f"ON t.trial_id = {alias}.trial_id AND {alias}.key = ?"
        )
        query_args.append(key)

    # Add source
    select_parts.append("tua_src.value_json")
    join_parts.append(
        "LEFT JOIN trial_user_attributes tua_src "
        "ON t.trial_id = tua_src.trial_id AND tua_src.key = 'source'"
    )

    query = f"""
        SELECT {', '.join(select_parts)}
        FROM trials t
        {' '.join(join_parts)}
        WHERE t.state = 'COMPLETE'
    """
    cursor.execute(query, query_args)
    rows = cursor.fetchall()

    # Parse results
    trials = []
    for row in rows:
        trial = {
            "number": row[0],
            "trial_id": row[1],
        }

        # Parse objective values; value_json is usually a bare number,
        # otherwise fall back to full JSON decoding.
        for i, key in enumerate(obj_keys):
            val = row[2 + i]
            if val is not None:
                try:
                    trial[key] = float(val)
                except (ValueError, TypeError):
                    trial[key] = json.loads(val) if val else None

        # Parse source (last selected column)
        source_val = row[-1]
        trial["source"] = json.loads(source_val) if source_val else "unknown"
        trials.append(trial)

    return trials, obj_keys


def get_trial_params(conn: sqlite3.Connection, trial_number: int) -> Dict[str, float]:
    """Get parameters for a specific trial, or {} if the trial is unknown."""
    cursor = conn.cursor()
    cursor.execute("SELECT trial_id FROM trials WHERE number = ?", (trial_number,))
    result = cursor.fetchone()
    if not result:
        return {}
    trial_id = result[0]
    cursor.execute(
        "SELECT param_name, param_value FROM trial_params WHERE trial_id = ?",
        (trial_id,)
    )
    return {name: float(val) for name, val in cursor.fetchall()}


def find_best_iteration_folder(study_path: Path, trial_number: int,
                               conn: sqlite3.Connection) -> Optional[str]:
    """Map trial number to iteration folder.

    FEA trials are assumed to map 1:1, in trial-number order, onto
    ``iter1``, ``iter2``, ... folders — TODO confirm against the runner.
    Returns None for non-FEA trials.
    """
    cursor = conn.cursor()

    # Get all FEA trial numbers in order
    cursor.execute("""
        SELECT t.number
        FROM trials t
        JOIN trial_user_attributes tua ON t.trial_id = tua.trial_id
        WHERE t.state = 'COMPLETE' AND tua.key = 'source'
          AND tua.value_json = '"FEA"'
        ORDER BY t.number
    """)
    fea_trials = [r[0] for r in cursor.fetchall()]

    if trial_number in fea_trials:
        iter_num = fea_trials.index(trial_number) + 1
        return f"iter{iter_num}"
    return None


def analyze_parameter_bounds(params: Dict[str, float], config: Dict) -> List[Dict]:
    """Check which parameters sit within 10% of their lower/upper bounds.

    Returns a list of dicts describing each near-bound parameter
    (name, bound side, percentage position in range, value, min, max).
    """
    near_bounds = []
    for var in config.get("design_variables", []):
        name = var["name"]
        if name not in params:
            continue
        val = params[name]
        vmin, vmax = var["min"], var["max"]
        # FIX: a fixed variable (min == max) previously caused
        # ZeroDivisionError; such variables cannot be "near" a bound.
        if vmax == vmin:
            continue
        position = (val - vmin) / (vmax - vmin) * 100
        if position < 10:
            near_bounds.append({
                "name": name, "bound": "lower", "position": position,
                "value": val, "min": vmin, "max": vmax
            })
        elif position > 90:
            near_bounds.append({
                "name": name, "bound": "upper", "position": position,
                "value": val, "min": vmin, "max": vmax
            })
    return near_bounds


def generate_report(study_name: str) -> str:
    """Generate a comprehensive plain-text study report.

    Raises:
        FileNotFoundError: if the study, its config, or its DB is missing.
    """
    study_path = find_study_path(study_name)
    config = load_config(study_path)
    conn = get_db_connection(study_path)

    # Gather data
    study_type = detect_study_type(conn)
    counts = get_trial_counts(conn)
    trials, obj_keys = get_all_trials_with_objectives(conn)

    # Filter valid trials (exclude failed with WS > 1000 — the runner
    # records a large penalty weighted_sum for failed evaluations)
    if "weighted_sum" in obj_keys:
        valid_trials = [t for t in trials if t.get("weighted_sum", 0) < 1000]
        failed_count = len(trials) - len(valid_trials)
    else:
        valid_trials = trials
        failed_count = 0

    # Sort by weighted_sum if available, else by first objective
    sort_key = "weighted_sum" if "weighted_sum" in obj_keys else obj_keys[0] if obj_keys else None
    if sort_key:
        valid_trials.sort(key=lambda x: x.get(sort_key, float('inf')))

    # Separate FEA trials
    fea_trials = [t for t in valid_trials if t.get("source") == "FEA"]

    # Get best trial (lists are sorted best-first)
    best_trial = valid_trials[0] if valid_trials else None
    best_fea = fea_trials[0] if fea_trials else None

    # Get best params and check bounds
    best_params = get_trial_params(conn, best_trial["number"]) if best_trial else {}
    near_bounds = analyze_parameter_bounds(best_params, config) if best_params else []

    # Find iteration folder (only meaningful for FEA trials)
    iter_folder = None
    if best_trial and best_trial.get("source") == "FEA":
        iter_folder = find_best_iteration_folder(study_path, best_trial["number"], conn)

    conn.close()

    # Build report
    lines = []
    lines.append("=" * 80)
    lines.append(f" {study_name.upper()} - OPTIMIZATION REPORT")
    lines.append("=" * 80)
    lines.append("")
    lines.append(f" Study Type: {study_type.replace('_', ' ').title()}")
    lines.append(f" Design Variables: {len(config.get('design_variables', []))}")
    lines.append(f" Objectives: {len(config.get('objectives', []))}")
    lines.append("")

    # Counts
    lines.append("=" * 80)
    lines.append("1. STUDY SUMMARY")
    lines.append("=" * 80)
    lines.append("")
    lines.append(f" Total trials: {counts['total']}")
    lines.append(f" - Seeded (prior data): {counts['seeded']}")
    lines.append(f" - New FEA evaluations: {counts['fea']}")
    if failed_count:
        lines.append(f" - Failed: {failed_count}")
    lines.append("")

    # Best design
    if best_trial:
        lines.append("=" * 80)
        lines.append("2. BEST DESIGN FOUND")
        lines.append("=" * 80)
        lines.append("")
        lines.append(f" Trial #{best_trial['number']} (Source: {best_trial.get('source', 'unknown')})")
        if iter_folder:
            lines.append(f" Iteration folder: {iter_folder}")
        lines.append("")
        lines.append(" Objectives:")
        lines.append(" " + "-" * 45)
        for obj in config.get("objectives", []):
            name = obj["name"]
            if name in best_trial:
                target = obj.get("target", "N/A")
                lines.append(f" {name}: {best_trial[name]:.2f} (target: {target})")
        if "weighted_sum" in best_trial:
            lines.append(f" Weighted Sum: {best_trial['weighted_sum']:.2f}")

    # Parameters near bounds
    if near_bounds:
        lines.append("")
        lines.append("=" * 80)
        lines.append("3. PARAMETERS NEAR BOUNDS")
        lines.append("=" * 80)
        lines.append("")
        lines.append(f" {'Parameter':<25} | {'Bound':>8} | {'Position':>8} | {'Value':>10}")
        lines.append(" " + "-" * 60)
        for nb in near_bounds:
            lines.append(f" {nb['name']:<25} | {nb['bound']:>8} | {nb['position']:>7.1f}% | {nb['value']:>10.3f}")

    # Top 10
    lines.append("")
    lines.append("=" * 80)
    lines.append("4. TOP 10 DESIGNS")
    lines.append("=" * 80)
    lines.append("")
    if sort_key:
        lines.append(f" {'Rank':>4} | {'Trial':>6} | {sort_key:>15} | Source")
        lines.append(" " + "-" * 50)
        for i, t in enumerate(valid_trials[:10], 1):
            src = t.get("source", "unknown")[:12]
            val = t.get(sort_key, 0)
            lines.append(f" {i:>4} | {t['number']:>6} | {val:>15.2f} | {src}")

    # Statistics (requires numpy for median/mean)
    if HAS_NUMPY and sort_key:
        lines.append("")
        lines.append("=" * 80)
        lines.append("5. STATISTICS")
        lines.append("=" * 80)
        lines.append("")
        all_vals = [t[sort_key] for t in valid_trials if sort_key in t]
        if all_vals:
            lines.append(f" All trials (n={len(all_vals)}):")
            lines.append(f" min={min(all_vals):.2f}, median={np.median(all_vals):.2f}, mean={np.mean(all_vals):.2f}")
        fea_vals = [t[sort_key] for t in fea_trials if sort_key in t]
        if fea_vals:
            lines.append(f" FEA trials (n={len(fea_vals)}):")
            lines.append(f" min={min(fea_vals):.2f}, median={np.median(fea_vals):.2f}, mean={np.mean(fea_vals):.2f}")

    lines.append("")
    lines.append("=" * 80)

    return "\n".join(lines)


def main() -> None:
    """CLI entry point: parse arguments, generate and print/export the report."""
    parser = argparse.ArgumentParser(description="Analyze Atomizer optimization study")
    parser.add_argument("study_name", help="Name of the study to analyze")
    parser.add_argument("--export", "-e", help="Export report to file")
    parser.add_argument("--json", "-j", action="store_true", help="Output as JSON")
    args = parser.parse_args()

    try:
        report = generate_report(args.study_name)
        # FIX: --json was parsed but never used; it now wraps the report
        # in a JSON envelope for machine consumption.
        if args.json:
            output = json.dumps({"study": args.study_name, "report": report}, indent=2)
        else:
            output = report
        if args.export:
            with open(args.export, "w") as f:
                f.write(output)
            print(f"Report exported to: {args.export}")
        else:
            print(output)
    except Exception as e:
        # Top-level CLI boundary: report and exit non-zero with traceback.
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()