Add persistent trial history DB (append-only, survives --clean)

- history.db: SQLite append-only, never deleted by --clean
- history.csv: Auto-exported after each trial (live updates)
- Logs: DVs, results, feasibility, status, solve time, iter path
- Cross-study queries: full lineage across all runs/phases
- --clean only resets Optuna DB, history preserved
This commit is contained in:
2026-02-11 14:59:52 +00:00
parent 04fdae26ab
commit 815db0fb8d
2 changed files with 288 additions and 3 deletions

View File

@@ -0,0 +1,235 @@
"""Persistent trial history — append-only, survives Optuna resets.
Every trial is logged to `history.db` (SQLite) and exported to `history.csv`.
Never deleted by --clean. Full lineage across all studies and phases.
Usage:
history = TrialHistory(results_dir)
history.log_trial(study_name, trial_id, params, results, ...)
history.export_csv()
df = history.query("SELECT * FROM trials WHERE mass_kg < 100")
"""
from __future__ import annotations
import csv
import json
import logging
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
logger = logging.getLogger(__name__)
# Schema version — bump if columns change
SCHEMA_VERSION = 1
CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS trials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
study_name TEXT NOT NULL,
trial_id INTEGER NOT NULL,
iteration TEXT,
timestamp TEXT NOT NULL,
-- Design variables
beam_half_core_thickness REAL,
beam_face_thickness REAL,
holes_diameter REAL,
hole_count INTEGER,
-- Results
mass_kg REAL,
tip_displacement_mm REAL,
max_von_mises_mpa REAL,
-- Constraint checks
disp_feasible INTEGER, -- 0/1
stress_feasible INTEGER, -- 0/1
geo_feasible INTEGER, -- 0/1
fully_feasible INTEGER, -- 0/1
-- Meta
status TEXT DEFAULT 'COMPLETE', -- COMPLETE, FAILED, PRUNED
error_message TEXT,
solve_time_s REAL,
iter_path TEXT,
notes TEXT,
-- Unique constraint: no duplicate (study, trial) pairs
UNIQUE(study_name, trial_id)
);
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY
);
"""
# Constraint thresholds (from OPTIMIZATION_STRATEGY.md)
DISP_LIMIT_MM = 10.0
STRESS_LIMIT_MPA = 130.0
# CSV column order
CSV_COLUMNS = [
"study_name", "trial_id", "iteration", "timestamp",
"beam_half_core_thickness", "beam_face_thickness",
"holes_diameter", "hole_count",
"mass_kg", "tip_displacement_mm", "max_von_mises_mpa",
"disp_feasible", "stress_feasible", "geo_feasible", "fully_feasible",
"status", "error_message", "solve_time_s", "iter_path",
]
class TrialHistory:
"""Append-only trial history database."""
def __init__(self, results_dir: Path | str):
self.results_dir = Path(results_dir)
self.results_dir.mkdir(parents=True, exist_ok=True)
self.db_path = self.results_dir / "history.db"
self.csv_path = self.results_dir / "history.csv"
self._conn = sqlite3.connect(str(self.db_path))
self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA journal_mode=WAL") # safe concurrent reads
self._init_schema()
count = self._conn.execute("SELECT COUNT(*) FROM trials").fetchone()[0]
logger.info("Trial history: %s (%d records)", self.db_path.name, count)
def _init_schema(self) -> None:
"""Create tables if they don't exist."""
self._conn.executescript(CREATE_TABLE)
# Check/set schema version
row = self._conn.execute(
"SELECT version FROM schema_version ORDER BY version DESC LIMIT 1"
).fetchone()
if row is None:
self._conn.execute(
"INSERT INTO schema_version (version) VALUES (?)",
(SCHEMA_VERSION,),
)
self._conn.commit()
def log_trial(
self,
study_name: str,
trial_id: int,
params: dict[str, float],
mass_kg: float = float("nan"),
tip_displacement_mm: float = float("nan"),
max_von_mises_mpa: float = float("nan"),
geo_feasible: bool = True,
status: str = "COMPLETE",
error_message: str | None = None,
solve_time_s: float = 0.0,
iter_path: str | None = None,
notes: str | None = None,
iteration_number: int | None = None,
) -> None:
"""Log a single trial result.
Uses INSERT OR REPLACE so re-runs of the same trial update cleanly.
"""
import math
disp_ok = (
not math.isnan(tip_displacement_mm)
and tip_displacement_mm <= DISP_LIMIT_MM
)
stress_ok = (
not math.isnan(max_von_mises_mpa)
and max_von_mises_mpa <= STRESS_LIMIT_MPA
)
iteration = f"iter{iteration_number:03d}" if iteration_number else None
try:
self._conn.execute(
"""
INSERT OR REPLACE INTO trials (
study_name, trial_id, iteration, timestamp,
beam_half_core_thickness, beam_face_thickness,
holes_diameter, hole_count,
mass_kg, tip_displacement_mm, max_von_mises_mpa,
disp_feasible, stress_feasible, geo_feasible, fully_feasible,
status, error_message, solve_time_s, iter_path, notes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
study_name,
trial_id,
iteration,
datetime.now(timezone.utc).isoformat(),
params.get("beam_half_core_thickness"),
params.get("beam_face_thickness"),
params.get("holes_diameter"),
params.get("hole_count"),
mass_kg,
tip_displacement_mm,
max_von_mises_mpa,
int(disp_ok),
int(stress_ok),
int(geo_feasible),
int(disp_ok and stress_ok and geo_feasible),
status,
error_message,
solve_time_s,
iter_path,
notes,
),
)
self._conn.commit()
except sqlite3.Error as e:
logger.error("Failed to log trial %d: %s", trial_id, e)
def export_csv(self) -> Path:
"""Export all trials to CSV (overwrite). Returns path."""
rows = self._conn.execute(
f"SELECT {', '.join(CSV_COLUMNS)} FROM trials ORDER BY study_name, trial_id"
).fetchall()
with open(self.csv_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(CSV_COLUMNS)
for row in rows:
writer.writerow([row[col] for col in CSV_COLUMNS])
logger.info("Exported %d trials to %s", len(rows), self.csv_path.name)
return self.csv_path
def query(self, sql: str, params: tuple = ()) -> list[dict]:
"""Run an arbitrary SELECT query. Returns list of dicts."""
rows = self._conn.execute(sql, params).fetchall()
return [dict(row) for row in rows]
def get_study_summary(self, study_name: str) -> dict[str, Any]:
"""Get summary stats for a study."""
rows = self.query(
"SELECT * FROM trials WHERE study_name = ?", (study_name,)
)
if not rows:
return {"study_name": study_name, "total": 0}
complete = [r for r in rows if r["status"] == "COMPLETE"]
feasible = [r for r in complete if r["fully_feasible"]]
masses = [r["mass_kg"] for r in feasible if r["mass_kg"] is not None]
return {
"study_name": study_name,
"total": len(rows),
"complete": len(complete),
"failed": len(rows) - len(complete),
"feasible": len(feasible),
"best_mass_kg": min(masses) if masses else None,
"solve_rate": len(complete) / len(rows) * 100 if rows else 0,
"feasibility_rate": len(feasible) / len(complete) * 100 if complete else 0,
}
def close(self) -> None:
"""Export CSV and close connection."""
self.export_csv()
self._conn.close()

View File

@@ -41,6 +41,7 @@ from geometric_checks import (
FeasibilityResult,
check_feasibility,
)
from history import TrialHistory
from nx_interface import TrialInput, TrialResult, create_solver
from sampling import DV_DEFINITIONS, generate_lhs_samples, points_to_dicts
@@ -103,12 +104,16 @@ def constraints_func(trial: optuna.trial.FrozenTrial) -> list[float]:
def evaluate_trial(
trial: optuna.Trial,
solver: Any,
history: TrialHistory | None = None,
study_name: str = "",
) -> float:
"""Evaluate a single trial: geometric check → NX solve → extract.
Args:
trial: Optuna trial (with parameters already suggested/enqueued).
solver: NX solver instance (stub or real).
history: Persistent trial history logger (append-only).
study_name: Study name for history logging.
Returns:
Objective value (mass in kg). Returns INFEASIBLE_MASS for
@@ -152,6 +157,13 @@ def evaluate_trial(
trial.set_user_attr("ligament", geo_result.ligament)
trial.set_user_attr("web_clearance", geo_result.web_clearance)
params = {
"beam_half_core_thickness": dv1,
"beam_face_thickness": dv2,
"holes_diameter": dv3,
"hole_count": dv4,
}
if not geo_result.feasible:
logger.warning(
"Trial %d: GEOMETRICALLY INFEASIBLE — %s",
@@ -162,6 +174,13 @@ def evaluate_trial(
trial.set_user_attr("tip_displacement", INFEASIBLE_DISPLACEMENT)
trial.set_user_attr("max_von_mises", INFEASIBLE_STRESS)
trial.set_user_attr("mass", INFEASIBLE_MASS)
if history:
history.log_trial(
study_name=study_name, trial_id=trial_num, params=params,
geo_feasible=False, status="GEO_INFEASIBLE",
error_message=geo_result.reason,
iteration_number=trial_num + 1,
)
return INFEASIBLE_MASS
# NX evaluation
@@ -188,6 +207,14 @@ def evaluate_trial(
trial.set_user_attr("tip_displacement", INFEASIBLE_DISPLACEMENT)
trial.set_user_attr("max_von_mises", INFEASIBLE_STRESS)
trial.set_user_attr("mass", INFEASIBLE_MASS)
if history:
history.log_trial(
study_name=study_name, trial_id=trial_num, params=params,
status="FAILED", error_message=nx_result.error_message,
solve_time_s=round(t_elapsed, 2),
iter_path=nx_result.iteration_dir,
iteration_number=trial_num + 1,
)
return INFEASIBLE_MASS
# Record successful results
@@ -214,6 +241,20 @@ def evaluate_trial(
t_elapsed,
)
if history:
history.log_trial(
study_name=study_name, trial_id=trial_num, params=params,
mass_kg=nx_result.mass,
tip_displacement_mm=nx_result.tip_displacement,
max_von_mises_mpa=nx_result.max_von_mises,
geo_feasible=True,
status="COMPLETE",
solve_time_s=round(t_elapsed, 2),
iter_path=nx_result.iteration_dir,
iteration_number=trial_num + 1,
)
history.export_csv() # Live update CSV after each trial
return nx_result.mass
@@ -419,7 +460,7 @@ def run_study(args: argparse.Namespace) -> None:
storage = f"sqlite:///{db_path}"
if args.clean and db_path.exists():
logger.info("--clean flag: deleting existing DB at %s", db_path)
logger.info("--clean flag: deleting Optuna DB at %s (history.db preserved)", db_path)
db_path.unlink()
if args.resume:
@@ -449,13 +490,17 @@ def run_study(args: argparse.Namespace) -> None:
logger.info("Enqueued %d trials (1 baseline + %d LHS)", n_trials, n_trials - 1)
# -----------------------------------------------------------------------
# 3. Create solver
# 3. Create solver + history
# -----------------------------------------------------------------------
solver = create_solver(
backend=args.backend,
model_dir=args.model_dir,
)
# Persistent history — NEVER deleted by --clean
history = TrialHistory(results_dir)
study_name = args.study_name
# -----------------------------------------------------------------------
# 4. Run all trials
# -----------------------------------------------------------------------
@@ -469,7 +514,7 @@ def run_study(args: argparse.Namespace) -> None:
optuna.logging.set_verbosity(optuna.logging.WARNING)
study.optimize(
lambda trial: evaluate_trial(trial, solver),
lambda trial: evaluate_trial(trial, solver, history, study_name),
n_trials=n_trials,
callbacks=[_progress_callback],
)
@@ -497,6 +542,11 @@ def run_study(args: argparse.Namespace) -> None:
# Cleanup
solver.close()
# Final history export + summary
history.close()
hist_summary = history.get_study_summary(study_name)
logger.info("History DB: %d total records across all studies", hist_summary["total"])
def _progress_callback(study: optuna.Study, trial: optuna.trial.FrozenTrial) -> None:
"""Log progress after each trial."""