- Add TrialManager (trial_manager.py) for consistent trial_NNNN naming - Add DashboardDB (dashboard_db.py) for Optuna-compatible database schema - Update CLAUDE.md with trial management documentation - Update ATOMIZER_CONTEXT.md with v1.8 trial system - Update cheatsheet v2.2 with new utilities - Update SYS_14 protocol to v2.3 with TrialManager integration - Add LAC learnings for trial management patterns - Add archive/README.md for deprecated code policy Key principles: - Trial numbers NEVER reset (monotonic) - Folders NEVER get overwritten - Database always synced with filesystem - Surrogate predictions are NOT trials (only FEA results) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
575 lines
20 KiB
Python
575 lines
20 KiB
Python
"""
|
|
Dashboard Database Compatibility Module
|
|
========================================
|
|
|
|
Provides Optuna-compatible database schema for all optimization types,
|
|
ensuring dashboard compatibility regardless of optimization method
|
|
(standard Optuna, turbo/surrogate, GNN, etc.)
|
|
|
|
Usage:
|
|
from optimization_engine.utils.dashboard_db import DashboardDB
|
|
|
|
# Initialize (creates Optuna-compatible schema)
|
|
db = DashboardDB(study_dir / "3_results" / "study.db", study_name="my_study")
|
|
|
|
# Log a trial
|
|
db.log_trial(
|
|
params={"rib_thickness": 10.5, "mass": 118.0},
|
|
objectives={"wfe_40_20": 5.63, "wfe_60_20": 12.75},
|
|
weighted_sum=175.87, # optional, for single-objective ranking
|
|
is_feasible=True,
|
|
metadata={"turbo_iteration": 1, "predicted_ws": 186.77}
|
|
)
|
|
|
|
# Mark best trial
|
|
db.mark_best(trial_id=1)
|
|
|
|
# Get summary
|
|
print(db.get_summary())
|
|
|
|
Schema follows Optuna's native format for full dashboard compatibility.
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional, List, Union
|
|
|
|
|
|
class DashboardDB:
|
|
"""Optuna-compatible database wrapper for dashboard integration."""
|
|
|
|
SCHEMA_VERSION = 1
|
|
|
|
def __init__(self, db_path: Union[str, Path], study_name: str, direction: str = "MINIMIZE"):
|
|
"""
|
|
Initialize database with Optuna-compatible schema.
|
|
|
|
Args:
|
|
db_path: Path to SQLite database file
|
|
study_name: Name of the optimization study
|
|
direction: "MINIMIZE" or "MAXIMIZE"
|
|
"""
|
|
self.db_path = Path(db_path)
|
|
self.study_name = study_name
|
|
self.direction = direction
|
|
self._init_schema()
|
|
|
|
def _init_schema(self):
|
|
"""Create Optuna-compatible database schema."""
|
|
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Core Optuna tables
|
|
|
|
# version_info - tracks schema version
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS version_info (
|
|
version_info_id INTEGER PRIMARY KEY,
|
|
schema_version INTEGER,
|
|
library_version VARCHAR(256)
|
|
)
|
|
''')
|
|
|
|
# Insert version if not exists
|
|
cursor.execute("SELECT COUNT(*) FROM version_info")
|
|
if cursor.fetchone()[0] == 0:
|
|
cursor.execute(
|
|
"INSERT INTO version_info (schema_version, library_version) VALUES (?, ?)",
|
|
(12, "atomizer-dashboard-1.0")
|
|
)
|
|
|
|
# studies - Optuna study metadata
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS studies (
|
|
study_id INTEGER PRIMARY KEY,
|
|
study_name VARCHAR(512) UNIQUE
|
|
)
|
|
''')
|
|
|
|
# Insert study if not exists
|
|
cursor.execute("SELECT study_id FROM studies WHERE study_name = ?", (self.study_name,))
|
|
result = cursor.fetchone()
|
|
if result:
|
|
self.study_id = result[0]
|
|
else:
|
|
cursor.execute("INSERT INTO studies (study_name) VALUES (?)", (self.study_name,))
|
|
self.study_id = cursor.lastrowid
|
|
|
|
# study_directions - optimization direction
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS study_directions (
|
|
study_direction_id INTEGER PRIMARY KEY,
|
|
direction VARCHAR(8) NOT NULL,
|
|
study_id INTEGER,
|
|
objective INTEGER,
|
|
FOREIGN KEY (study_id) REFERENCES studies(study_id)
|
|
)
|
|
''')
|
|
|
|
# Insert direction if not exists
|
|
cursor.execute(
|
|
"SELECT COUNT(*) FROM study_directions WHERE study_id = ?",
|
|
(self.study_id,)
|
|
)
|
|
if cursor.fetchone()[0] == 0:
|
|
cursor.execute(
|
|
"INSERT INTO study_directions (direction, study_id, objective) VALUES (?, ?, ?)",
|
|
(self.direction, self.study_id, 0)
|
|
)
|
|
|
|
# trials - main trial table (Optuna schema)
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS trials (
|
|
trial_id INTEGER PRIMARY KEY,
|
|
number INTEGER,
|
|
study_id INTEGER,
|
|
state VARCHAR(8) NOT NULL DEFAULT 'COMPLETE',
|
|
datetime_start DATETIME,
|
|
datetime_complete DATETIME,
|
|
FOREIGN KEY (study_id) REFERENCES studies(study_id)
|
|
)
|
|
''')
|
|
|
|
# trial_values - objective values
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS trial_values (
|
|
trial_value_id INTEGER PRIMARY KEY,
|
|
trial_id INTEGER,
|
|
objective INTEGER,
|
|
value FLOAT,
|
|
value_type VARCHAR(7) DEFAULT 'FINITE',
|
|
FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
|
|
)
|
|
''')
|
|
|
|
# trial_params - parameter values
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS trial_params (
|
|
param_id INTEGER PRIMARY KEY,
|
|
trial_id INTEGER,
|
|
param_name VARCHAR(512),
|
|
param_value FLOAT,
|
|
distribution_json TEXT,
|
|
FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
|
|
)
|
|
''')
|
|
|
|
# trial_user_attributes - custom metadata
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS trial_user_attributes (
|
|
trial_user_attribute_id INTEGER PRIMARY KEY,
|
|
trial_id INTEGER,
|
|
key VARCHAR(512),
|
|
value_json TEXT,
|
|
FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
|
|
)
|
|
''')
|
|
|
|
# trial_system_attributes - system metadata
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS trial_system_attributes (
|
|
trial_system_attribute_id INTEGER PRIMARY KEY,
|
|
trial_id INTEGER,
|
|
key VARCHAR(512),
|
|
value_json TEXT,
|
|
FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
|
|
)
|
|
''')
|
|
|
|
# study_user_attributes
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS study_user_attributes (
|
|
study_user_attribute_id INTEGER PRIMARY KEY,
|
|
study_id INTEGER,
|
|
key VARCHAR(512),
|
|
value_json TEXT,
|
|
FOREIGN KEY (study_id) REFERENCES studies(study_id)
|
|
)
|
|
''')
|
|
|
|
# study_system_attributes
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS study_system_attributes (
|
|
study_system_attribute_id INTEGER PRIMARY KEY,
|
|
study_id INTEGER,
|
|
key VARCHAR(512),
|
|
value_json TEXT,
|
|
FOREIGN KEY (study_id) REFERENCES studies(study_id)
|
|
)
|
|
''')
|
|
|
|
# trial_intermediate_values (for pruning callbacks)
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS trial_intermediate_values (
|
|
trial_intermediate_value_id INTEGER PRIMARY KEY,
|
|
trial_id INTEGER,
|
|
step INTEGER,
|
|
intermediate_value FLOAT,
|
|
intermediate_value_type VARCHAR(7) DEFAULT 'FINITE',
|
|
FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
|
|
)
|
|
''')
|
|
|
|
# trial_heartbeats (for distributed optimization)
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS trial_heartbeats (
|
|
trial_heartbeat_id INTEGER PRIMARY KEY,
|
|
trial_id INTEGER,
|
|
heartbeat DATETIME,
|
|
FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
|
|
)
|
|
''')
|
|
|
|
# alembic_version (Optuna uses alembic for migrations)
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS alembic_version (
|
|
version_num VARCHAR(32) PRIMARY KEY
|
|
)
|
|
''')
|
|
cursor.execute("INSERT OR IGNORE INTO alembic_version VALUES ('v3.0.0')")
|
|
|
|
# Create indexes for performance
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS ix_trials_study_id ON trials(study_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS ix_trials_state ON trials(state)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS ix_trial_values_trial_id ON trial_values(trial_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS ix_trial_params_trial_id ON trial_params(trial_id)")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def log_trial(
|
|
self,
|
|
params: Dict[str, float],
|
|
objectives: Dict[str, float],
|
|
weighted_sum: Optional[float] = None,
|
|
is_feasible: bool = True,
|
|
state: str = "COMPLETE",
|
|
datetime_start: Optional[str] = None,
|
|
datetime_complete: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
) -> int:
|
|
"""
|
|
Log a trial to the database.
|
|
|
|
Args:
|
|
params: Parameter name -> value mapping
|
|
objectives: Objective name -> value mapping
|
|
weighted_sum: Optional weighted sum for single-objective ranking
|
|
is_feasible: Whether trial meets constraints
|
|
state: Trial state ("COMPLETE", "PRUNED", "FAIL", "RUNNING")
|
|
datetime_start: ISO format timestamp
|
|
datetime_complete: ISO format timestamp
|
|
metadata: Additional metadata (turbo_iteration, predicted values, etc.)
|
|
|
|
Returns:
|
|
trial_id of inserted trial
|
|
"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Get next trial number
|
|
cursor.execute(
|
|
"SELECT COALESCE(MAX(number), -1) + 1 FROM trials WHERE study_id = ?",
|
|
(self.study_id,)
|
|
)
|
|
trial_number = cursor.fetchone()[0]
|
|
|
|
# Default timestamps
|
|
now = datetime.now().isoformat()
|
|
dt_start = datetime_start or now
|
|
dt_complete = datetime_complete or now
|
|
|
|
# Insert trial
|
|
cursor.execute('''
|
|
INSERT INTO trials (number, study_id, state, datetime_start, datetime_complete)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
''', (trial_number, self.study_id, state, dt_start, dt_complete))
|
|
trial_id = cursor.lastrowid
|
|
|
|
# Insert objective values
|
|
# Use weighted_sum as primary objective if provided, else first objective value
|
|
primary_value = weighted_sum if weighted_sum is not None else list(objectives.values())[0]
|
|
cursor.execute('''
|
|
INSERT INTO trial_values (trial_id, objective, value, value_type)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (trial_id, 0, primary_value, 'FINITE'))
|
|
|
|
# Insert all objectives as user attributes
|
|
for obj_name, obj_value in objectives.items():
|
|
cursor.execute('''
|
|
INSERT INTO trial_user_attributes (trial_id, key, value_json)
|
|
VALUES (?, ?, ?)
|
|
''', (trial_id, f"obj_{obj_name}", json.dumps(obj_value)))
|
|
|
|
# Insert parameters
|
|
for param_name, param_value in params.items():
|
|
cursor.execute('''
|
|
INSERT INTO trial_params (trial_id, param_name, param_value, distribution_json)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (trial_id, param_name, param_value, '{}'))
|
|
|
|
# Insert feasibility as user attribute
|
|
cursor.execute('''
|
|
INSERT INTO trial_user_attributes (trial_id, key, value_json)
|
|
VALUES (?, ?, ?)
|
|
''', (trial_id, 'is_feasible', json.dumps(is_feasible)))
|
|
|
|
# Insert metadata
|
|
if metadata:
|
|
for key, value in metadata.items():
|
|
cursor.execute('''
|
|
INSERT INTO trial_user_attributes (trial_id, key, value_json)
|
|
VALUES (?, ?, ?)
|
|
''', (trial_id, key, json.dumps(value)))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return trial_id
|
|
|
|
def mark_best(self, trial_id: int):
|
|
"""Mark a trial as the best (adds user attribute)."""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Remove previous best markers
|
|
cursor.execute('''
|
|
DELETE FROM trial_user_attributes
|
|
WHERE key = 'is_best' AND trial_id IN (
|
|
SELECT trial_id FROM trials WHERE study_id = ?
|
|
)
|
|
''', (self.study_id,))
|
|
|
|
# Mark new best
|
|
cursor.execute('''
|
|
INSERT INTO trial_user_attributes (trial_id, key, value_json)
|
|
VALUES (?, 'is_best', 'true')
|
|
''', (trial_id,))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def get_trial_count(self, state: str = "COMPLETE") -> int:
|
|
"""Get count of trials in given state."""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT COUNT(*) FROM trials WHERE study_id = ? AND state = ?",
|
|
(self.study_id, state)
|
|
)
|
|
count = cursor.fetchone()[0]
|
|
conn.close()
|
|
return count
|
|
|
|
def get_best_trial(self) -> Optional[Dict[str, Any]]:
|
|
"""Get best trial (lowest objective value)."""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
SELECT t.trial_id, t.number, tv.value
|
|
FROM trials t
|
|
JOIN trial_values tv ON t.trial_id = tv.trial_id
|
|
WHERE t.study_id = ? AND t.state = 'COMPLETE'
|
|
ORDER BY tv.value ASC
|
|
LIMIT 1
|
|
''', (self.study_id,))
|
|
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if result:
|
|
return {
|
|
'trial_id': result[0],
|
|
'number': result[1],
|
|
'value': result[2]
|
|
}
|
|
return None
|
|
|
|
def get_summary(self) -> Dict[str, Any]:
|
|
"""Get database summary for logging."""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute(
|
|
"SELECT COUNT(*) FROM trials WHERE study_id = ? AND state = 'COMPLETE'",
|
|
(self.study_id,)
|
|
)
|
|
complete = cursor.fetchone()[0]
|
|
|
|
cursor.execute(
|
|
"SELECT COUNT(*) FROM trials WHERE study_id = ? AND state = 'PRUNED'",
|
|
(self.study_id,)
|
|
)
|
|
pruned = cursor.fetchone()[0]
|
|
|
|
best = self.get_best_trial()
|
|
|
|
conn.close()
|
|
|
|
return {
|
|
'study_name': self.study_name,
|
|
'complete_trials': complete,
|
|
'pruned_trials': pruned,
|
|
'best_value': best['value'] if best else None,
|
|
'best_trial': best['number'] if best else None,
|
|
}
|
|
|
|
def clear(self):
|
|
"""Clear all trials (for re-running)."""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("DELETE FROM trial_user_attributes WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,))
|
|
cursor.execute("DELETE FROM trial_system_attributes WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,))
|
|
cursor.execute("DELETE FROM trial_values WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,))
|
|
cursor.execute("DELETE FROM trial_params WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,))
|
|
cursor.execute("DELETE FROM trial_intermediate_values WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,))
|
|
cursor.execute("DELETE FROM trial_heartbeats WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,))
|
|
cursor.execute("DELETE FROM trials WHERE study_id = ?", (self.study_id,))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def convert_custom_to_optuna(
|
|
db_path: Union[str, Path],
|
|
study_name: str,
|
|
custom_table: str = "trials",
|
|
param_columns: Optional[List[str]] = None,
|
|
objective_column: str = "weighted_sum",
|
|
status_column: str = "status",
|
|
datetime_column: str = "datetime_complete",
|
|
) -> int:
|
|
"""
|
|
Convert a custom database schema to Optuna-compatible format.
|
|
|
|
Args:
|
|
db_path: Path to database
|
|
study_name: Name for the study
|
|
custom_table: Name of custom trials table to convert
|
|
param_columns: List of parameter column names (auto-detect if None)
|
|
objective_column: Column containing objective value
|
|
status_column: Column containing trial status
|
|
datetime_column: Column containing timestamp
|
|
|
|
Returns:
|
|
Number of trials converted
|
|
"""
|
|
db_path = Path(db_path)
|
|
backup_path = db_path.with_suffix('.db.bak')
|
|
|
|
# Backup original
|
|
import shutil
|
|
shutil.copy(db_path, backup_path)
|
|
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Check if custom table exists
|
|
cursor.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
|
|
(custom_table,)
|
|
)
|
|
if not cursor.fetchone():
|
|
conn.close()
|
|
raise ValueError(f"Table '{custom_table}' not found")
|
|
|
|
# Get column info
|
|
cursor.execute(f"PRAGMA table_info({custom_table})")
|
|
columns = {row[1]: row[2] for row in cursor.fetchall()}
|
|
|
|
# Read all custom trials
|
|
cursor.execute(f"SELECT * FROM {custom_table}")
|
|
custom_trials = cursor.fetchall()
|
|
|
|
# Get column names
|
|
cursor.execute(f"PRAGMA table_info({custom_table})")
|
|
col_names = [row[1] for row in cursor.fetchall()]
|
|
|
|
# Drop ALL existing tables to start fresh
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
|
|
existing_tables = [row[0] for row in cursor.fetchall()]
|
|
for table in existing_tables:
|
|
if table != 'sqlite_sequence': # Don't drop internal SQLite table
|
|
cursor.execute(f"DROP TABLE IF EXISTS {table}")
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# Now create proper Optuna schema from scratch
|
|
db = DashboardDB(db_path, study_name)
|
|
|
|
converted = 0
|
|
for row in custom_trials:
|
|
trial_data = dict(zip(col_names, row))
|
|
|
|
# Extract params from JSON if available
|
|
params = {}
|
|
if 'params_json' in trial_data and trial_data['params_json']:
|
|
try:
|
|
params = json.loads(trial_data['params_json'])
|
|
except:
|
|
pass
|
|
|
|
# Extract objectives from JSON if available
|
|
objectives = {}
|
|
if 'objectives_json' in trial_data and trial_data['objectives_json']:
|
|
try:
|
|
objectives = json.loads(trial_data['objectives_json'])
|
|
except:
|
|
pass
|
|
|
|
# Get weighted sum
|
|
weighted_sum = trial_data.get(objective_column)
|
|
|
|
# Map status to state
|
|
status = trial_data.get(status_column, 'COMPLETE')
|
|
state = 'COMPLETE' if status.upper() in ('COMPLETE', 'COMPLETED') else status.upper()
|
|
|
|
# Get feasibility
|
|
is_feasible = bool(trial_data.get('is_feasible', 1))
|
|
|
|
# Build metadata
|
|
metadata = {}
|
|
for key in ['turbo_iteration', 'predicted_ws', 'prediction_error', 'solve_time']:
|
|
if key in trial_data and trial_data[key] is not None:
|
|
metadata[key] = trial_data[key]
|
|
|
|
# Log trial
|
|
db.log_trial(
|
|
params=params,
|
|
objectives=objectives,
|
|
weighted_sum=weighted_sum,
|
|
is_feasible=is_feasible,
|
|
state=state,
|
|
datetime_start=trial_data.get('datetime_start'),
|
|
datetime_complete=trial_data.get(datetime_column),
|
|
metadata=metadata,
|
|
)
|
|
converted += 1
|
|
|
|
return converted
|
|
|
|
|
|
# Convenience function for turbo optimization
|
|
def init_turbo_database(study_dir: Path, study_name: str) -> DashboardDB:
|
|
"""
|
|
Initialize a dashboard-compatible database for turbo optimization.
|
|
|
|
Args:
|
|
study_dir: Study directory (contains 3_results/)
|
|
study_name: Name of the study
|
|
|
|
Returns:
|
|
DashboardDB instance ready for logging
|
|
"""
|
|
results_dir = study_dir / "3_results"
|
|
results_dir.mkdir(parents=True, exist_ok=True)
|
|
db_path = results_dir / "study.db"
|
|
|
|
return DashboardDB(db_path, study_name)
|