406 lines
13 KiB
Python
406 lines
13 KiB
Python
|
|
#!/usr/bin/env python
|
||
|
|
"""
|
||
|
|
Atomizer Study Analysis Tool
|
||
|
|
|
||
|
|
Generates comprehensive optimization reports for any Atomizer study.
|
||
|
|
Detects study type (single-objective TPE, multi-objective NSGA-II) automatically.
|
||
|
|
|
||
|
|
Usage:
|
||
|
|
python tools/analyze_study.py <study_name>
|
||
|
|
python tools/analyze_study.py m1_mirror_adaptive_V14
|
||
|
|
python tools/analyze_study.py m1_mirror_adaptive_V14 --export report.md
|
||
|
|
|
||
|
|
Author: Atomizer
|
||
|
|
Created: 2025-12-12
|
||
|
|
"""
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import sqlite3
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Dict, List, Optional, Tuple
|
||
|
|
import sys
|
||
|
|
|
||
|
|
# Add parent to path for imports
|
||
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
|
|
|
||
|
|
try:
|
||
|
|
import numpy as np
|
||
|
|
HAS_NUMPY = True
|
||
|
|
except ImportError:
|
||
|
|
HAS_NUMPY = False
|
||
|
|
|
||
|
|
|
||
|
|
def find_study_path(study_name: str) -> Path:
    """Resolve a study name to its directory under <repo>/studies.

    Raises:
        FileNotFoundError: if no directory with that name exists.
    """
    repo_root = Path(__file__).parent.parent
    candidate = repo_root / "studies" / study_name

    if not candidate.exists():
        raise FileNotFoundError(f"Study not found: {candidate}")

    return candidate
|
||
|
|
|
||
|
|
|
||
|
|
def load_config(study_path: Path) -> Dict:
    """Load the study's optimization configuration.

    The config is expected at <study>/1_setup/optimization_config.json.

    Raises:
        FileNotFoundError: if the config file is missing.
    """
    cfg_file = study_path / "1_setup" / "optimization_config.json"

    if not cfg_file.exists():
        raise FileNotFoundError(f"Config not found: {cfg_file}")

    with open(cfg_file) as handle:
        return json.load(handle)
|
||
|
|
|
||
|
|
|
||
|
|
def get_db_connection(study_path: Path) -> sqlite3.Connection:
    """Open the study's SQLite results database.

    The database is expected at <study>/3_results/study.db.

    Raises:
        FileNotFoundError: if the database file is missing.
    """
    db_file = study_path / "3_results" / "study.db"

    if not db_file.exists():
        raise FileNotFoundError(f"Database not found: {db_file}")

    return sqlite3.connect(str(db_file))
|
||
|
|
|
||
|
|
|
||
|
|
def detect_study_type(conn: sqlite3.Connection) -> str:
    """Classify the study as single- or multi-objective.

    Inspects the distinct ``objective`` indices recorded in the
    ``trial_values`` table: exactly one distinct index means a
    single-objective study; anything else is reported as multi-objective.
    """
    rows = conn.execute("SELECT DISTINCT objective FROM trial_values").fetchall()
    distinct_objectives = {row[0] for row in rows}
    return "single_objective" if len(distinct_objectives) == 1 else "multi_objective"
|
||
|
|
|
||
|
|
|
||
|
|
def get_trial_counts(conn: sqlite3.Connection) -> Dict[str, int]:
    """Count completed trials, broken down by their 'source' user attribute.

    Returns a dict with keys:
        total  - all COMPLETE trials
        fea    - COMPLETE trials tagged source == "FEA"
        seeded - the remainder (total - fea)
        sources - full per-source count mapping
    """
    cur = conn.cursor()

    # How many trials finished successfully overall.
    cur.execute("SELECT COUNT(*) FROM trials WHERE state = 'COMPLETE'")
    (total,) = cur.fetchone()

    # Per-source breakdown via the JSON-encoded 'source' user attribute.
    cur.execute("""
        SELECT tua.value_json, COUNT(*) as cnt
        FROM trials t
        JOIN trial_user_attributes tua ON t.trial_id = tua.trial_id
        WHERE t.state = 'COMPLETE' AND tua.key = 'source'
        GROUP BY tua.value_json
    """)
    sources = {}
    for value_json, cnt in cur.fetchall():
        sources[json.loads(value_json)] = cnt

    fea = sources.get("FEA", 0)
    # Everything not run through FEA is considered seeded prior data
    # (this also absorbs trials with no 'source' attribute at all).
    return {
        "total": total,
        "fea": fea,
        "seeded": total - fea,
        "sources": sources,
    }
