# Source file: Atomizer/optimization_engine/utils/dashboard_db.py
# (Python; 575 lines, 20 KiB in the original repository view.)

"""
Dashboard Database Compatibility Module
========================================

Provides an Optuna-compatible database schema for all optimization types,
ensuring dashboard compatibility regardless of optimization method
(standard Optuna, turbo/surrogate, GNN, etc.).

Usage:
    from optimization_engine.utils.dashboard_db import DashboardDB

    # Initialize (creates Optuna-compatible schema)
    db = DashboardDB(study_dir / "3_results" / "study.db", study_name="my_study")

    # Log a trial
    db.log_trial(
        params={"rib_thickness": 10.5, "mass": 118.0},
        objectives={"wfe_40_20": 5.63, "wfe_60_20": 12.75},
        weighted_sum=175.87,  # optional, for single-objective ranking
        is_feasible=True,
        metadata={"turbo_iteration": 1, "predicted_ws": 186.77}
    )

    # Mark best trial
    db.mark_best(trial_id=1)

    # Get summary
    print(db.get_summary())

Schema follows Optuna's native format for full dashboard compatibility.
"""
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Union


class DashboardDB:
    """Optuna-compatible SQLite writer for dashboard integration.

    Creates (when missing) a set of tables modelled on Optuna's storage
    layout and records trials into them.  Every method opens its own
    short-lived connection, so an instance never holds an open database
    handle between calls.

    Attributes:
        db_path: Location of the SQLite file.
        study_name: Name of the study this instance reads and writes.
        direction: "MINIMIZE" or "MAXIMIZE"; decides which trial is "best".
        study_id: Row id of the study, resolved while initializing the schema.
    """

    SCHEMA_VERSION = 1

    # Tables whose rows hang off a trial_id; clear() purges them before the
    # trials themselves because foreign keys are declared without cascades.
    _TRIAL_CHILD_TABLES = (
        "trial_user_attributes",
        "trial_system_attributes",
        "trial_values",
        "trial_params",
        "trial_intermediate_values",
        "trial_heartbeats",
    )

    # Idempotent DDL, executed in order by _init_schema().
    _SCHEMA_STATEMENTS = (
        # version_info - tracks schema version
        """
        CREATE TABLE IF NOT EXISTS version_info (
            version_info_id INTEGER PRIMARY KEY,
            schema_version INTEGER,
            library_version VARCHAR(256)
        )
        """,
        # studies - study metadata
        """
        CREATE TABLE IF NOT EXISTS studies (
            study_id INTEGER PRIMARY KEY,
            study_name VARCHAR(512) UNIQUE
        )
        """,
        # study_directions - optimization direction per objective
        """
        CREATE TABLE IF NOT EXISTS study_directions (
            study_direction_id INTEGER PRIMARY KEY,
            direction VARCHAR(8) NOT NULL,
            study_id INTEGER,
            objective INTEGER,
            FOREIGN KEY (study_id) REFERENCES studies(study_id)
        )
        """,
        # trials - main trial table
        """
        CREATE TABLE IF NOT EXISTS trials (
            trial_id INTEGER PRIMARY KEY,
            number INTEGER,
            study_id INTEGER,
            state VARCHAR(8) NOT NULL DEFAULT 'COMPLETE',
            datetime_start DATETIME,
            datetime_complete DATETIME,
            FOREIGN KEY (study_id) REFERENCES studies(study_id)
        )
        """,
        # trial_values - objective values
        """
        CREATE TABLE IF NOT EXISTS trial_values (
            trial_value_id INTEGER PRIMARY KEY,
            trial_id INTEGER,
            objective INTEGER,
            value FLOAT,
            value_type VARCHAR(7) DEFAULT 'FINITE',
            FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
        )
        """,
        # trial_params - parameter values
        """
        CREATE TABLE IF NOT EXISTS trial_params (
            param_id INTEGER PRIMARY KEY,
            trial_id INTEGER,
            param_name VARCHAR(512),
            param_value FLOAT,
            distribution_json TEXT,
            FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
        )
        """,
        # trial_user_attributes - custom metadata
        """
        CREATE TABLE IF NOT EXISTS trial_user_attributes (
            trial_user_attribute_id INTEGER PRIMARY KEY,
            trial_id INTEGER,
            key VARCHAR(512),
            value_json TEXT,
            FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
        )
        """,
        # trial_system_attributes - system metadata
        """
        CREATE TABLE IF NOT EXISTS trial_system_attributes (
            trial_system_attribute_id INTEGER PRIMARY KEY,
            trial_id INTEGER,
            key VARCHAR(512),
            value_json TEXT,
            FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
        )
        """,
        # study_user_attributes
        """
        CREATE TABLE IF NOT EXISTS study_user_attributes (
            study_user_attribute_id INTEGER PRIMARY KEY,
            study_id INTEGER,
            key VARCHAR(512),
            value_json TEXT,
            FOREIGN KEY (study_id) REFERENCES studies(study_id)
        )
        """,
        # study_system_attributes
        """
        CREATE TABLE IF NOT EXISTS study_system_attributes (
            study_system_attribute_id INTEGER PRIMARY KEY,
            study_id INTEGER,
            key VARCHAR(512),
            value_json TEXT,
            FOREIGN KEY (study_id) REFERENCES studies(study_id)
        )
        """,
        # trial_intermediate_values (for pruning callbacks)
        """
        CREATE TABLE IF NOT EXISTS trial_intermediate_values (
            trial_intermediate_value_id INTEGER PRIMARY KEY,
            trial_id INTEGER,
            step INTEGER,
            intermediate_value FLOAT,
            intermediate_value_type VARCHAR(7) DEFAULT 'FINITE',
            FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
        )
        """,
        # trial_heartbeats (for distributed optimization)
        """
        CREATE TABLE IF NOT EXISTS trial_heartbeats (
            trial_heartbeat_id INTEGER PRIMARY KEY,
            trial_id INTEGER,
            heartbeat DATETIME,
            FOREIGN KEY (trial_id) REFERENCES trials(trial_id)
        )
        """,
        # alembic_version (migration bookkeeping table)
        """
        CREATE TABLE IF NOT EXISTS alembic_version (
            version_num VARCHAR(32) PRIMARY KEY
        )
        """,
        # Indexes for performance
        "CREATE INDEX IF NOT EXISTS ix_trials_study_id ON trials(study_id)",
        "CREATE INDEX IF NOT EXISTS ix_trials_state ON trials(state)",
        "CREATE INDEX IF NOT EXISTS ix_trial_values_trial_id ON trial_values(trial_id)",
        "CREATE INDEX IF NOT EXISTS ix_trial_params_trial_id ON trial_params(trial_id)",
    )

    def __init__(self, db_path: Union[str, Path], study_name: str, direction: str = "MINIMIZE"):
        """
        Initialize database with Optuna-compatible schema.

        Args:
            db_path: Path to SQLite database file (parent directories are created)
            study_name: Name of the optimization study
            direction: "MINIMIZE" or "MAXIMIZE"
        """
        self.db_path = Path(db_path)
        self.study_name = study_name
        self.direction = direction
        self._init_schema()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Cursor]:
        """Yield a cursor on a fresh connection.

        The transaction is committed when the ``with`` body finishes normally
        and rolled back when it raises; the connection is closed either way,
        so a failing statement can no longer leak the handle.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, rollback on exception
                yield conn.cursor()
        finally:
            conn.close()

    def _init_schema(self) -> None:
        """Create the schema if needed and resolve ``self.study_id``.

        Safe to call against an existing database: tables and indexes use
        ``IF NOT EXISTS`` and seed rows are only inserted when absent.
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        with self._connect() as cursor:
            for statement in self._SCHEMA_STATEMENTS:
                cursor.execute(statement)

            # Seed the version row once.
            cursor.execute("SELECT COUNT(*) FROM version_info")
            if cursor.fetchone()[0] == 0:
                cursor.execute(
                    "INSERT INTO version_info (schema_version, library_version) VALUES (?, ?)",
                    (12, "atomizer-dashboard-1.0"),
                )

            # Reuse the study row when the name is already registered.
            cursor.execute(
                "SELECT study_id FROM studies WHERE study_name = ?",
                (self.study_name,),
            )
            row = cursor.fetchone()
            if row is None:
                cursor.execute(
                    "INSERT INTO studies (study_name) VALUES (?)",
                    (self.study_name,),
                )
                self.study_id = cursor.lastrowid
            else:
                self.study_id = row[0]

            # Record the direction for objective 0 the first time only.
            cursor.execute(
                "SELECT COUNT(*) FROM study_directions WHERE study_id = ?",
                (self.study_id,),
            )
            if cursor.fetchone()[0] == 0:
                cursor.execute(
                    "INSERT INTO study_directions (direction, study_id, objective) VALUES (?, ?, ?)",
                    (self.direction, self.study_id, 0),
                )

            cursor.execute("INSERT OR IGNORE INTO alembic_version VALUES ('v3.0.0')")

    def log_trial(
        self,
        params: Dict[str, float],
        objectives: Dict[str, float],
        weighted_sum: Optional[float] = None,
        is_feasible: bool = True,
        state: str = "COMPLETE",
        datetime_start: Optional[str] = None,
        datetime_complete: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> int:
        """
        Log a trial to the database.

        The value stored for objective 0 is ``weighted_sum`` when given,
        otherwise the first value in ``objectives``.  When neither is
        available (e.g. a failed trial with no results) the trial is still
        recorded, just without a ``trial_values`` row.

        Args:
            params: Parameter name -> value mapping
            objectives: Objective name -> value mapping (stored as ``obj_<name>`` user attributes)
            weighted_sum: Optional weighted sum for single-objective ranking
            is_feasible: Whether trial meets constraints
            state: Trial state ("COMPLETE", "PRUNED", "FAIL", "RUNNING")
            datetime_start: ISO format timestamp (defaults to now)
            datetime_complete: ISO format timestamp (defaults to now)
            metadata: Additional metadata (turbo_iteration, predicted values, etc.)

        Returns:
            trial_id of inserted trial

        Raises:
            TypeError: If an objective or metadata value is not JSON serializable.
        """
        now = datetime.now().isoformat()
        started = datetime_start or now
        completed = datetime_complete or now

        has_value = weighted_sum is not None or bool(objectives)
        if weighted_sum is not None:
            primary_value = weighted_sum
        elif objectives:
            primary_value = next(iter(objectives.values()))
        else:
            primary_value = None

        # Serialize before touching the database so a bad value fails early.
        attributes = [(f"obj_{name}", json.dumps(value)) for name, value in objectives.items()]
        attributes.append(("is_feasible", json.dumps(is_feasible)))
        if metadata:
            attributes.extend((key, json.dumps(value)) for key, value in metadata.items())

        with self._connect() as cursor:
            # The next trial number is computed inside the INSERT itself so
            # numbering and insertion happen in a single statement.
            cursor.execute(
                '''
                INSERT INTO trials (number, study_id, state, datetime_start, datetime_complete)
                SELECT COALESCE(MAX(number), -1) + 1, ?, ?, ?, ?
                FROM trials WHERE study_id = ?
                ''',
                (self.study_id, state, started, completed, self.study_id),
            )
            trial_id = cursor.lastrowid

            if has_value:
                cursor.execute(
                    '''
                    INSERT INTO trial_values (trial_id, objective, value, value_type)
                    VALUES (?, ?, ?, ?)
                    ''',
                    (trial_id, 0, primary_value, 'FINITE'),
                )

            cursor.executemany(
                '''
                INSERT INTO trial_params (trial_id, param_name, param_value, distribution_json)
                VALUES (?, ?, ?, ?)
                ''',
                [(trial_id, name, value, '{}') for name, value in params.items()],
            )

            cursor.executemany(
                '''
                INSERT INTO trial_user_attributes (trial_id, key, value_json)
                VALUES (?, ?, ?)
                ''',
                [(trial_id, key, value_json) for key, value_json in attributes],
            )

        return trial_id

    def mark_best(self, trial_id: int) -> None:
        """Mark a trial as the best (adds an ``is_best`` user attribute).

        Any previous ``is_best`` marker in this study is removed first.

        Args:
            trial_id: ``trial_id`` (not trial number) of a trial in this study.

        Raises:
            ValueError: If the trial does not exist in this study.  Checked
                before anything is deleted, so the existing marker survives.
        """
        with self._connect() as cursor:
            cursor.execute(
                "SELECT 1 FROM trials WHERE trial_id = ? AND study_id = ?",
                (trial_id, self.study_id),
            )
            if cursor.fetchone() is None:
                raise ValueError(
                    f"Trial {trial_id} does not belong to study '{self.study_name}'"
                )

            cursor.execute(
                '''
                DELETE FROM trial_user_attributes
                WHERE key = 'is_best' AND trial_id IN (
                    SELECT trial_id FROM trials WHERE study_id = ?
                )
                ''',
                (self.study_id,),
            )
            cursor.execute(
                '''
                INSERT INTO trial_user_attributes (trial_id, key, value_json)
                VALUES (?, 'is_best', 'true')
                ''',
                (trial_id,),
            )

    def get_trial_count(self, state: str = "COMPLETE") -> int:
        """Get count of this study's trials in the given state."""
        with self._connect() as cursor:
            cursor.execute(
                "SELECT COUNT(*) FROM trials WHERE study_id = ? AND state = ?",
                (self.study_id, state),
            )
            return cursor.fetchone()[0]

    def get_best_trial(self) -> Optional[Dict[str, Any]]:
        """Get the best COMPLETE trial according to the study direction.

        "Best" is the lowest stored value, or the highest when
        ``self.direction`` is "MAXIMIZE".  Rows with a NULL value are ignored
        and ties go to the lowest trial number.

        Returns:
            Dict with ``trial_id``, ``number`` and ``value``, or None when the
            study has no COMPLETE trial with a value.
        """
        # Only two fixed keywords are ever interpolated into the query.
        ordering = "DESC" if str(self.direction).upper() == "MAXIMIZE" else "ASC"
        query = f'''
            SELECT t.trial_id, t.number, tv.value
            FROM trials t
            JOIN trial_values tv ON t.trial_id = tv.trial_id
            WHERE t.study_id = ? AND t.state = 'COMPLETE' AND tv.value IS NOT NULL
            ORDER BY tv.value {ordering}, t.number ASC
            LIMIT 1
        '''
        with self._connect() as cursor:
            cursor.execute(query, (self.study_id,))
            row = cursor.fetchone()

        if row is None:
            return None
        return {'trial_id': row[0], 'number': row[1], 'value': row[2]}

    def get_summary(self) -> Dict[str, Any]:
        """Get database summary for logging.

        Returns:
            Dict with ``study_name``, ``complete_trials``, ``pruned_trials``,
            ``best_value`` and ``best_trial`` (trial number); the last two are
            None when there is no best trial.
        """
        best = self.get_best_trial()
        return {
            'study_name': self.study_name,
            'complete_trials': self.get_trial_count("COMPLETE"),
            'pruned_trials': self.get_trial_count("PRUNED"),
            'best_value': best['value'] if best else None,
            'best_trial': best['number'] if best else None,
        }

    def clear(self) -> None:
        """Clear all trials of this study (for re-running).

        Dependent rows are removed before the trials themselves, all in one
        transaction; the study row and other studies are left untouched.
        """
        with self._connect() as cursor:
            for table in self._TRIAL_CHILD_TABLES:
                # Table names come from the class constant above, never from input.
                cursor.execute(
                    f"DELETE FROM {table} WHERE trial_id IN "
                    "(SELECT trial_id FROM trials WHERE study_id = ?)",
                    (self.study_id,),
                )
            cursor.execute("DELETE FROM trials WHERE study_id = ?", (self.study_id,))
def _quote_identifier(identifier: str) -> str:
    """Return *identifier* double-quoted for safe use as an SQL table name."""
    return '"' + identifier.replace('"', '""') + '"'


def _decode_json_object(raw: Any) -> Dict[str, Any]:
    """Decode a JSON object column; return {} for empty, malformed or non-object data."""
    if not raw:
        return {}
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError):  # JSONDecodeError is a ValueError
        return {}
    return decoded if isinstance(decoded, dict) else {}


