diff --git a/scripts/integrity_check.py b/scripts/integrity_check.py index 68043a2..46641c7 100644 --- a/scripts/integrity_check.py +++ b/scripts/integrity_check.py @@ -1,17 +1,10 @@ #!/usr/bin/env python3 -"""Nightly integrity check for AtoCore (Phase 4 Robustness V1). +"""Trigger the integrity check inside the AtoCore container. -Scans the database for drift conditions that indicate something is -silently broken: - - Memories referencing a non-existent source chunk - - Active memories with duplicate content within the same project+type - - Project-state entries with invalid project_id - - Orphaned source chunks whose parent document was deleted - - Memory count vs vector count parity drift (Chroma vs SQLite) - -Findings are written to project state -(atocore/status/integrity_check_result) and surfaced on the dashboard. -Any non-empty finding emits a warning alert via the alerts framework. +The scan itself lives in the container (needs direct DB access via the +already-loaded sqlite connection). This host-side wrapper just POSTs to +/admin/integrity-check so the nightly cron can kick it off from bash +without needing the container's Python deps on the host. 
def main() -> None:
    """Kick off the in-container integrity scan and mirror its verdict.

    POSTs to /admin/integrity-check on the AtoCore API, prints the JSON
    result, and exits non-zero when the service is unreachable (1) or the
    scan reported drift (2) so cron logs flag it.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--base-url",
        default=os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100"),
    )
    ap.add_argument(
        "--dry-run",
        action="store_true",
        help="Report without persisting findings to state",
    )
    opts = ap.parse_args()

    # Dry-run is implemented server-side: ask the endpoint not to persist.
    endpoint = f"{opts.base_url.rstrip('/')}/admin/integrity-check"
    if opts.dry_run:
        endpoint = f"{endpoint}?persist=false"

    request = urllib.request.Request(endpoint, method="POST")
    try:
        with urllib.request.urlopen(request, timeout=30) as resp:
            result = json.loads(resp.read().decode("utf-8"))
    except Exception as e:
        print(f"ERROR: could not reach {endpoint}: {e}", file=sys.stderr)
        sys.exit(1)

    print(json.dumps(result, indent=2))

    if not result.get("ok", True):
        # Non-zero exit so cron logs flag it
        sys.exit(2)
+ """ + from atocore.models.database import get_connection + from atocore.context.project_state import set_state + import json as _json + + findings = { + "orphan_chunk_refs": 0, "duplicate_active": 0, + "orphan_project_state": 0, "orphan_chunks": 0, + "memory_count": 0, "active_memory_count": 0, + } + details: list[str] = [] + + with get_connection() as conn: + r = conn.execute( + "SELECT COUNT(*) FROM memories m " + "WHERE m.source_chunk_id IS NOT NULL AND m.source_chunk_id != '' " + "AND NOT EXISTS (SELECT 1 FROM source_chunks c WHERE c.id = m.source_chunk_id)" + ).fetchone() + findings["orphan_chunk_refs"] = int(r[0] or 0) + if findings["orphan_chunk_refs"]: + details.append(f"{findings['orphan_chunk_refs']} memory(ies) reference missing source_chunk_id") + + dup_rows = conn.execute( + "SELECT memory_type, project, content, COUNT(*) AS n FROM memories " + "WHERE status = 'active' GROUP BY memory_type, project, content HAVING n > 1" + ).fetchall() + findings["duplicate_active"] = sum(int(r[3]) - 1 for r in dup_rows) + if findings["duplicate_active"]: + details.append(f"{findings['duplicate_active']} duplicate active row(s) across {len(dup_rows)} group(s)") + + r = conn.execute( + "SELECT COUNT(*) FROM project_state ps " + "WHERE NOT EXISTS (SELECT 1 FROM projects p WHERE p.id = ps.project_id)" + ).fetchone() + findings["orphan_project_state"] = int(r[0] or 0) + if findings["orphan_project_state"]: + details.append(f"{findings['orphan_project_state']} project_state row(s) reference missing project") + + r = conn.execute( + "SELECT COUNT(*) FROM source_chunks c " + "WHERE NOT EXISTS (SELECT 1 FROM source_documents d WHERE d.id = c.document_id)" + ).fetchone() + findings["orphan_chunks"] = int(r[0] or 0) + if findings["orphan_chunks"]: + details.append(f"{findings['orphan_chunks']} chunk(s) have no parent document") + + findings["memory_count"] = int(conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0]) + findings["active_memory_count"] = int( + 
conn.execute("SELECT COUNT(*) FROM memories WHERE status = 'active'").fetchone()[0] + ) + + result = {"findings": findings, "details": details, "ok": not details} + + if persist: + try: + set_state( + project_name="atocore", category="status", + key="integrity_check_result", + value=_json.dumps(result), + source="integrity check endpoint", + ) + except Exception as e: + log.warning("integrity_check_state_write_failed", error=str(e)) + + if details: + try: + from atocore.observability.alerts import emit_alert + emit_alert( + severity="warning", + title="Integrity drift detected", + message="; ".join(details), + context={k: v for k, v in findings.items() if not k.endswith("_count")}, + ) + except Exception: + pass + + return result + + @router.post("/memory/{memory_id}/promote") def api_promote_memory(memory_id: str) -> dict: """Promote a candidate memory to active (Phase 9 Commit C)."""