|
||
|
|
|
||
|
|
|
||
|
|
def get_all_trials_with_objectives(conn: sqlite3.Connection) -> Tuple[List[Dict], List[str]]:
    """Fetch every COMPLETE trial together with its objective values.

    Objective values live in the ``trial_user_attributes`` table as
    JSON-encoded values; every attribute key that is not bookkeeping
    ('source', 'solve_time', 'iter_num') is treated as an objective.

    Returns:
        (trials, obj_keys) where each trial is a dict with 'number',
        'trial_id', 'source' and one entry per objective key found.

    Note: the original version declared ``-> List[Dict]`` but has always
    returned this 2-tuple; the annotation is corrected here. Attribute
    keys are now bound as SQL parameters instead of being interpolated
    into the query text, so keys containing quotes can no longer break
    (or inject into) the SQL.
    """
    cursor = conn.cursor()

    # Discover all user-attribute keys; anything not in the bookkeeping
    # set is a candidate objective.
    cursor.execute("SELECT DISTINCT key FROM trial_user_attributes")
    all_keys = [r[0] for r in cursor.fetchall()]
    obj_keys = [k for k in all_keys if k not in ['source', 'solve_time', 'iter_num']]

    # Build one wide query with a LEFT JOIN per attribute key.  Columns
    # are consumed positionally below, so no SQL aliases are needed for
    # the selected values (the old per-key alias could itself break on
    # unusual key names).
    select_parts = ["t.number", "t.trial_id"]
    join_parts = []
    params: List[str] = []

    for i, key in enumerate(obj_keys):
        alias = f"tua_{i}"
        select_parts.append(f"{alias}.value_json")
        join_parts.append(
            f"LEFT JOIN trial_user_attributes {alias} ON t.trial_id = {alias}.trial_id AND {alias}.key = ?"
        )
        params.append(key)

    # The 'source' attribute is always selected last (read via row[-1]).
    select_parts.append("tua_src.value_json")
    join_parts.append(
        "LEFT JOIN trial_user_attributes tua_src ON t.trial_id = tua_src.trial_id AND tua_src.key = 'source'"
    )

    query = f"""
        SELECT {', '.join(select_parts)}
        FROM trials t
        {' '.join(join_parts)}
        WHERE t.state = 'COMPLETE'
    """

    cursor.execute(query, params)
    rows = cursor.fetchall()

    # Parse the wide rows back into one dict per trial.
    trials = []
    for row in rows:
        trial = {
            "number": row[0],
            "trial_id": row[1],
        }

        # Objective values: prefer plain floats; fall back to JSON for
        # anything non-numeric (e.g. nested structures).
        for i, key in enumerate(obj_keys):
            val = row[2 + i]
            if val is not None:
                try:
                    trial[key] = float(val)
                except (ValueError, TypeError):
                    trial[key] = json.loads(val) if val else None

        # Source tag (JSON-encoded string); missing attribute -> "unknown".
        source_val = row[-1]
        trial["source"] = json.loads(source_val) if source_val else "unknown"

        trials.append(trial)

    return trials, obj_keys
|
||
|
|
|
||
|
|
|
||
|
|
def get_trial_params(conn: sqlite3.Connection, trial_number: int) -> Dict[str, float]:
    """Return the sampled parameter values of one trial, keyed by name.

    An unknown trial number yields an empty dict.
    """
    row = conn.execute(
        "SELECT trial_id FROM trials WHERE number = ?", (trial_number,)
    ).fetchone()
    if row is None:
        return {}

    param_rows = conn.execute(
        "SELECT param_name, param_value FROM trial_params WHERE trial_id = ?",
        (row[0],),
    ).fetchall()
    return {param_name: float(param_value) for param_name, param_value in param_rows}
|
||
|
|
|
||
|
|
|
||
|
|
def find_best_iteration_folder(study_path: Path, trial_number: int, conn: sqlite3.Connection) -> Optional[str]:
    """Map a trial number onto its FEA iteration folder name ("iterN").

    FEA-sourced COMPLETE trials are ordered by trial number; the N-th one
    (1-based) corresponds to folder "iterN".  Returns None when the trial
    is not an FEA trial.  (``study_path`` is part of the signature for
    callers but is not consulted here.)
    """
    rows = conn.execute("""
        SELECT t.number
        FROM trials t
        JOIN trial_user_attributes tua ON t.trial_id = tua.trial_id
        WHERE t.state = 'COMPLETE' AND tua.key = 'source' AND tua.value_json = '"FEA"'
        ORDER BY t.number
    """).fetchall()

    for position, (number,) in enumerate(rows, start=1):
        if number == trial_number:
            return f"iter{position}"

    return None
