"""SQLite database schema and connection management.""" import sqlite3 from contextlib import contextmanager from pathlib import Path from typing import Generator import atocore.config as _config from atocore.observability.logger import get_logger log = get_logger("database") SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS source_documents ( id TEXT PRIMARY KEY, file_path TEXT UNIQUE NOT NULL, file_hash TEXT NOT NULL, title TEXT, doc_type TEXT DEFAULT 'markdown', tags TEXT DEFAULT '[]', ingested_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS source_chunks ( id TEXT PRIMARY KEY, document_id TEXT NOT NULL REFERENCES source_documents(id) ON DELETE CASCADE, chunk_index INTEGER NOT NULL, content TEXT NOT NULL, heading_path TEXT DEFAULT '', char_count INTEGER NOT NULL, metadata TEXT DEFAULT '{}', created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS memories ( id TEXT PRIMARY KEY, memory_type TEXT NOT NULL, content TEXT NOT NULL, project TEXT DEFAULT '', source_chunk_id TEXT REFERENCES source_chunks(id), confidence REAL DEFAULT 1.0, status TEXT DEFAULT 'active', last_referenced_at DATETIME, reference_count INTEGER DEFAULT 0, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS projects ( id TEXT PRIMARY KEY, name TEXT UNIQUE NOT NULL, description TEXT DEFAULT '', status TEXT DEFAULT 'active', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS interactions ( id TEXT PRIMARY KEY, prompt TEXT NOT NULL, context_pack TEXT DEFAULT '{}', response_summary TEXT DEFAULT '', response TEXT DEFAULT '', memories_used TEXT DEFAULT '[]', chunks_used TEXT DEFAULT '[]', client TEXT DEFAULT '', session_id TEXT DEFAULT '', project TEXT DEFAULT '', project_id TEXT REFERENCES projects(id), created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); -- Indexes that reference columns guaranteed to exist since the first -- release ship here. Indexes that reference columns added by later -- migrations (memories.project, interactions.project, -- interactions.session_id) are created inside _apply_migrations AFTER -- the corresponding ALTER TABLE, NOT here. Creating them here would -- fail on upgrade from a pre-migration schema because CREATE TABLE -- IF NOT EXISTS is a no-op on an existing table, so the new columns -- wouldn't be added before the CREATE INDEX runs. CREATE INDEX IF NOT EXISTS idx_chunks_document ON source_chunks(document_id); CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type); CREATE INDEX IF NOT EXISTS idx_memories_status ON memories(status); CREATE INDEX IF NOT EXISTS idx_interactions_project ON interactions(project_id); """ def _ensure_data_dir() -> None: _config.ensure_runtime_dirs() def init_db() -> None: """Initialize the database with schema.""" _ensure_data_dir() with get_connection() as conn: conn.executescript(SCHEMA_SQL) _apply_migrations(conn) log.info("database_initialized", path=str(_config.settings.db_path)) def _apply_migrations(conn: sqlite3.Connection) -> None: """Apply lightweight schema migrations for existing local databases.""" if not _column_exists(conn, "memories", "project"): conn.execute("ALTER TABLE memories ADD COLUMN project TEXT DEFAULT ''") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_project ON memories(project)") # Phase 9 Commit B: reinforcement columns. # last_referenced_at records when a memory was most recently referenced # in a captured interaction; reference_count is a monotonically # increasing counter bumped on every reference. Together they let # Reflection (Commit C) and decay (deferred) reason about which # memories are actually being used versus which have gone cold. if not _column_exists(conn, "memories", "last_referenced_at"): conn.execute("ALTER TABLE memories ADD COLUMN last_referenced_at DATETIME") if not _column_exists(conn, "memories", "reference_count"): conn.execute("ALTER TABLE memories ADD COLUMN reference_count INTEGER DEFAULT 0") conn.execute( "CREATE INDEX IF NOT EXISTS idx_memories_last_referenced ON memories(last_referenced_at)" ) # Phase 3 (Auto-Organization V1): domain tags + expiry. # domain_tags is a JSON array of lowercase strings (optics, mechanics, # firmware, business, etc.) inferred by the LLM during triage. Used for # cross-project retrieval: a query about "optics" can surface matches from # p04 + p05 + p06 without knowing all the project names. # valid_until is an ISO UTC timestamp beyond which the memory is # considered stale. get_memories_for_context filters these out of context # packs automatically so ephemeral facts (status snapshots, weekly counts) # don't pollute grounding once they've aged out. if not _column_exists(conn, "memories", "domain_tags"): conn.execute("ALTER TABLE memories ADD COLUMN domain_tags TEXT DEFAULT '[]'") if not _column_exists(conn, "memories", "valid_until"): conn.execute("ALTER TABLE memories ADD COLUMN valid_until DATETIME") conn.execute( "CREATE INDEX IF NOT EXISTS idx_memories_valid_until ON memories(valid_until)" ) # Phase 5 (Engineering V1): when a memory graduates to an entity, we # keep the memory row as an immutable historical pointer. The forward # pointer lets downstream code follow "what did this memory become?" # without having to join through source_refs. if not _column_exists(conn, "memories", "graduated_to_entity_id"): conn.execute("ALTER TABLE memories ADD COLUMN graduated_to_entity_id TEXT") conn.execute( "CREATE INDEX IF NOT EXISTS idx_memories_graduated ON memories(graduated_to_entity_id)" ) # Phase 4 (Robustness V1): append-only audit log for memory mutations. # Every create/update/promote/reject/supersede/invalidate/reinforce/expire/ # auto_promote writes one row here. before/after are JSON snapshots of the # relevant fields. actor lets us distinguish auto-triage vs human-triage vs # api vs cron. This is the "how did this memory get to its current state" # trail — essential once the brain starts auto-organizing itself. conn.execute( """ CREATE TABLE IF NOT EXISTS memory_audit ( id TEXT PRIMARY KEY, memory_id TEXT NOT NULL, action TEXT NOT NULL, actor TEXT DEFAULT 'api', before_json TEXT DEFAULT '{}', after_json TEXT DEFAULT '{}', note TEXT DEFAULT '', timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_audit_memory ON memory_audit(memory_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_audit_timestamp ON memory_audit(timestamp)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_audit_action ON memory_audit(action)") # Phase 5 (Engineering V1): entity_kind discriminator lets one audit # table serve both memories AND entities. Default "memory" keeps existing # rows correct; entity mutations write entity_kind="entity". if not _column_exists(conn, "memory_audit", "entity_kind"): conn.execute("ALTER TABLE memory_audit ADD COLUMN entity_kind TEXT DEFAULT 'memory'") conn.execute( "CREATE INDEX IF NOT EXISTS idx_memory_audit_entity_kind ON memory_audit(entity_kind)" ) # Phase 5: conflicts + conflict_members tables per conflict-model.md. # A conflict is "two or more active rows claiming the same slot with # incompatible values". slot_kind + slot_key identify the logical slot # (e.g., "component.material" for some component id). Members point # back to the conflicting rows (memory or entity) with layer trust so # resolution can pick the highest-trust winner. conn.execute( """ CREATE TABLE IF NOT EXISTS conflicts ( id TEXT PRIMARY KEY, slot_kind TEXT NOT NULL, slot_key TEXT NOT NULL, project TEXT DEFAULT '', status TEXT DEFAULT 'open', resolution TEXT DEFAULT '', resolved_at DATETIME, detected_at DATETIME DEFAULT CURRENT_TIMESTAMP, note TEXT DEFAULT '' ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS conflict_members ( id TEXT PRIMARY KEY, conflict_id TEXT NOT NULL REFERENCES conflicts(id) ON DELETE CASCADE, member_kind TEXT NOT NULL, member_id TEXT NOT NULL, member_layer_trust INTEGER DEFAULT 0, value_snapshot TEXT DEFAULT '' ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_conflicts_status ON conflicts(status)") conn.execute("CREATE INDEX IF NOT EXISTS idx_conflicts_project ON conflicts(project)") conn.execute( "CREATE INDEX IF NOT EXISTS idx_conflicts_slot ON conflicts(slot_kind, slot_key)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_conflict_members_conflict ON conflict_members(conflict_id)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_conflict_members_member ON conflict_members(member_kind, member_id)" ) # Phase 9 Commit A: capture loop columns on the interactions table. # The original schema only carried prompt + project_id + a context_pack # JSON blob. To make interactions a real audit trail of what AtoCore fed # the LLM and what came back, we record the full response, the chunk # and memory ids that were actually used, plus client + session metadata. if not _column_exists(conn, "interactions", "response"): conn.execute("ALTER TABLE interactions ADD COLUMN response TEXT DEFAULT ''") if not _column_exists(conn, "interactions", "memories_used"): conn.execute("ALTER TABLE interactions ADD COLUMN memories_used TEXT DEFAULT '[]'") if not _column_exists(conn, "interactions", "chunks_used"): conn.execute("ALTER TABLE interactions ADD COLUMN chunks_used TEXT DEFAULT '[]'") if not _column_exists(conn, "interactions", "client"): conn.execute("ALTER TABLE interactions ADD COLUMN client TEXT DEFAULT ''") if not _column_exists(conn, "interactions", "session_id"): conn.execute("ALTER TABLE interactions ADD COLUMN session_id TEXT DEFAULT ''") if not _column_exists(conn, "interactions", "project"): conn.execute("ALTER TABLE interactions ADD COLUMN project TEXT DEFAULT ''") conn.execute( "CREATE INDEX IF NOT EXISTS idx_interactions_session ON interactions(session_id)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_interactions_project_name ON interactions(project)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_interactions_created_at ON interactions(created_at)" ) # Phase 7A (Memory Consolidation — "sleep cycle"): merge candidates. # When the dedup detector finds a cluster of semantically similar active # memories within the same (project, memory_type) bucket, it drafts a # unified content via LLM and writes a proposal here. The triage UI # surfaces these for human approval. On approve, source memories become # status=superseded and a new merged memory is created. # memory_ids is a JSON array (length >= 2) of the source memory ids. # proposed_* hold the LLM's draft; a human can edit before approve. # result_memory_id is filled on approve with the new merged memory's id. conn.execute( """ CREATE TABLE IF NOT EXISTS memory_merge_candidates ( id TEXT PRIMARY KEY, status TEXT DEFAULT 'pending', memory_ids TEXT NOT NULL, similarity REAL, proposed_content TEXT, proposed_memory_type TEXT, proposed_project TEXT, proposed_tags TEXT DEFAULT '[]', proposed_confidence REAL, reason TEXT DEFAULT '', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, resolved_at DATETIME, resolved_by TEXT, result_memory_id TEXT ) """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_mmc_status ON memory_merge_candidates(status)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_mmc_created_at ON memory_merge_candidates(created_at)" ) # Phase 7C (Memory Consolidation — tag canonicalization): alias → canonical # map for domain_tags. A weekly LLM pass proposes rows here; high-confidence # ones auto-apply (rewrite domain_tags across all memories), low-confidence # ones stay pending for human approval. Immutable history: resolved rows # keep status=approved/rejected; the same alias can re-appear with a new # id if the tag reaches a different canonical later. conn.execute( """ CREATE TABLE IF NOT EXISTS tag_aliases ( id TEXT PRIMARY KEY, alias TEXT NOT NULL, canonical TEXT NOT NULL, status TEXT DEFAULT 'pending', confidence REAL DEFAULT 0.0, alias_count INTEGER DEFAULT 0, canonical_count INTEGER DEFAULT 0, reason TEXT DEFAULT '', applied_to_memories INTEGER DEFAULT 0, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, resolved_at DATETIME, resolved_by TEXT ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_tag_aliases_status ON tag_aliases(status)") conn.execute("CREATE INDEX IF NOT EXISTS idx_tag_aliases_alias ON tag_aliases(alias)") # Issue F (visual evidence): binary asset store. One row per unique # content hash — re-uploading the same file is idempotent. The blob # itself lives on disk under stored_path; this table is the catalog. # width/height are populated for image mime types (NULL otherwise). # source_refs is a JSON array of free-form provenance pointers # (e.g. "session:", "interaction:") that survive independent # of the EVIDENCED_BY graph. status=invalid tombstones an asset # without dropping the row so audit trails stay intact. conn.execute( """ CREATE TABLE IF NOT EXISTS assets ( id TEXT PRIMARY KEY, hash_sha256 TEXT UNIQUE NOT NULL, mime_type TEXT NOT NULL, size_bytes INTEGER NOT NULL, width INTEGER, height INTEGER, stored_path TEXT NOT NULL, original_filename TEXT DEFAULT '', project TEXT DEFAULT '', caption TEXT DEFAULT '', source_refs TEXT DEFAULT '[]', status TEXT DEFAULT 'active', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_assets_hash ON assets(hash_sha256)") conn.execute("CREATE INDEX IF NOT EXISTS idx_assets_project ON assets(project)") conn.execute("CREATE INDEX IF NOT EXISTS idx_assets_status ON assets(status)") def _column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool: rows = conn.execute(f"PRAGMA table_info({table})").fetchall() return any(row["name"] == column for row in rows) @contextmanager def get_connection() -> Generator[sqlite3.Connection, None, None]: """Get a database connection with row factory.""" _ensure_data_dir() conn = sqlite3.connect( str(_config.settings.db_path), timeout=_config.settings.db_busy_timeout_ms / 1000, ) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") conn.execute(f"PRAGMA busy_timeout = {_config.settings.db_busy_timeout_ms}") conn.execute("PRAGMA journal_mode = WAL") conn.execute("PRAGMA synchronous = NORMAL") try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close()