"""SQLite database schema and connection management.""" import sqlite3 from contextlib import contextmanager from pathlib import Path from typing import Generator import atocore.config as _config from atocore.observability.logger import get_logger log = get_logger("database") SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS source_documents ( id TEXT PRIMARY KEY, file_path TEXT UNIQUE NOT NULL, file_hash TEXT NOT NULL, title TEXT, doc_type TEXT DEFAULT 'markdown', tags TEXT DEFAULT '[]', ingested_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS source_chunks ( id TEXT PRIMARY KEY, document_id TEXT NOT NULL REFERENCES source_documents(id) ON DELETE CASCADE, chunk_index INTEGER NOT NULL, content TEXT NOT NULL, heading_path TEXT DEFAULT '', char_count INTEGER NOT NULL, metadata TEXT DEFAULT '{}', created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS memories ( id TEXT PRIMARY KEY, memory_type TEXT NOT NULL, content TEXT NOT NULL, project TEXT DEFAULT '', source_chunk_id TEXT REFERENCES source_chunks(id), confidence REAL DEFAULT 1.0, status TEXT DEFAULT 'active', last_referenced_at DATETIME, reference_count INTEGER DEFAULT 0, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS projects ( id TEXT PRIMARY KEY, name TEXT UNIQUE NOT NULL, description TEXT DEFAULT '', status TEXT DEFAULT 'active', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS interactions ( id TEXT PRIMARY KEY, prompt TEXT NOT NULL, context_pack TEXT DEFAULT '{}', response_summary TEXT DEFAULT '', response TEXT DEFAULT '', memories_used TEXT DEFAULT '[]', chunks_used TEXT DEFAULT '[]', client TEXT DEFAULT '', session_id TEXT DEFAULT '', project TEXT DEFAULT '', project_id TEXT REFERENCES projects(id), created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); -- Indexes that reference columns guaranteed to exist since the first -- release ship here. Indexes that reference columns added by later -- migrations (memories.project, interactions.project, -- interactions.session_id) are created inside _apply_migrations AFTER -- the corresponding ALTER TABLE, NOT here. Creating them here would -- fail on upgrade from a pre-migration schema because CREATE TABLE -- IF NOT EXISTS is a no-op on an existing table, so the new columns -- wouldn't be added before the CREATE INDEX runs. CREATE INDEX IF NOT EXISTS idx_chunks_document ON source_chunks(document_id); CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type); CREATE INDEX IF NOT EXISTS idx_memories_status ON memories(status); CREATE INDEX IF NOT EXISTS idx_interactions_project ON interactions(project_id); """ def _ensure_data_dir() -> None: _config.ensure_runtime_dirs() def init_db() -> None: """Initialize the database with schema.""" _ensure_data_dir() with get_connection() as conn: conn.executescript(SCHEMA_SQL) _apply_migrations(conn) log.info("database_initialized", path=str(_config.settings.db_path)) def _apply_migrations(conn: sqlite3.Connection) -> None: """Apply lightweight schema migrations for existing local databases.""" if not _column_exists(conn, "memories", "project"): conn.execute("ALTER TABLE memories ADD COLUMN project TEXT DEFAULT ''") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_project ON memories(project)") # Phase 9 Commit B: reinforcement columns. # last_referenced_at records when a memory was most recently referenced # in a captured interaction; reference_count is a monotonically # increasing counter bumped on every reference. Together they let # Reflection (Commit C) and decay (deferred) reason about which # memories are actually being used versus which have gone cold. if not _column_exists(conn, "memories", "last_referenced_at"): conn.execute("ALTER TABLE memories ADD COLUMN last_referenced_at DATETIME") if not _column_exists(conn, "memories", "reference_count"): conn.execute("ALTER TABLE memories ADD COLUMN reference_count INTEGER DEFAULT 0") conn.execute( "CREATE INDEX IF NOT EXISTS idx_memories_last_referenced ON memories(last_referenced_at)" ) # Phase 9 Commit A: capture loop columns on the interactions table. # The original schema only carried prompt + project_id + a context_pack # JSON blob. To make interactions a real audit trail of what AtoCore fed # the LLM and what came back, we record the full response, the chunk # and memory ids that were actually used, plus client + session metadata. if not _column_exists(conn, "interactions", "response"): conn.execute("ALTER TABLE interactions ADD COLUMN response TEXT DEFAULT ''") if not _column_exists(conn, "interactions", "memories_used"): conn.execute("ALTER TABLE interactions ADD COLUMN memories_used TEXT DEFAULT '[]'") if not _column_exists(conn, "interactions", "chunks_used"): conn.execute("ALTER TABLE interactions ADD COLUMN chunks_used TEXT DEFAULT '[]'") if not _column_exists(conn, "interactions", "client"): conn.execute("ALTER TABLE interactions ADD COLUMN client TEXT DEFAULT ''") if not _column_exists(conn, "interactions", "session_id"): conn.execute("ALTER TABLE interactions ADD COLUMN session_id TEXT DEFAULT ''") if not _column_exists(conn, "interactions", "project"): conn.execute("ALTER TABLE interactions ADD COLUMN project TEXT DEFAULT ''") conn.execute( "CREATE INDEX IF NOT EXISTS idx_interactions_session ON interactions(session_id)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_interactions_project_name ON interactions(project)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_interactions_created_at ON interactions(created_at)" ) def _column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool: rows = conn.execute(f"PRAGMA table_info({table})").fetchall() return any(row["name"] == column for row in rows) @contextmanager def get_connection() -> Generator[sqlite3.Connection, None, None]: """Get a database connection with row factory.""" _ensure_data_dir() conn = sqlite3.connect( str(_config.settings.db_path), timeout=_config.settings.db_busy_timeout_ms / 1000, ) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") conn.execute(f"PRAGMA busy_timeout = {_config.settings.db_busy_timeout_ms}") conn.execute("PRAGMA journal_mode = WAL") conn.execute("PRAGMA synchronous = NORMAL") try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close()