diff --git a/deploy/dalidou/batch-extract.sh b/deploy/dalidou/batch-extract.sh index f6faa63..893db8f 100644 --- a/deploy/dalidou/batch-extract.sh +++ b/deploy/dalidou/batch-extract.sh @@ -150,4 +150,65 @@ print(f'Pipeline summary persisted: {json.dumps(summary)}') log "WARN: pipeline summary persistence failed (non-blocking)" } +# Step G: Integrity check (Phase 4 V1) +log "Step G: integrity check" +python3 "$APP_DIR/scripts/integrity_check.py" \ + --base-url "$ATOCORE_URL" \ + 2>&1 || { + log "WARN: integrity check failed (non-blocking)" +} + +# Step H: Pipeline-level alerts — detect conditions that warrant attention +log "Step H: pipeline alerts" +python3 -c " +import json, os, sys, urllib.request +sys.path.insert(0, '$APP_DIR/src') +from atocore.observability.alerts import emit_alert + +base = '$ATOCORE_URL' + +def get_state(project='atocore'): + try: + req = urllib.request.Request(f'{base}/project/state/{project}') + resp = urllib.request.urlopen(req, timeout=10) + return json.loads(resp.read()).get('entries', []) + except Exception: + return [] + +def get_dashboard(): + try: + req = urllib.request.Request(f'{base}/admin/dashboard') + resp = urllib.request.urlopen(req, timeout=10) + return json.loads(resp.read()) + except Exception: + return {} + +state = {(e['category'], e['key']): e['value'] for e in get_state()} +dash = get_dashboard() + +# Harness regression check +harness_raw = state.get(('status', 'retrieval_harness_result')) +if harness_raw: + try: + h = json.loads(harness_raw) + passed, total = h.get('passed', 0), h.get('total', 0) + if total > 0: + rate = passed / total + if rate < 0.85: + emit_alert('warning', 'Retrieval harness below 85%', + f'Only {passed}/{total} fixtures passing ({rate:.0%}). 
Failures: {h.get(\"failures\", [])[:5]}', + context={'pass_rate': rate}) + except Exception: + pass + +# Candidate queue pileup +candidates = dash.get('memories', {}).get('candidates', 0) +if candidates > 200: + emit_alert('warning', 'Candidate queue not draining', + f'{candidates} candidates pending. Auto-triage may be stuck or rate-limited.', + context={'candidates': candidates}) + +print('pipeline alerts check complete') +" 2>&1 || true + log "=== AtoCore batch extraction + triage complete ===" diff --git a/scripts/integrity_check.py b/scripts/integrity_check.py new file mode 100644 index 0000000..68043a2 --- /dev/null +++ b/scripts/integrity_check.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +"""Nightly integrity check for AtoCore (Phase 4 Robustness V1). + +Scans the database for drift conditions that indicate something is +silently broken: + - Memories referencing a non-existent source chunk + - Active memories with duplicate content within the same project+type + - Project-state entries with invalid project_id + - Orphaned source chunks whose parent document was deleted + - Memory count vs vector count parity drift (Chroma vs SQLite) + +Findings are written to project state +(atocore/status/integrity_check_result) and surfaced on the dashboard. +Any non-empty finding emits a warning alert via the alerts framework. 
def _collect_findings(conn) -> tuple[dict, list[str]]:
    """Run the drift queries on *conn* and return ``(findings, details)``.

    ``findings`` maps each check name to a row count. ``details`` holds one
    human-readable sentence for every check that found something; it is empty
    when nothing drifted.
    """

    def scalar(sql: str) -> int:
        # Every query passed here is a COUNT(*), so exactly one row comes back.
        return int(conn.execute(sql).fetchone()[0] or 0)

    findings: dict = {
        "orphan_chunk_refs": 0,
        "duplicate_active": 0,
        "orphan_project_state": 0,
        "orphan_chunks": 0,
        "memory_count": 0,
        "active_memory_count": 0,
    }
    details: list[str] = []

    # 1) Memories referencing a non-existent source_chunk_id
    findings["orphan_chunk_refs"] = scalar(
        "SELECT COUNT(*) FROM memories m "
        "WHERE m.source_chunk_id IS NOT NULL "
        "AND m.source_chunk_id != '' "
        "AND NOT EXISTS (SELECT 1 FROM source_chunks c WHERE c.id = m.source_chunk_id)"
    )
    if findings["orphan_chunk_refs"]:
        details.append(f"{findings['orphan_chunk_refs']} memory(ies) reference a missing source_chunk_id")

    # 2) Duplicate active memories (same content + project + type).
    #    Each group of n identical rows contributes n - 1 duplicates.
    dup_rows = conn.execute(
        "SELECT memory_type, project, content, COUNT(*) AS n "
        "FROM memories WHERE status = 'active' "
        "GROUP BY memory_type, project, content HAVING n > 1"
    ).fetchall()
    findings["duplicate_active"] = sum(int(r[3]) - 1 for r in dup_rows)
    if findings["duplicate_active"]:
        details.append(f"{findings['duplicate_active']} duplicate active memory row(s) across {len(dup_rows)} group(s)")

    # 3) Project-state entries with invalid project_id
    findings["orphan_project_state"] = scalar(
        "SELECT COUNT(*) FROM project_state ps "
        "WHERE NOT EXISTS (SELECT 1 FROM projects p WHERE p.id = ps.project_id)"
    )
    if findings["orphan_project_state"]:
        details.append(f"{findings['orphan_project_state']} project_state row(s) reference a missing project")

    # 4) Orphaned source chunks
    findings["orphan_chunks"] = scalar(
        "SELECT COUNT(*) FROM source_chunks c "
        "WHERE NOT EXISTS (SELECT 1 FROM source_documents d WHERE d.id = c.document_id)"
    )
    if findings["orphan_chunks"]:
        details.append(f"{findings['orphan_chunks']} source chunk(s) have no parent document")

    # 5) Memory counts (context for dashboard); these never produce a detail line
    findings["memory_count"] = scalar("SELECT COUNT(*) FROM memories")
    findings["active_memory_count"] = scalar("SELECT COUNT(*) FROM memories WHERE status = 'active'")

    return findings, details


def _persist_result(base_url: str, result: dict) -> None:
    """POST *result* to ``{base_url}/project/state``.

    Best-effort: any failure is reported as a WARN line on stderr and
    otherwise ignored, so a down API never fails the integrity check itself.
    """
    try:
        import urllib.request

        body = json.dumps({
            "project": "atocore",
            "category": "status",
            "key": "integrity_check_result",
            "value": json.dumps(result),
            "source": "integrity check",
        }).encode("utf-8")
        req = urllib.request.Request(
            f"{base_url}/project/state",
            data=body,
            method="POST",
            headers={"Content-Type": "application/json"},
        )
        # Context manager closes the HTTP response instead of leaking it.
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as e:
        print(f"WARN: state write failed: {e}", file=sys.stderr)


def main() -> None:
    """Scan the database for drift, print a JSON report, then persist and alert.

    Command-line flags:
        --base-url  AtoCore API root used for the state write
                    (default: ``$ATOCORE_BASE_URL`` or http://127.0.0.1:8100).
        --dry-run   Print the report only. Nothing is written: neither the
                    ``integrity_check_result`` state entry nor an alert
                    (``emit_alert`` itself records state, appends to the alert
                    log and may call a webhook).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--base-url", default=os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100"))
    parser.add_argument("--dry-run", action="store_true", help="Report without writing findings to state")
    args = parser.parse_args()

    # Project imports stay inside main(); they resolve through the sys.path
    # entry added at module import time.
    from atocore.models.database import get_connection
    from atocore.observability.alerts import emit_alert

    with get_connection() as conn:
        findings, details = _collect_findings(conn)

    # Compose result
    result = {
        "findings": findings,
        "details": details,
        "ok": not details,
    }
    print(json.dumps(result, indent=2))

    if args.dry_run:
        # A dry run must be side-effect free, so skip the state write AND the alert.
        return

    _persist_result(args.base_url, result)

    # Raise an alert if anything drifted; the *_count context keys are
    # informational only and are left out of the alert context.
    if details:
        emit_alert(
            severity="warning",
            title="Integrity drift detected",
            message="; ".join(details),
            context={k: v for k, v in findings.items() if not k.endswith("_count")},
        )


if __name__ == "__main__":
    main()
@router.get("/admin/audit/recent")
def api_recent_audit(limit: int = 50) -> dict:
    """Return recent memory_audit entries across all memories (newest first).

    ``limit`` is forwarded to ``get_recent_audit``, which binds it to a SQL
    ``LIMIT ?``. SQLite treats a negative LIMIT as "no upper bound", so a
    negative value is rejected here rather than silently returning the whole
    audit table.

    Raises:
        HTTPException: 400 when ``limit`` is negative.
    """
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be >= 0")

    from atocore.memory.service import get_recent_audit

    entries = get_recent_audit(limit=limit)
    return {"entries": entries, "count": len(entries)}
status_code=404, @@ -1046,33 +1062,47 @@ def api_dashboard() -> dict: # Pipeline health from project state pipeline: dict = {} extract_state: dict = {} + integrity: dict = {} + alerts: dict = {} try: state_entries = get_state("atocore") for entry in state_entries: - if entry.category != "status": - continue - if entry.key == "last_extract_batch_run": - extract_state["last_run"] = entry.value - elif entry.key == "pipeline_last_run": - pipeline["last_run"] = entry.value + if entry.category == "status": + if entry.key == "last_extract_batch_run": + extract_state["last_run"] = entry.value + elif entry.key == "pipeline_last_run": + pipeline["last_run"] = entry.value + try: + last = _dt.fromisoformat(entry.value.replace("Z", "+00:00")) + delta = _dt.now(_tz.utc) - last + pipeline["hours_since_last_run"] = round( + delta.total_seconds() / 3600, 1 + ) + except Exception: + pass + elif entry.key == "pipeline_summary": + try: + pipeline["summary"] = _json.loads(entry.value) + except Exception: + pipeline["summary_raw"] = entry.value + elif entry.key == "retrieval_harness_result": + try: + pipeline["harness"] = _json.loads(entry.value) + except Exception: + pipeline["harness_raw"] = entry.value + elif entry.key == "integrity_check_result": + try: + integrity = _json.loads(entry.value) + except Exception: + pass + elif entry.category == "alert": + # keys like "last_info", "last_warning", "last_critical" try: - last = _dt.fromisoformat(entry.value.replace("Z", "+00:00")) - delta = _dt.now(_tz.utc) - last - pipeline["hours_since_last_run"] = round( - delta.total_seconds() / 3600, 1 - ) + payload = _json.loads(entry.value) except Exception: - pass - elif entry.key == "pipeline_summary": - try: - pipeline["summary"] = _json.loads(entry.value) - except Exception: - pipeline["summary_raw"] = entry.value - elif entry.key == "retrieval_harness_result": - try: - pipeline["harness"] = _json.loads(entry.value) - except Exception: - pipeline["harness_raw"] = entry.value + payload = 
{"raw": entry.value} + severity = entry.key.replace("last_", "") + alerts[severity] = payload except Exception: pass @@ -1107,6 +1137,14 @@ def api_dashboard() -> dict: elif len(candidates) > 20: triage["notice"] = f"{len(candidates)} candidates awaiting triage." + # Recent audit activity (Phase 4 V1) — last 10 mutations for operator + recent_audit: list[dict] = [] + try: + from atocore.memory.service import get_recent_audit as _gra + recent_audit = _gra(limit=10) + except Exception: + pass + return { "memories": { "active": len(active), @@ -1123,6 +1161,9 @@ def api_dashboard() -> dict: "extraction_pipeline": extract_state, "pipeline": pipeline, "triage": triage, + "integrity": integrity, + "alerts": alerts, + "recent_audit": recent_audit, } diff --git a/src/atocore/memory/service.py b/src/atocore/memory/service.py index 07cb2a2..1d880f4 100644 --- a/src/atocore/memory/service.py +++ b/src/atocore/memory/service.py @@ -67,6 +67,106 @@ class Memory: valid_until: str = "" # ISO UTC; empty = permanent +def _audit_memory( + memory_id: str, + action: str, + actor: str = "api", + before: dict | None = None, + after: dict | None = None, + note: str = "", +) -> None: + """Append an entry to memory_audit. + + Phase 4 Robustness V1. Every memory mutation flows through this + helper so we can answer "how did this memory get to its current + state?" and "when did we learn X?". + + ``action`` is a short verb: created, updated, promoted, rejected, + superseded, invalidated, reinforced, auto_promoted, expired. + ``actor`` identifies the caller: api (default), auto-triage, + human-triage, host-cron, reinforcement, phase10-auto-promote, + etc. ``before`` / ``after`` are field snapshots (JSON-serialized). + Fail-open: a logging failure never breaks the mutation itself. 
def get_memory_audit(memory_id: str, limit: int = 100) -> list[dict]:
    """Fetch audit entries for a memory, newest first.

    Args:
        memory_id: id of the memory whose history is wanted.
        limit: maximum number of entries to return. Negative values are
            clamped to 0, because SQLite interprets a negative LIMIT as
            "no upper bound".

    Returns:
        A list of dicts with keys ``id``, ``memory_id``, ``action``,
        ``actor``, ``before``, ``after``, ``note`` and ``timestamp``.
        ``before`` / ``after`` are the decoded JSON snapshots; a snapshot
        that fails to decode becomes ``{}``.
    """
    import json as _json

    def _snapshot(raw) -> dict:
        # A corrupt snapshot must not hide the rest of the history.
        try:
            return _json.loads(raw or "{}")
        except (TypeError, ValueError):
            return {}

    bounded_limit = max(0, int(limit))
    with get_connection() as conn:
        # ``timestamp`` defaults to CURRENT_TIMESTAMP (one-second resolution),
        # so rows written within the same second tie. ``rowid DESC`` breaks
        # the tie by insertion order, which holds as long as the table is
        # only ever appended to.
        rows = conn.execute(
            "SELECT id, memory_id, action, actor, before_json, after_json, note, timestamp "
            "FROM memory_audit WHERE memory_id = ? "
            "ORDER BY timestamp DESC, rowid DESC LIMIT ?",
            (memory_id, bounded_limit),
        ).fetchall()

    return [
        {
            "id": r["id"],
            "memory_id": r["memory_id"],
            "action": r["action"],
            "actor": r["actor"] or "api",
            "before": _snapshot(r["before_json"]),
            "after": _snapshot(r["after_json"]),
            "note": r["note"] or "",
            "timestamp": r["timestamp"],
        }
        for r in rows
    ]
"content_preview": (after.get("content") or "")[:120], + }) + return out + + def _normalize_tags(tags) -> list[str]: """Coerce a tags value (list, JSON string, None) to a clean lowercase list.""" import json as _json @@ -98,6 +198,7 @@ def create_memory( status: str = "active", domain_tags: list[str] | None = None, valid_until: str = "", + actor: str = "api", ) -> Memory: """Create a new memory entry. @@ -160,6 +261,21 @@ def create_memory( valid_until=valid_until or "", ) + _audit_memory( + memory_id=memory_id, + action="created", + actor=actor, + after={ + "memory_type": memory_type, + "content": content, + "project": project, + "status": status, + "confidence": confidence, + "domain_tags": tags, + "valid_until": valid_until or "", + }, + ) + return Memory( id=memory_id, memory_type=memory_type, @@ -235,6 +351,8 @@ def update_memory( memory_type: str | None = None, domain_tags: list[str] | None = None, valid_until: str | None = None, + actor: str = "api", + note: str = "", ) -> bool: """Update an existing memory.""" import json as _json @@ -258,31 +376,48 @@ def update_memory( if duplicate: raise ValueError("Update would create a duplicate active memory") + # Capture before-state for audit + before_snapshot = { + "content": existing["content"], + "status": existing["status"], + "confidence": existing["confidence"], + "memory_type": existing["memory_type"], + } + after_snapshot = dict(before_snapshot) + updates = [] params: list = [] if content is not None: updates.append("content = ?") params.append(content) + after_snapshot["content"] = content if confidence is not None: updates.append("confidence = ?") params.append(confidence) + after_snapshot["confidence"] = confidence if status is not None: if status not in MEMORY_STATUSES: raise ValueError(f"Invalid status '{status}'. 
Must be one of: {MEMORY_STATUSES}") updates.append("status = ?") params.append(status) + after_snapshot["status"] = status if memory_type is not None: if memory_type not in MEMORY_TYPES: raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}") updates.append("memory_type = ?") params.append(memory_type) + after_snapshot["memory_type"] = memory_type if domain_tags is not None: + norm_tags = _normalize_tags(domain_tags) updates.append("domain_tags = ?") - params.append(_json.dumps(_normalize_tags(domain_tags))) + params.append(_json.dumps(norm_tags)) + after_snapshot["domain_tags"] = norm_tags if valid_until is not None: + vu = valid_until.strip() or None updates.append("valid_until = ?") - params.append(valid_until.strip() or None) + params.append(vu) + after_snapshot["valid_until"] = vu or "" if not updates: return False @@ -297,21 +432,40 @@ def update_memory( if result.rowcount > 0: log.info("memory_updated", memory_id=memory_id) + # Action verb is driven by status change when applicable; otherwise "updated" + if status == "active" and before_snapshot["status"] == "candidate": + action = "promoted" + elif status == "invalid" and before_snapshot["status"] == "candidate": + action = "rejected" + elif status == "invalid": + action = "invalidated" + elif status == "superseded": + action = "superseded" + else: + action = "updated" + _audit_memory( + memory_id=memory_id, + action=action, + actor=actor, + before=before_snapshot, + after=after_snapshot, + note=note, + ) return True return False -def invalidate_memory(memory_id: str) -> bool: +def invalidate_memory(memory_id: str, actor: str = "api") -> bool: """Mark a memory as invalid (error correction).""" - return update_memory(memory_id, status="invalid") + return update_memory(memory_id, status="invalid", actor=actor) -def supersede_memory(memory_id: str) -> bool: +def supersede_memory(memory_id: str, actor: str = "api") -> bool: """Mark a memory as superseded (replaced by newer 
info).""" - return update_memory(memory_id, status="superseded") + return update_memory(memory_id, status="superseded", actor=actor) -def promote_memory(memory_id: str) -> bool: +def promote_memory(memory_id: str, actor: str = "api", note: str = "") -> bool: """Promote a candidate memory to active (Phase 9 Commit C review queue). Returns False if the memory does not exist or is not currently a @@ -326,10 +480,10 @@ def promote_memory(memory_id: str) -> bool: return False if row["status"] != "candidate": return False - return update_memory(memory_id, status="active") + return update_memory(memory_id, status="active", actor=actor, note=note) -def reject_candidate_memory(memory_id: str) -> bool: +def reject_candidate_memory(memory_id: str, actor: str = "api", note: str = "") -> bool: """Reject a candidate memory (Phase 9 Commit C). Sets the candidate's status to ``invalid`` so it drops out of the @@ -344,7 +498,7 @@ def reject_candidate_memory(memory_id: str) -> bool: return False if row["status"] != "candidate": return False - return update_memory(memory_id, status="invalid") + return update_memory(memory_id, status="invalid", actor=actor, note=note) def reinforce_memory( @@ -385,6 +539,17 @@ def reinforce_memory( old_confidence=round(old_confidence, 4), new_confidence=round(new_confidence, 4), ) + # Reinforcement writes an audit row per bump. Reinforcement fires often + # (every captured interaction); this lets you trace which interactions + # kept which memories alive. Could become chatty but is invaluable for + # decay/cold-memory analysis. If it becomes an issue, throttle here. 
+ _audit_memory( + memory_id=memory_id, + action="reinforced", + actor="reinforcement", + before={"confidence": old_confidence}, + after={"confidence": new_confidence}, + ) return True, old_confidence, new_confidence @@ -420,7 +585,11 @@ def auto_promote_reinforced( for row in rows: mid = row["id"] - ok = promote_memory(mid) + ok = promote_memory( + mid, + actor="phase10-auto-promote", + note=f"ref_count={row['reference_count']} confidence={row['confidence']:.2f}", + ) if ok: promoted.append(mid) log.info( @@ -459,7 +628,11 @@ def expire_stale_candidates( for row in rows: mid = row["id"] - ok = reject_candidate_memory(mid) + ok = reject_candidate_memory( + mid, + actor="candidate-expiry", + note=f"unreinforced for {max_age_days}+ days", + ) if ok: expired.append(mid) log.info("memory_expired", memory_id=mid) diff --git a/src/atocore/models/database.py b/src/atocore/models/database.py index 293d6a9..2dcfd81 100644 --- a/src/atocore/models/database.py +++ b/src/atocore/models/database.py @@ -136,6 +136,30 @@ def _apply_migrations(conn: sqlite3.Connection) -> None: "CREATE INDEX IF NOT EXISTS idx_memories_valid_until ON memories(valid_until)" ) + # Phase 4 (Robustness V1): append-only audit log for memory mutations. + # Every create/update/promote/reject/supersede/invalidate/reinforce/expire/ + # auto_promote writes one row here. before/after are JSON snapshots of the + # relevant fields. actor lets us distinguish auto-triage vs human-triage vs + # api vs cron. This is the "how did this memory get to its current state" + # trail — essential once the brain starts auto-organizing itself. 
+ conn.execute( + """ + CREATE TABLE IF NOT EXISTS memory_audit ( + id TEXT PRIMARY KEY, + memory_id TEXT NOT NULL, + action TEXT NOT NULL, + actor TEXT DEFAULT 'api', + before_json TEXT DEFAULT '{}', + after_json TEXT DEFAULT '{}', + note TEXT DEFAULT '', + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_audit_memory ON memory_audit(memory_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_audit_timestamp ON memory_audit(timestamp)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_audit_action ON memory_audit(action)") + # Phase 9 Commit A: capture loop columns on the interactions table. # The original schema only carried prompt + project_id + a context_pack # JSON blob. To make interactions a real audit trail of what AtoCore fed diff --git a/src/atocore/observability/alerts.py b/src/atocore/observability/alerts.py new file mode 100644 index 0000000..bbb470a --- /dev/null +++ b/src/atocore/observability/alerts.py @@ -0,0 +1,170 @@ +"""Alert emission framework (Phase 4 Robustness V1). + +One-stop helper to raise operational alerts from any AtoCore code +path. An alert is a structured message about something the operator +should see — harness regression, queue pileup, integrity drift, +pipeline skipped, etc. + +Emission fans out to multiple sinks so a single call touches every +observability channel: + + 1. structlog logger (always) + 2. Append to ``$ATOCORE_ALERT_LOG`` (default ~/atocore-logs/alerts.log) + 3. Write the last alert of each severity to AtoCore project state + (atocore/alert/last_{severity}) so the dashboard can surface it + 4. POST to ``$ATOCORE_ALERT_WEBHOOK`` if set (Discord/Slack/generic) + +All sinks are fail-open — if one fails the others still fire. 
def _default_alert_log() -> Path:
    """Return the alert log path.

    ``$ATOCORE_ALERT_LOG`` wins when set and non-empty; otherwise the path is
    ``~/atocore-logs/alerts.log``.
    """
    explicit = os.environ.get("ATOCORE_ALERT_LOG")
    if explicit:
        return Path(explicit)
    return Path.home() / "atocore-logs" / "alerts.log"


def _append_log(severity: str, title: str, message: str, context: dict | None) -> None:
    """Append one line describing the alert to the alert log file.

    Line format: ``[<UTC ts>] [<SEVERITY>] <title>: <message>``, followed by
    the JSON-encoded context (truncated to 500 characters) when *context* is
    non-empty. The parent directory is created on demand.

    Fail-open: any error is logged as ``alert_log_write_failed`` and
    swallowed so the remaining alert sinks still run.
    """
    path = _default_alert_log()
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        line = f"[{ts}] [{severity.upper()}] {title}: {message}"
        if context:
            # default=str: a context value json cannot encode (datetime, Path,
            # set, ...) is stringified instead of raising, which would
            # otherwise discard the whole alert line.
            line += f" {json.dumps(context, ensure_ascii=True, default=str)[:500]}"
        line += "\n"
        with open(path, "a", encoding="utf-8") as f:
            f.write(line)
    except Exception as e:
        # Deliberately broad: this sink is best-effort by contract.
        log.warning("alert_log_write_failed", error=str(e))
def _post_webhook(severity: str, title: str, message: str, context: dict | None, ts: str) -> None:
    """POST the alert to ``$ATOCORE_ALERT_WEBHOOK`` on a background thread.

    Does nothing when the variable is unset or empty. URLs that look like
    Discord webhooks get a Discord-shaped payload (content + one embed with
    up to 10 context fields); every other URL receives a plain JSON object
    with ``severity``, ``title``, ``message``, ``context`` and ``timestamp``.

    Fail-open: encoding and network errors are logged as
    ``alert_webhook_failed`` inside the worker thread and never propagate.
    """
    url = os.environ.get("ATOCORE_ALERT_WEBHOOK")
    if not url:
        return

    # Auto-detect Discord webhook shape for nicer formatting
    if "discord.com/api/webhooks" in url or "discordapp.com/api/webhooks" in url:
        emoji = {"info": ":information_source:", "warning": ":warning:", "critical": ":rotating_light:"}.get(severity, "")
        body = {
            "content": f"{emoji} **AtoCore {severity}**: {title}",
            "embeds": [{
                "description": message[:1800],
                "timestamp": ts,
                "fields": [
                    {"name": k, "value": str(v)[:200], "inline": True}
                    for k, v in (context or {}).items()
                ][:10],
            }],
        }
    else:
        body = {
            "severity": severity,
            "title": title,
            "message": message,
            "context": context or {},
            "timestamp": ts,
        }

    def _fire() -> None:
        try:
            req = urllib.request.Request(
                url,
                # default=str keeps non-JSON context values from aborting the POST
                data=json.dumps(body, default=str).encode("utf-8"),
                method="POST",
                headers={"Content-Type": "application/json"},
            )
            # Context manager closes the response instead of leaking it.
            with urllib.request.urlopen(req, timeout=8):
                pass
        except Exception as e:
            log.warning("alert_webhook_failed", error=str(e))

    # Non-daemon on purpose: daemon threads are stopped abruptly at
    # interpreter shutdown, so a short-lived caller (a cron script whose last
    # statement is emit_alert) would exit before the POST went out. The
    # urlopen timeout keeps the shutdown delay bounded.
    threading.Thread(target=_fire, name="atocore-alert-webhook", daemon=False).start()


def emit_alert(
    severity: str,
    title: str,
    message: str,
    context: dict | None = None,
) -> None:
    """Emit an alert to all configured sinks.

    Sinks, in order: the structured logger, the alert log file, the
    per-severity project-state entry, and the optional webhook.

    Args:
        severity: ``info``, ``warning`` or ``critical`` (case-insensitive).
            Empty or unknown values fall back to ``info``.
        title: short headline.
        message: longer description; the logger sink truncates it to 500 chars.
        context: optional extra key/value pairs attached to the alert.

    Fail-open: any single sink failure (including the logger) does not
    prevent other sinks from firing, and nothing is raised to the caller.
    """
    severity = (severity or "info").lower()
    if severity not in SEVERITIES:
        severity = "info"

    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # Sink 1: structured logger.
    logger_fn = {
        "info": log.info,
        "warning": log.warning,
        "critical": log.error,
    }[severity]
    # Context keys are splatted as keyword arguments, so a key equal to one of
    # the fixed arguments would raise "got multiple values for keyword
    # argument". Such keys are prefixed instead. (``event`` is included on
    # the assumption that the logger follows structlog's ``(event, **kw)``
    # signature -- confirm against atocore.observability.logger.)
    reserved = {"event", "title", "message"}
    fields = {}
    for key, value in (context or {}).items():
        name = str(key)  # ** requires string keys
        fields[f"context_{name}" if name in reserved else name] = value
    try:
        logger_fn("alert", title=title, message=message[:500], **fields)
    except Exception:
        # There is nowhere left to report a logger failure; keep going so the
        # remaining sinks still fire.
        pass

    # Sinks 2-4: each helper already swallows its own errors; the extra guard
    # keeps one misbehaving sink from blocking the ones after it.
    sinks = (
        (_append_log, (severity, title, message, context)),
        (_write_state, (severity, title, message, ts)),
        (_post_webhook, (severity, title, message, context, ts)),
    )
    for sink, args in sinks:
        try:
            sink(*args)
        except Exception:
            pass
def test_emit_alert_fails_open_on_log_write_error(monkeypatch, isolated_env):
    """An unwritable log path should not crash the emit."""
    from atocore.observability.alerts import emit_alert

    # Use a regular file as the log's parent "directory". Creating the parent
    # then fails for every user, root included. A made-up absolute path such
    # as /nonexistent/... can simply be created when the suite runs as root,
    # which would skip the failure path and leave directories behind.
    blocker = Path(isolated_env["tmpdir"]) / "not-a-directory"
    blocker.write_text("occupied", encoding="utf-8")
    monkeypatch.setenv("ATOCORE_ALERT_LOG", str(blocker / "alerts.log"))

    # Must not raise
    emit_alert("info", "t", "m")

    # The blocking file is untouched, so no log was written through it.
    assert blocker.is_file()
    assert blocker.read_text(encoding="utf-8") == "occupied"
def test_audit_update_captures_before_after(isolated_db):
    """An ``updated`` audit row records field values from both sides of the edit."""
    from atocore.memory.service import create_memory, get_memory_audit, update_memory

    created = create_memory("knowledge", "original content", confidence=0.5)
    update_memory(created.id, content="updated content", confidence=0.9, actor="human-edit")

    updates = [row for row in get_memory_audit(created.id) if row["action"] == "updated"]
    assert len(updates) >= 1

    # Newest-first ordering puts the edit made above at index 0.
    before, after = updates[0]["before"], updates[0]["after"]
    assert before["content"] == "original content"
    assert after["content"] == "updated content"
    assert before["confidence"] == 0.5
    assert after["confidence"] == 0.9