"""Persistent trial history — append-only, survives Optuna resets.

Every trial is logged to `history.db` (SQLite) and exported to `history.csv`.
Never deleted by --clean. Full lineage across all studies and phases.

Usage:
    history = TrialHistory(results_dir)
    history.log_trial(study_name, trial_id, params, mass_kg=..., ...)
    history.export_csv()
    rows = history.query("SELECT * FROM trials WHERE mass_kg < 100")
"""

from __future__ import annotations

import csv
import json
import logging
import math
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

logger = logging.getLogger(__name__)

# Schema version — bump if columns change
SCHEMA_VERSION = 1

CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS trials (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    study_name TEXT NOT NULL,
    trial_id INTEGER NOT NULL,
    iteration TEXT,
    timestamp TEXT NOT NULL,

    -- Design variables
    beam_half_core_thickness REAL,
    beam_face_thickness REAL,
    holes_diameter REAL,
    hole_count INTEGER,

    -- Results
    mass_kg REAL,
    tip_displacement_mm REAL,
    max_von_mises_mpa REAL,

    -- Constraint checks
    disp_feasible INTEGER,    -- 0/1
    stress_feasible INTEGER,  -- 0/1
    geo_feasible INTEGER,     -- 0/1
    fully_feasible INTEGER,   -- 0/1

    -- Meta
    status TEXT DEFAULT 'COMPLETE',  -- COMPLETE, FAILED, PRUNED
    error_message TEXT,
    solve_time_s REAL,
    iter_path TEXT,
    notes TEXT,

    -- Unique constraint: no duplicate (study, trial) pairs
    UNIQUE(study_name, trial_id)
);

