""" Dashboard Database Compatibility Module ======================================== Provides Optuna-compatible database schema for all optimization types, ensuring dashboard compatibility regardless of optimization method (standard Optuna, turbo/surrogate, GNN, etc.) Usage: from optimization_engine.utils.dashboard_db import DashboardDB # Initialize (creates Optuna-compatible schema) db = DashboardDB(study_dir / "3_results" / "study.db", study_name="my_study") # Log a trial db.log_trial( params={"rib_thickness": 10.5, "mass": 118.0}, objectives={"wfe_40_20": 5.63, "wfe_60_20": 12.75}, weighted_sum=175.87, # optional, for single-objective ranking is_feasible=True, metadata={"turbo_iteration": 1, "predicted_ws": 186.77} ) # Mark best trial db.mark_best(trial_id=1) # Get summary print(db.get_summary()) Schema follows Optuna's native format for full dashboard compatibility. """ import sqlite3 import json from pathlib import Path from datetime import datetime from typing import Dict, Any, Optional, List, Union class DashboardDB: """Optuna-compatible database wrapper for dashboard integration.""" SCHEMA_VERSION = 1 def __init__(self, db_path: Union[str, Path], study_name: str, direction: str = "MINIMIZE"): """ Initialize database with Optuna-compatible schema. Args: db_path: Path to SQLite database file study_name: Name of the optimization study direction: "MINIMIZE" or "MAXIMIZE" """ self.db_path = Path(db_path) self.study_name = study_name self.direction = direction self._init_schema() def _init_schema(self): """Create Optuna-compatible database schema.""" self.db_path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Core Optuna tables # version_info - tracks schema version cursor.execute(''' CREATE TABLE IF NOT EXISTS version_info ( version_info_id INTEGER PRIMARY KEY, schema_version INTEGER, library_version VARCHAR(256) ) ''') # Insert version if not exists cursor.execute("SELECT COUNT(*) FROM version_info") if cursor.fetchone()[0] == 0: cursor.execute( "INSERT INTO version_info (schema_version, library_version) VALUES (?, ?)", (12, "atomizer-dashboard-1.0") ) # studies - Optuna study metadata cursor.execute(''' CREATE TABLE IF NOT EXISTS studies ( study_id INTEGER PRIMARY KEY, study_name VARCHAR(512) UNIQUE ) ''') # Insert study if not exists cursor.execute("SELECT study_id FROM studies WHERE study_name = ?", (self.study_name,)) result = cursor.fetchone() if result: self.study_id = result[0] else: cursor.execute("INSERT INTO studies (study_name) VALUES (?)", (self.study_name,)) self.study_id = cursor.lastrowid # study_directions - optimization direction cursor.execute(''' CREATE TABLE IF NOT EXISTS study_directions ( study_direction_id INTEGER PRIMARY KEY, direction VARCHAR(8) NOT NULL, study_id INTEGER, objective INTEGER, FOREIGN KEY (study_id) REFERENCES studies(study_id) ) ''') # Insert direction if not exists cursor.execute( "SELECT COUNT(*) FROM study_directions WHERE study_id = ?", (self.study_id,) ) if cursor.fetchone()[0] == 0: cursor.execute( "INSERT INTO study_directions (direction, study_id, objective) VALUES (?, ?, ?)", (self.direction, self.study_id, 0) ) # trials - main trial table (Optuna schema) cursor.execute(''' CREATE TABLE IF NOT EXISTS trials ( trial_id INTEGER PRIMARY KEY, number INTEGER, study_id INTEGER, state VARCHAR(8) NOT NULL DEFAULT 'COMPLETE', datetime_start DATETIME, datetime_complete DATETIME, FOREIGN KEY (study_id) REFERENCES studies(study_id) ) ''') # trial_values - objective values cursor.execute(''' CREATE TABLE IF NOT EXISTS trial_values ( trial_value_id INTEGER PRIMARY KEY, trial_id INTEGER, objective INTEGER, value FLOAT, value_type VARCHAR(7) DEFAULT 'FINITE', FOREIGN KEY (trial_id) REFERENCES trials(trial_id) ) ''') # trial_params - parameter values cursor.execute(''' CREATE TABLE IF NOT EXISTS trial_params ( param_id INTEGER PRIMARY KEY, trial_id INTEGER, param_name VARCHAR(512), param_value FLOAT, distribution_json TEXT, FOREIGN KEY (trial_id) REFERENCES trials(trial_id) ) ''') # trial_user_attributes - custom metadata cursor.execute(''' CREATE TABLE IF NOT EXISTS trial_user_attributes ( trial_user_attribute_id INTEGER PRIMARY KEY, trial_id INTEGER, key VARCHAR(512), value_json TEXT, FOREIGN KEY (trial_id) REFERENCES trials(trial_id) ) ''') # trial_system_attributes - system metadata cursor.execute(''' CREATE TABLE IF NOT EXISTS trial_system_attributes ( trial_system_attribute_id INTEGER PRIMARY KEY, trial_id INTEGER, key VARCHAR(512), value_json TEXT, FOREIGN KEY (trial_id) REFERENCES trials(trial_id) ) ''') # study_user_attributes cursor.execute(''' CREATE TABLE IF NOT EXISTS study_user_attributes ( study_user_attribute_id INTEGER PRIMARY KEY, study_id INTEGER, key VARCHAR(512), value_json TEXT, FOREIGN KEY (study_id) REFERENCES studies(study_id) ) ''') # study_system_attributes cursor.execute(''' CREATE TABLE IF NOT EXISTS study_system_attributes ( study_system_attribute_id INTEGER PRIMARY KEY, study_id INTEGER, key VARCHAR(512), value_json TEXT, FOREIGN KEY (study_id) REFERENCES studies(study_id) ) ''') # trial_intermediate_values (for pruning callbacks) cursor.execute(''' CREATE TABLE IF NOT EXISTS trial_intermediate_values ( trial_intermediate_value_id INTEGER PRIMARY KEY, trial_id INTEGER, step INTEGER, intermediate_value FLOAT, intermediate_value_type VARCHAR(7) DEFAULT 'FINITE', FOREIGN KEY (trial_id) REFERENCES trials(trial_id) ) ''') # trial_heartbeats (for distributed optimization) cursor.execute(''' CREATE TABLE IF NOT EXISTS trial_heartbeats ( trial_heartbeat_id INTEGER PRIMARY KEY, trial_id INTEGER, heartbeat DATETIME, FOREIGN KEY (trial_id) REFERENCES trials(trial_id) ) ''') # alembic_version (Optuna uses alembic for migrations) cursor.execute(''' CREATE TABLE IF NOT EXISTS alembic_version ( version_num VARCHAR(32) PRIMARY KEY ) ''') cursor.execute("INSERT OR IGNORE INTO alembic_version VALUES ('v3.0.0')") # Create indexes for performance cursor.execute("CREATE INDEX IF NOT EXISTS ix_trials_study_id ON trials(study_id)") cursor.execute("CREATE INDEX IF NOT EXISTS ix_trials_state ON trials(state)") cursor.execute("CREATE INDEX IF NOT EXISTS ix_trial_values_trial_id ON trial_values(trial_id)") cursor.execute("CREATE INDEX IF NOT EXISTS ix_trial_params_trial_id ON trial_params(trial_id)") conn.commit() conn.close() def log_trial( self, params: Dict[str, float], objectives: Dict[str, float], weighted_sum: Optional[float] = None, is_feasible: bool = True, state: str = "COMPLETE", datetime_start: Optional[str] = None, datetime_complete: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> int: """ Log a trial to the database. Args: params: Parameter name -> value mapping objectives: Objective name -> value mapping weighted_sum: Optional weighted sum for single-objective ranking is_feasible: Whether trial meets constraints state: Trial state ("COMPLETE", "PRUNED", "FAIL", "RUNNING") datetime_start: ISO format timestamp datetime_complete: ISO format timestamp metadata: Additional metadata (turbo_iteration, predicted values, etc.) Returns: trial_id of inserted trial """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Get next trial number cursor.execute( "SELECT COALESCE(MAX(number), -1) + 1 FROM trials WHERE study_id = ?", (self.study_id,) ) trial_number = cursor.fetchone()[0] # Default timestamps now = datetime.now().isoformat() dt_start = datetime_start or now dt_complete = datetime_complete or now # Insert trial cursor.execute(''' INSERT INTO trials (number, study_id, state, datetime_start, datetime_complete) VALUES (?, ?, ?, ?, ?) ''', (trial_number, self.study_id, state, dt_start, dt_complete)) trial_id = cursor.lastrowid # Insert objective values # Use weighted_sum as primary objective if provided, else first objective value primary_value = weighted_sum if weighted_sum is not None else list(objectives.values())[0] cursor.execute(''' INSERT INTO trial_values (trial_id, objective, value, value_type) VALUES (?, ?, ?, ?) ''', (trial_id, 0, primary_value, 'FINITE')) # Insert all objectives as user attributes for obj_name, obj_value in objectives.items(): cursor.execute(''' INSERT INTO trial_user_attributes (trial_id, key, value_json) VALUES (?, ?, ?) ''', (trial_id, f"obj_{obj_name}", json.dumps(obj_value))) # Insert parameters for param_name, param_value in params.items(): cursor.execute(''' INSERT INTO trial_params (trial_id, param_name, param_value, distribution_json) VALUES (?, ?, ?, ?) ''', (trial_id, param_name, param_value, '{}')) # Insert feasibility as user attribute cursor.execute(''' INSERT INTO trial_user_attributes (trial_id, key, value_json) VALUES (?, ?, ?) ''', (trial_id, 'is_feasible', json.dumps(is_feasible))) # Insert metadata if metadata: for key, value in metadata.items(): cursor.execute(''' INSERT INTO trial_user_attributes (trial_id, key, value_json) VALUES (?, ?, ?) ''', (trial_id, key, json.dumps(value))) conn.commit() conn.close() return trial_id def mark_best(self, trial_id: int): """Mark a trial as the best (adds user attribute).""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Remove previous best markers cursor.execute(''' DELETE FROM trial_user_attributes WHERE key = 'is_best' AND trial_id IN ( SELECT trial_id FROM trials WHERE study_id = ? ) ''', (self.study_id,)) # Mark new best cursor.execute(''' INSERT INTO trial_user_attributes (trial_id, key, value_json) VALUES (?, 'is_best', 'true') ''', (trial_id,)) conn.commit() conn.close() def get_trial_count(self, state: str = "COMPLETE") -> int: """Get count of trials in given state.""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute( "SELECT COUNT(*) FROM trials WHERE study_id = ? AND state = ?", (self.study_id, state) ) count = cursor.fetchone()[0] conn.close() return count def get_best_trial(self) -> Optional[Dict[str, Any]]: """Get best trial (lowest objective value).""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' SELECT t.trial_id, t.number, tv.value FROM trials t JOIN trial_values tv ON t.trial_id = tv.trial_id WHERE t.study_id = ? AND t.state = 'COMPLETE' ORDER BY tv.value ASC LIMIT 1 ''', (self.study_id,)) result = cursor.fetchone() conn.close() if result: return { 'trial_id': result[0], 'number': result[1], 'value': result[2] } return None def get_summary(self) -> Dict[str, Any]: """Get database summary for logging.""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute( "SELECT COUNT(*) FROM trials WHERE study_id = ? AND state = 'COMPLETE'", (self.study_id,) ) complete = cursor.fetchone()[0] cursor.execute( "SELECT COUNT(*) FROM trials WHERE study_id = ? AND state = 'PRUNED'", (self.study_id,) ) pruned = cursor.fetchone()[0] best = self.get_best_trial() conn.close() return { 'study_name': self.study_name, 'complete_trials': complete, 'pruned_trials': pruned, 'best_value': best['value'] if best else None, 'best_trial': best['number'] if best else None, } def clear(self): """Clear all trials (for re-running).""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute("DELETE FROM trial_user_attributes WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,)) cursor.execute("DELETE FROM trial_system_attributes WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,)) cursor.execute("DELETE FROM trial_values WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,)) cursor.execute("DELETE FROM trial_params WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,)) cursor.execute("DELETE FROM trial_intermediate_values WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,)) cursor.execute("DELETE FROM trial_heartbeats WHERE trial_id IN (SELECT trial_id FROM trials WHERE study_id = ?)", (self.study_id,)) cursor.execute("DELETE FROM trials WHERE study_id = ?", (self.study_id,)) conn.commit() conn.close() def convert_custom_to_optuna( db_path: Union[str, Path], study_name: str, custom_table: str = "trials", param_columns: Optional[List[str]] = None, objective_column: str = "weighted_sum", status_column: str = "status", datetime_column: str = "datetime_complete", ) -> int: """ Convert a custom database schema to Optuna-compatible format. Args: db_path: Path to database study_name: Name for the study custom_table: Name of custom trials table to convert param_columns: List of parameter column names (auto-detect if None) objective_column: Column containing objective value status_column: Column containing trial status datetime_column: Column containing timestamp Returns: Number of trials converted """ db_path = Path(db_path) backup_path = db_path.with_suffix('.db.bak') # Backup original import shutil shutil.copy(db_path, backup_path) conn = sqlite3.connect(db_path) cursor = conn.cursor() # Check if custom table exists cursor.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name=?", (custom_table,) ) if not cursor.fetchone(): conn.close() raise ValueError(f"Table '{custom_table}' not found") # Get column info cursor.execute(f"PRAGMA table_info({custom_table})") columns = {row[1]: row[2] for row in cursor.fetchall()} # Read all custom trials cursor.execute(f"SELECT * FROM {custom_table}") custom_trials = cursor.fetchall() # Get column names cursor.execute(f"PRAGMA table_info({custom_table})") col_names = [row[1] for row in cursor.fetchall()] # Drop ALL existing tables to start fresh cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") existing_tables = [row[0] for row in cursor.fetchall()] for table in existing_tables: if table != 'sqlite_sequence': # Don't drop internal SQLite table cursor.execute(f"DROP TABLE IF EXISTS {table}") conn.commit() conn.close() # Now create proper Optuna schema from scratch db = DashboardDB(db_path, study_name) converted = 0 for row in custom_trials: trial_data = dict(zip(col_names, row)) # Extract params from JSON if available params = {} if 'params_json' in trial_data and trial_data['params_json']: try: params = json.loads(trial_data['params_json']) except: pass # Extract objectives from JSON if available objectives = {} if 'objectives_json' in trial_data and trial_data['objectives_json']: try: objectives = json.loads(trial_data['objectives_json']) except: pass # Get weighted sum weighted_sum = trial_data.get(objective_column) # Map status to state status = trial_data.get(status_column, 'COMPLETE') state = 'COMPLETE' if status.upper() in ('COMPLETE', 'COMPLETED') else status.upper() # Get feasibility is_feasible = bool(trial_data.get('is_feasible', 1)) # Build metadata metadata = {} for key in ['turbo_iteration', 'predicted_ws', 'prediction_error', 'solve_time']: if key in trial_data and trial_data[key] is not None: metadata[key] = trial_data[key] # Log trial db.log_trial( params=params, objectives=objectives, weighted_sum=weighted_sum, is_feasible=is_feasible, state=state, datetime_start=trial_data.get('datetime_start'), datetime_complete=trial_data.get(datetime_column), metadata=metadata, ) converted += 1 return converted # Convenience function for turbo optimization def init_turbo_database(study_dir: Path, study_name: str) -> DashboardDB: """ Initialize a dashboard-compatible database for turbo optimization. Args: study_dir: Study directory (contains 3_results/) study_name: Name of the study Returns: DashboardDB instance ready for logging """ results_dir = study_dir / "3_results" results_dir.mkdir(parents=True, exist_ok=True) db_path = results_dir / "study.db" return DashboardDB(db_path, study_name)