|
||
|
|
|
||
|
|
|
||
|
|
def analyze_parameter_bounds(params: Dict[str, float], config: Dict) -> List[Dict]:
    """Check which parameters sit near their configured bounds.

    A parameter is flagged when its value lies in the bottom 10% (near
    the lower bound) or top 10% (near the upper bound) of its
    [min, max] range from ``config["design_variables"]``.

    Returns:
        A list of dicts with keys: name, bound ("lower"/"upper"),
        position (percent of range), value, min, max.

    Fixes over the original: a degenerate range (min == max, or an
    inverted min > max) is skipped instead of raising ZeroDivisionError,
    and the result dict is built once instead of being duplicated in
    both branches.
    """
    near_bounds = []

    for var in config.get("design_variables", []):
        name = var["name"]
        if name not in params:
            # Parameter not sampled for this trial -- nothing to check.
            continue

        val = params[name]
        vmin, vmax = var["min"], var["max"]
        span = vmax - vmin
        if span <= 0:
            # Zero-width or inverted range: "position" is undefined.
            continue
        position = (val - vmin) / span * 100

        if position < 10:
            bound = "lower"
        elif position > 90:
            bound = "upper"
        else:
            continue

        near_bounds.append({
            "name": name,
            "bound": bound,
            "position": position,
            "value": val,
            "min": vmin,
            "max": vmax,
        })

    return near_bounds