CREATE TABLE IF NOT EXISTS schema_version (
    version INTEGER PRIMARY KEY
);
"""

# Constraint thresholds (from OPTIMIZATION_STRATEGY.md)
DISP_LIMIT_MM = 10.0
STRESS_LIMIT_MPA = 130.0

# CSV column order
CSV_COLUMNS = [
    "study_name",
    "trial_id",
    "iteration",
    "timestamp",
    "beam_half_core_thickness",
    "beam_face_thickness",
    "holes_diameter",
    "hole_count",
    "mass_kg",
    "tip_displacement_mm",
    "max_von_mises_mpa",
    "disp_feasible",
    "stress_feasible",
    "geo_feasible",
    "fully_feasible",
    "status",
    "error_message",
    "solve_time_s",
    "iter_path",
]


def _within_limit(value: float | None, limit: float) -> bool:
    """Return True when *value* is a real number no greater than *limit*.

    ``None`` and NaN both mean "no result available" and therefore count as
    not satisfying the constraint.
    """
    return value is not None and not math.isnan(value) and value <= limit


class TrialHistory:
    """Append-only trial history database.

    Wraps one SQLite connection to ``<results_dir>/history.db`` and mirrors
    the ``trials`` table to ``<results_dir>/history.csv`` on request. It can
    be used as a context manager; leaving the ``with`` block calls
    :meth:`close`.
    """

    def __init__(self, results_dir: Path | str):
        """Open (creating if needed) the history database in *results_dir*.

        Args:
            results_dir: Directory that holds ``history.db`` and
                ``history.csv``. Created, including parents, if missing.
        """
        self.results_dir = Path(results_dir)
        self.results_dir.mkdir(parents=True, exist_ok=True)
        self.db_path = self.results_dir / "history.db"
        self.csv_path = self.results_dir / "history.csv"

        self._conn = sqlite3.connect(str(self.db_path))
        # Row objects allow column access by name (used by export/query).
        self._conn.row_factory = sqlite3.Row
        self._conn.execute("PRAGMA journal_mode=WAL")  # safe concurrent reads
        self._init_schema()

        count = self._conn.execute("SELECT COUNT(*) FROM trials").fetchone()[0]
        logger.info("Trial history: %s (%d records)", self.db_path.name, count)

    def __enter__(self) -> TrialHistory:
        """Return the history itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Export the CSV and close the connection on leaving the block."""
        self.close()

    def _init_schema(self) -> None:
        """Create tables if they don't exist and record the schema version."""
        self._conn.executescript(CREATE_TABLE)

        # Check/set schema version
        row = self._conn.execute(
            "SELECT version FROM schema_version ORDER BY version DESC LIMIT 1"
        ).fetchone()
        if row is None:
            self._conn.execute(
                "INSERT INTO schema_version (version) VALUES (?)",
                (SCHEMA_VERSION,),
            )
            self._conn.commit()
        elif row[0] != SCHEMA_VERSION:
            # No migration logic exists here; surface the mismatch instead of
            # silently reading/writing a table with a different layout.
            logger.warning(
                "Trial history schema version %s differs from expected %s",
                row[0],
                SCHEMA_VERSION,
            )

    def log_trial(
        self,
        study_name: str,
        trial_id: int,
        params: dict[str, float],
        mass_kg: float = float("nan"),
        tip_displacement_mm: float = float("nan"),
        max_von_mises_mpa: float = float("nan"),
        geo_feasible: bool = True,
        status: str = "COMPLETE",
        error_message: str | None = None,
        solve_time_s: float = 0.0,
        iter_path: str | None = None,
        notes: str | None = None,
        iteration_number: int | None = None,
    ) -> None:
        """Log a single trial result.

        Uses INSERT OR REPLACE so re-runs of the same (study, trial) pair
        update cleanly. The replaced row is deleted and re-inserted, so it
        receives a new autoincrement ``id``.

        Args:
            study_name: Name of the study the trial belongs to.
            trial_id: Trial number within the study.
            params: Design variables; the keys read are
                ``beam_half_core_thickness``, ``beam_face_thickness``,
                ``holes_diameter`` and ``hole_count``. Missing keys are
                stored as NULL.
            mass_kg: Resulting mass, NaN when unavailable.
            tip_displacement_mm: Tip displacement, NaN when unavailable.
                Feasible when it is <= ``DISP_LIMIT_MM``.
            max_von_mises_mpa: Peak von Mises stress, NaN when unavailable.
                Feasible when it is <= ``STRESS_LIMIT_MPA``.
            geo_feasible: Whether the geometry check passed.
            status: Trial status string (COMPLETE, FAILED, PRUNED).
            error_message: Optional failure description.
            solve_time_s: Solve duration in seconds.
            iter_path: Optional path of the iteration folder.
            notes: Optional free-form notes.
            iteration_number: Optional iteration index, stored as
                ``iterNNN``. Zero is a valid index.

        Database errors are logged and swallowed so that a history failure
        never aborts the calling optimization loop.
        """
        disp_ok = _within_limit(tip_displacement_mm, DISP_LIMIT_MM)
        stress_ok = _within_limit(max_von_mises_mpa, STRESS_LIMIT_MPA)
        geo_ok = bool(geo_feasible)

        # `is not None` rather than truthiness: iteration 0 must be labelled.
        iteration = (
            f"iter{iteration_number:03d}" if iteration_number is not None else None
        )

        try:
            # The connection context manager commits on success and rolls
            # back on error, so a failed insert leaves no open transaction.
            with self._conn:
                self._conn.execute(
                    """
                    INSERT OR REPLACE INTO trials (
                        study_name, trial_id, iteration, timestamp,
                        beam_half_core_thickness, beam_face_thickness,
                        holes_diameter, hole_count,
                        mass_kg, tip_displacement_mm, max_von_mises_mpa,
                        disp_feasible, stress_feasible, geo_feasible, fully_feasible,
                        status, error_message, solve_time_s, iter_path, notes
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        study_name,
                        trial_id,
                        iteration,
                        datetime.now(timezone.utc).isoformat(),
                        params.get("beam_half_core_thickness"),
                        params.get("beam_face_thickness"),
                        params.get("holes_diameter"),
                        params.get("hole_count"),
                        mass_kg,
                        tip_displacement_mm,
                        max_von_mises_mpa,
                        int(disp_ok),
                        int(stress_ok),
                        int(geo_ok),
                        int(disp_ok and stress_ok and geo_ok),
                        status,
                        error_message,
                        solve_time_s,
                        iter_path,
                        notes,
                    ),
                )
        except sqlite3.Error as e:
            logger.error("Failed to log trial %d: %s", trial_id, e)

    def export_csv(self) -> Path:
        """Export all trials to CSV (overwrite). Returns path.

        Rows are ordered by study name, then trial id; columns follow
        ``CSV_COLUMNS``.
        """
        # CSV_COLUMNS is a module constant, not user input, so building the
        # column list into the statement is safe.
        rows = self._conn.execute(
            f"SELECT {', '.join(CSV_COLUMNS)} FROM trials ORDER BY study_name, trial_id"
        ).fetchall()

        # Explicit UTF-8 keeps the export independent of the platform locale
        # (error messages may contain non-ASCII text).
        with open(self.csv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(CSV_COLUMNS)
            writer.writerows([row[col] for col in CSV_COLUMNS] for row in rows)

        logger.info("Exported %d trials to %s", len(rows), self.csv_path.name)
        return self.csv_path

    def query(self, sql: str, params: tuple = ()) -> list[dict]:
        """Run an arbitrary SELECT query. Returns list of dicts.

        Args:
            sql: SQL statement, with ``?`` placeholders for values.
            params: Values bound to the placeholders.
        """
        rows = self._conn.execute(sql, params).fetchall()
        return [dict(row) for row in rows]

    def get_study_summary(self, study_name: str) -> dict[str, Any]:
        """Get summary stats for a study.

        Returns a dict with ``study_name`` and ``total``; when the study has
        at least one trial it also holds ``complete``, ``failed`` (every
        trial whose status is not COMPLETE), ``feasible``, ``best_mass_kg``
        (lowest mass among feasible trials, or None), ``solve_rate`` and
        ``feasibility_rate`` (both percentages).
        """
        rows = self.query(
            "SELECT * FROM trials WHERE study_name = ?", (study_name,)
        )
        if not rows:
            return {"study_name": study_name, "total": 0}

        complete = [r for r in rows if r["status"] == "COMPLETE"]
        feasible = [r for r in complete if r["fully_feasible"]]
        # NaN masses are stored as NULL by SQLite and come back as None.
        masses = [r["mass_kg"] for r in feasible if r["mass_kg"] is not None]

        return {
            "study_name": study_name,
            "total": len(rows),
            "complete": len(complete),
            "failed": len(rows) - len(complete),
            "feasible": len(feasible),
            "best_mass_kg": min(masses) if masses else None,
            "solve_rate": len(complete) / len(rows) * 100,
            "feasibility_rate": len(feasible) / len(complete) * 100 if complete else 0,
        }

    def close(self) -> None:
        """Export CSV and close connection.

        The connection is closed even when the CSV export raises.
        """
        try:
            self.export_csv()
        finally:
            self._conn.close()