#!/usr/bin/env python3
"""Nightly integrity check for AtoCore (Phase 4 Robustness V1).

Scans the database for drift conditions that indicate something is
silently broken:
  - Memories referencing a non-existent source chunk
  - Active memories with duplicate content within the same project+type
  - Project-state entries with invalid project_id
  - Orphaned source chunks whose parent document was deleted

Total and active memory counts are recorded alongside the findings as
context. NOTE: the memory-count vs vector-count parity comparison
(Chroma vs SQLite) is not implemented in this script; only the SQLite
counts are collected.

Findings are written to project state
(atocore/status/integrity_check_result) and surfaced on the dashboard.
Any non-empty finding emits a warning alert via the alerts framework.
``--dry-run`` only skips the project-state write; the report is still
printed and the alert is still emitted.

Usage:
    python3 scripts/integrity_check.py [--base-url URL] [--dry-run]
"""
from __future__ import annotations

import argparse
import json
import os
import sys

# Make src/ importable when run from repo root
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))


def _count(conn, sql: str) -> int:
    """Run a single-value query on *conn* and return the value as an int.

    A NULL/None result is reported as 0.
    """
    row = conn.execute(sql).fetchone()
    return int(row[0] or 0)


def _collect_findings(conn) -> tuple[dict, list[str]]:
    """Run every integrity query against *conn*.

    Args:
        conn: An open database connection exposing ``execute()``.

    Returns:
        ``(findings, details)`` where ``findings`` maps each check name to
        its count and ``details`` holds one human-readable sentence per
        check that found a problem (empty when nothing drifted).
    """
    details: list[str] = []

    # 1) Memories referencing a non-existent source_chunk_id
    #    (NULL and empty-string references mean "no chunk" and are skipped).
    orphan_chunk_refs = _count(
        conn,
        "SELECT COUNT(*) FROM memories m "
        "WHERE m.source_chunk_id IS NOT NULL "
        "AND m.source_chunk_id != '' "
        "AND NOT EXISTS (SELECT 1 FROM source_chunks c WHERE c.id = m.source_chunk_id)",
    )
    if orphan_chunk_refs:
        details.append(f"{orphan_chunk_refs} memory(ies) reference a missing source_chunk_id")

    # 2) Duplicate active memories (same content + project + type)
    dup_rows = conn.execute(
        "SELECT memory_type, project, content, COUNT(*) AS n "
        "FROM memories WHERE status = 'active' "
        "GROUP BY memory_type, project, content HAVING n > 1"
    ).fetchall()
    # Each group keeps one legitimate row; only the surplus rows are counted.
    duplicate_active = sum(int(r[3]) - 1 for r in dup_rows)
    if duplicate_active:
        details.append(
            f"{duplicate_active} duplicate active memory row(s) across {len(dup_rows)} group(s)"
        )

    # 3) Project-state entries with invalid project_id
    orphan_project_state = _count(
        conn,
        "SELECT COUNT(*) FROM project_state ps "
        "WHERE NOT EXISTS (SELECT 1 FROM projects p WHERE p.id = ps.project_id)",
    )
    if orphan_project_state:
        details.append(f"{orphan_project_state} project_state row(s) reference a missing project")

    # 4) Orphaned source chunks
    orphan_chunks = _count(
        conn,
        "SELECT COUNT(*) FROM source_chunks c "
        "WHERE NOT EXISTS (SELECT 1 FROM source_documents d WHERE d.id = c.document_id)",
    )
    if orphan_chunks:
        details.append(f"{orphan_chunks} source chunk(s) have no parent document")

    # 5) Memory counts (context for dashboard; never produce a detail line)
    memory_count = _count(conn, "SELECT COUNT(*) FROM memories")
    active_memory_count = _count(conn, "SELECT COUNT(*) FROM memories WHERE status = 'active'")

    # Key order is part of the printed/stored JSON, so keep it stable.
    findings: dict = {
        "orphan_chunk_refs": orphan_chunk_refs,
        "duplicate_active": duplicate_active,
        "orphan_project_state": orphan_project_state,
        "orphan_chunks": orphan_chunks,
        "memory_count": memory_count,
        "active_memory_count": active_memory_count,
    }
    return findings, details


def _write_state(base_url: str, result: dict) -> None:
    """POST *result* to ``<base_url>/project/state``.

    Args:
        base_url: Base URL of the AtoCore HTTP API; a trailing slash is
            tolerated.
        result: The integrity-check result; stored JSON-encoded under
            project ``atocore``, category ``status``, key
            ``integrity_check_result``.

    Raises:
        Whatever ``urllib.request.urlopen`` raises (connection errors,
        timeouts, non-2xx responses); the caller decides how to handle it.
    """
    import urllib.request

    body = json.dumps({
        "project": "atocore",
        "category": "status",
        "key": "integrity_check_result",
        "value": json.dumps(result),
        "source": "integrity check",
    }).encode("utf-8")
    req = urllib.request.Request(
        # Strip trailing slashes so ".../" does not yield "//project/state".
        f"{base_url.rstrip('/')}/project/state",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    # The response body is not needed; the with-block closes the connection.
    with urllib.request.urlopen(req, timeout=10):
        pass


def main() -> None:
    """Run the integrity check: report, persist the result, and alert.

    Prints the result as JSON on stdout, writes it to project state unless
    ``--dry-run`` is given, and emits a warning alert when any check found
    drift (also in dry-run mode).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--base-url", default=os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100"))
    parser.add_argument("--dry-run", action="store_true", help="Report without writing findings to state")
    args = parser.parse_args()

    # Imported after argument parsing so --help works without the package.
    from atocore.models.database import get_connection
    from atocore.observability.alerts import emit_alert

    with get_connection() as conn:
        findings, details = _collect_findings(conn)

    # Compose result
    result = {
        "findings": findings,
        "details": details,
        "ok": not details,
    }
    print(json.dumps(result, indent=2))

    # Write to project state unless dry-run
    if not args.dry_run:
        try:
            _write_state(args.base_url, result)
        except Exception as e:
            # Deliberately best-effort: a failed state write must not
            # prevent the alert below from being raised.
            print(f"WARN: state write failed: {e}", file=sys.stderr)

    # Raise an alert if anything drifted
    if details:
        emit_alert(
            severity="warning",
            title="Integrity drift detected",
            message="; ".join(details),
            # The *_count entries are context, not drift indicators.
            context={k: v for k, v in findings.items() if not k.endswith("_count")},
        )


if __name__ == "__main__":
    main()