|
||
|
|
|
||
|
|
|
||
|
|
def generate_report(study_name: str) -> str:
    """Generate comprehensive study report.

    Loads the study's config and results database, classifies the study,
    ranks trials, and renders a multi-section plain-text report
    (summary, best design, bound proximity, top-10 table, statistics).

    Args:
        study_name: Directory name of the study under <repo>/studies.

    Returns:
        The full report as a single newline-joined string.

    Raises:
        FileNotFoundError: via the helpers, when the study directory,
        config file, or results database is missing.
    """
    study_path = find_study_path(study_name)
    config = load_config(study_path)
    conn = get_db_connection(study_path)

    # Gather data
    study_type = detect_study_type(conn)
    counts = get_trial_counts(conn)
    trials, obj_keys = get_all_trials_with_objectives(conn)

    # Filter valid trials (exclude failed with WS > 1000)
    # NOTE(review): 1000 appears to be a failure-sentinel weighted_sum
    # written by the optimizer -- confirm against the evaluation code.
    if "weighted_sum" in obj_keys:
        valid_trials = [t for t in trials if t.get("weighted_sum", 0) < 1000]
        failed_count = len(trials) - len(valid_trials)
    else:
        valid_trials = trials
        failed_count = 0

    # Sort by weighted_sum if available, else by first objective.
    # Trials lacking the sort key sink to the end (inf).
    sort_key = "weighted_sum" if "weighted_sum" in obj_keys else obj_keys[0] if obj_keys else None
    if sort_key:
        valid_trials.sort(key=lambda x: x.get(sort_key, float('inf')))

    # Separate V14 FEA trials
    fea_trials = [t for t in valid_trials if t.get("source") == "FEA"]

    # Get best trial (ascending sort, so index 0 is the minimum).
    best_trial = valid_trials[0] if valid_trials else None
    best_fea = fea_trials[0] if fea_trials else None  # currently unused in the report body

    # Get best params and check bounds
    best_params = get_trial_params(conn, best_trial["number"]) if best_trial else {}
    near_bounds = analyze_parameter_bounds(best_params, config) if best_params else []

    # Find iteration folder (only meaningful for FEA-sourced trials).
    iter_folder = None
    if best_trial and best_trial.get("source") == "FEA":
        iter_folder = find_best_iteration_folder(study_path, best_trial["number"], conn)

    conn.close()

    # Build report
    lines = []
    lines.append("=" * 80)
    lines.append(f" {study_name.upper()} - OPTIMIZATION REPORT")
    lines.append("=" * 80)
    lines.append("")
    lines.append(f" Study Type: {study_type.replace('_', ' ').title()}")
    lines.append(f" Design Variables: {len(config.get('design_variables', []))}")
    lines.append(f" Objectives: {len(config.get('objectives', []))}")
    lines.append("")

    # Section 1: trial counts
    lines.append("=" * 80)
    lines.append("1. STUDY SUMMARY")
    lines.append("=" * 80)
    lines.append("")
    lines.append(f" Total trials: {counts['total']}")
    lines.append(f" - Seeded (prior data): {counts['seeded']}")
    lines.append(f" - New FEA evaluations: {counts['fea']}")
    if failed_count:
        lines.append(f" - Failed: {failed_count}")
    lines.append("")

    # Section 2: best design (skipped entirely when no valid trials)
    if best_trial:
        lines.append("=" * 80)
        lines.append("2. BEST DESIGN FOUND")
        lines.append("=" * 80)
        lines.append("")
        lines.append(f" Trial #{best_trial['number']} (Source: {best_trial.get('source', 'unknown')})")
        if iter_folder:
            lines.append(f" Iteration folder: {iter_folder}")
        lines.append("")
        lines.append(" Objectives:")
        lines.append(" " + "-" * 45)

        # Only objectives actually recorded on the best trial are shown.
        for obj in config.get("objectives", []):
            name = obj["name"]
            if name in best_trial:
                target = obj.get("target", "N/A")
                lines.append(f" {name}: {best_trial[name]:.2f} (target: {target})")

        if "weighted_sum" in best_trial:
            lines.append(f" Weighted Sum: {best_trial['weighted_sum']:.2f}")

    # Section 3: parameters near bounds (omitted when none are flagged)
    if near_bounds:
        lines.append("")
        lines.append("=" * 80)
        lines.append("3. PARAMETERS NEAR BOUNDS")
        lines.append("=" * 80)
        lines.append("")
        lines.append(f" {'Parameter':<25} | {'Bound':>8} | {'Position':>8} | {'Value':>10}")
        lines.append(" " + "-" * 60)
        for nb in near_bounds:
            lines.append(f" {nb['name']:<25} | {nb['bound']:>8} | {nb['position']:>7.1f}% | {nb['value']:>10.3f}")

    # Section 4: top-10 ranking table
    lines.append("")
    lines.append("=" * 80)
    lines.append("4. TOP 10 DESIGNS")
    lines.append("=" * 80)
    lines.append("")

    if sort_key:
        lines.append(f" {'Rank':>4} | {'Trial':>6} | {sort_key:>15} | Source")
        lines.append(" " + "-" * 50)
        for i, t in enumerate(valid_trials[:10], 1):
            src = t.get("source", "unknown")[:12]
            val = t.get(sort_key, 0)
            lines.append(f" {i:>4} | {t['number']:>6} | {val:>15.2f} | {src}")

    # Section 5: statistics (only when numpy is importable)
    if HAS_NUMPY and sort_key:
        lines.append("")
        lines.append("=" * 80)
        lines.append("5. STATISTICS")
        lines.append("=" * 80)
        lines.append("")

        all_vals = [t[sort_key] for t in valid_trials if sort_key in t]
        if all_vals:
            lines.append(f" All trials (n={len(all_vals)}):")
            lines.append(f" min={min(all_vals):.2f}, median={np.median(all_vals):.2f}, mean={np.mean(all_vals):.2f}")

        fea_vals = [t[sort_key] for t in fea_trials if sort_key in t]
        if fea_vals:
            lines.append(f" FEA trials (n={len(fea_vals)}):")
            lines.append(f" min={min(fea_vals):.2f}, median={np.median(fea_vals):.2f}, mean={np.mean(fea_vals):.2f}")

    lines.append("")
    lines.append("=" * 80)

    return "\n".join(lines)
|
||
|
|
|
||
|
|
|
||
|
|
def main():
    """Command-line entry point: analyze a study, then print or export the report."""
    arg_parser = argparse.ArgumentParser(description="Analyze Atomizer optimization study")
    arg_parser.add_argument("study_name", help="Name of the study to analyze")
    arg_parser.add_argument("--export", "-e", help="Export report to file")
    arg_parser.add_argument("--json", "-j", action="store_true", help="Output as JSON")
    options = arg_parser.parse_args()

    try:
        report = generate_report(options.study_name)

        if not options.export:
            print(report)
        else:
            with open(options.export, "w") as out_file:
                out_file.write(report)
            print(f"Report exported to: {options.export}")

    except Exception as exc:
        # Surface the failure with a traceback and a non-zero exit code.
        print(f"Error: {exc}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()
|