def convert_custom_to_optuna(
    db_path: Union[str, Path],
    study_name: str,
    custom_table: str = "trials",
    param_columns: Optional[List[str]] = None,
    objective_column: str = "weighted_sum",
    status_column: str = "status",
    datetime_column: str = "datetime_complete",
) -> int:
    """
    Convert a custom database schema to Optuna-compatible format.

    The database file is first copied to ``<name>.db.bak`` (overwriting any
    earlier backup).  All rows of ``custom_table`` are read into memory, every
    user table in the file is dropped, and the rows are re-logged through
    ``DashboardDB`` into a fresh schema.

    Args:
        db_path: Path to database
        study_name: Name for the study
        custom_table: Name of custom trials table to convert
        param_columns: Column names whose non-NULL values are added to each
            trial's params, on top of anything found in ``params_json``.
            When None only ``params_json`` is used.
        objective_column: Column containing objective value
        status_column: Column containing trial status
        datetime_column: Column containing timestamp

    Returns:
        Number of trials converted

    Raises:
        ValueError: If ``custom_table`` does not exist (nothing is dropped).
    """
    import shutil

    db_path = Path(db_path)
    backup_path = db_path.with_suffix('.db.bak')

    # Backup original before any destructive step.
    shutil.copy(db_path, backup_path)

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (custom_table,)
        )
        if cursor.fetchone() is None:
            raise ValueError(f"Table '{custom_table}' not found")

        quoted_table = _quote_identifier(custom_table)

        # Column names, in the same order SELECT * returns them.
        cursor.execute(f"PRAGMA table_info({quoted_table})")
        col_names = [row[1] for row in cursor.fetchall()]

        cursor.execute(f"SELECT * FROM {quoted_table}")
        custom_trials = cursor.fetchall()

        # Drop every user table to start fresh.  Names starting with
        # "sqlite_" are SQLite's own bookkeeping tables and cannot be dropped.
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        for (table,) in cursor.fetchall():
            if not table.startswith('sqlite_'):
                cursor.execute(f"DROP TABLE IF EXISTS {_quote_identifier(table)}")
        conn.commit()
    finally:
        conn.close()

    # Now create proper Optuna schema from scratch
    db = DashboardDB(db_path, study_name)

    converted = 0
    for row in custom_trials:
        trial_data = dict(zip(col_names, row))

        params = _decode_json_object(trial_data.get('params_json'))
        for column in param_columns or ():
            column_value = trial_data.get(column)
            if column_value is not None:
                params[column] = column_value

        objectives = _decode_json_object(trial_data.get('objectives_json'))

        weighted_sum = trial_data.get(objective_column)

        # Map status to state; a missing, NULL or blank status counts as COMPLETE.
        raw_status = trial_data.get(status_column)
        status = str(raw_status).strip().upper() if raw_status is not None else ''
        state = 'COMPLETE' if status in ('', 'COMPLETE', 'COMPLETED') else status

        is_feasible = bool(trial_data.get('is_feasible', 1))

        metadata = {
            key: trial_data[key]
            for key in ('turbo_iteration', 'predicted_ws', 'prediction_error', 'solve_time')
            if trial_data.get(key) is not None
        }

        db.log_trial(
            params=params,
            objectives=objectives,
            weighted_sum=weighted_sum,
            is_feasible=is_feasible,
            state=state,
            datetime_start=trial_data.get('datetime_start'),
            datetime_complete=trial_data.get(datetime_column),
            metadata=metadata,
        )
        converted += 1

    return converted
# Convenience function for turbo optimization
def init_turbo_database(study_dir: Union[str, Path], study_name: str) -> DashboardDB:
    """
    Initialize a dashboard-compatible database for turbo optimization.

    The database lives at ``<study_dir>/3_results/study.db``; the results
    directory is created when it does not exist yet.

    Args:
        study_dir: Study directory (contains 3_results/), as a path or string
        study_name: Name of the study

    Returns:
        DashboardDB instance ready for logging
    """
    # Path() lets callers pass a plain string as well as a Path.
    results_dir = Path(study_dir) / "3_results"
    results_dir.mkdir(parents=True, exist_ok=True)
    return DashboardDB(results_dir / "study.db", study_name)