Files
ATOCore/scripts/auto_triage.py

248 lines
9.0 KiB
Python
Raw Normal View History

"""Auto-triage: LLM second-pass over candidate memories.
Fetches all status=candidate memories from the AtoCore API, asks
a triage model (via claude -p) to classify each as promote / reject /
needs_human, and executes the verdict via the promote/reject endpoints.
Only needs_human candidates remain in the queue for manual review.
Trust model:
- Auto-promote: model says promote AND confidence >= 0.8 AND no
duplicate content in existing active memories
- Auto-reject: model says reject
- needs_human: everything else stays in queue
Runs host-side (same as batch extraction) because it needs the
claude CLI. Intended to be called after batch-extract.sh in the
nightly cron, or manually.
Usage:
python3 scripts/auto_triage.py --base-url http://localhost:8100
python3 scripts/auto_triage.py --dry-run # preview without executing
"""
from __future__ import annotations
import argparse
import atexit
import json
import math
import os
import shutil
import subprocess
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request
# Default for --base-url; taken from ATOCORE_BASE_URL when that is set.
DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
# Default for --model (forwarded to `claude --model`); taken from
# ATOCORE_TRIAGE_MODEL when that is set.
DEFAULT_MODEL = os.environ.get("ATOCORE_TRIAGE_MODEL", "sonnet")
# Timeout in seconds applied to each claude subprocess call; taken from
# ATOCORE_TRIAGE_TIMEOUT_S when that is set. Converted with float() at import
# time, so a non-numeric value raises ValueError as soon as the module loads.
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_TRIAGE_TIMEOUT_S", "60"))
# A "promote" verdict is executed automatically only when the model's
# confidence is at least this value; lower-confidence promotes stay queued.
AUTO_PROMOTE_MIN_CONFIDENCE = 0.8
# System prompt handed to the claude CLI via --append-system-prompt for every
# candidate. The JSON shape it requests (verdict / confidence / reason) is the
# shape parse_verdict() reads, so the two must be changed together.
TRIAGE_SYSTEM_PROMPT = """You are a memory triage reviewer for a personal context engine called AtoCore. You review candidate memories extracted from LLM conversations and decide whether each should be promoted to active status, rejected, or flagged for human review.
You will receive:
- The candidate memory content and type
- A list of existing active memories for the same project (to check for duplicates)
For each candidate, output exactly one JSON object:
{"verdict": "promote|reject|needs_human", "confidence": 0.0-1.0, "reason": "one sentence"}
Rules:
1. PROMOTE when the candidate states a durable architectural fact, ratified decision, standing rule, or engineering constraint that is NOT already covered by an existing active memory. Confidence should reflect how certain you are this is worth keeping.
2. REJECT when the candidate is:
- A stale point-in-time snapshot ("live SHA is X", "36 active memories")
- An implementation detail too granular to be useful as standalone context
- A planned-but-not-implemented feature description
- A duplicate or near-duplicate of an existing active memory
- A session observation or conversational filler
- A process rule that belongs in DEV-LEDGER.md or AGENTS.md, not memory
3. NEEDS_HUMAN when you're genuinely unsure — the candidate might be valuable but you can't tell without domain knowledge. This should be rare (< 20% of candidates).
4. Output ONLY the JSON object. No prose, no markdown, no explanation outside the reason field."""
# Path of the scratch directory used as cwd for claude CLI calls; stays None
# until get_sandbox_cwd() creates it on first use.
_sandbox_cwd = None


def get_sandbox_cwd():
    """Return the scratch directory used as cwd for claude CLI calls.

    The directory is created with ``tempfile.mkdtemp`` on the first call and
    the same path is returned by every later call in this process. A cleanup
    hook removes it at interpreter exit so repeated runs (for example from a
    nightly cron) do not pile up ``ato-triage-*`` directories.

    Returns:
        str: path of the sandbox directory.
    """
    global _sandbox_cwd
    if _sandbox_cwd is None:
        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-triage-")
        # mkdtemp leaves deletion to the caller; ignore_errors keeps a
        # failed cleanup from turning a successful run into a noisy exit.
        atexit.register(shutil.rmtree, _sandbox_cwd, ignore_errors=True)
    return _sandbox_cwd
def api_get(base_url, path, timeout=10):
    """GET ``base_url + path`` and return the response body parsed as JSON.

    Args:
        base_url: URL prefix; concatenated with ``path`` as-is.
        path: request path (and query string) appended to ``base_url``.
        timeout: seconds passed to ``urlopen``.

    Returns:
        The JSON-decoded response body (decoded as UTF-8).

    Errors raised by ``urlopen`` or by JSON decoding are not caught here.
    """
    url = f"{base_url}{path}"
    request = urllib.request.Request(url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw_body = response.read()
    return json.loads(raw_body.decode("utf-8"))
def api_post(base_url, path, body=None, timeout=10):
    """POST a JSON body to ``base_url + path`` and return the parsed reply.

    Args:
        base_url: URL prefix; concatenated with ``path`` as-is.
        path: request path appended to ``base_url``.
        body: JSON-serializable payload; any falsy value is sent as ``{}``.
        timeout: seconds passed to ``urlopen``.

    Returns:
        The JSON-decoded response body (decoded as UTF-8).

    Errors raised by ``urlopen`` or by JSON decoding are not caught here.
    """
    payload = body if body else {}
    request = urllib.request.Request(
        f"{base_url}{path}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw_body = response.read()
    return json.loads(raw_body.decode("utf-8"))
def fetch_active_memories_for_project(base_url, project):
    """Return the active memories used as dedup context for one project.

    Queries ``/memory`` with ``active_only=true&limit=50`` and, when
    ``project`` is non-empty, a URL-quoted ``project`` filter.

    Args:
        base_url: AtoCore API base URL.
        project: project name; a falsy value omits the project filter.

    Returns:
        The ``"memories"`` entry of the response, or ``[]`` when absent.
    """
    query_parts = ["active_only=true", "limit=50"]
    if project:
        query_parts.append(f"project={urllib.parse.quote(project)}")
    response = api_get(base_url, "/memory?" + "&".join(query_parts))
    return response.get("memories", [])
def triage_one(candidate, active_memories, model, timeout_s):
    """Classify a single candidate memory by invoking the claude CLI.

    Args:
        candidate: candidate memory dict; ``memory_type`` and ``content``
            are required, ``project`` is optional.
        active_memories: existing active memory dicts; the first 20 are
            listed in the prompt with content cut to 150 characters.
        model: value passed to ``claude --model``.
        timeout_s: subprocess timeout in seconds.

    Returns:
        A dict with ``verdict``, ``confidence`` and ``reason``. A missing
        CLI, a timeout, any other subprocess error, or a non-zero exit code
        all yield a ``needs_human`` verdict with confidence 0.0.
    """

    def needs_human(reason):
        # Shared shape for every "could not get a model verdict" outcome.
        return {"verdict": "needs_human", "confidence": 0.0, "reason": reason}

    cli_path = shutil.which("claude")
    if not cli_path:
        return needs_human("claude CLI not available")

    summary_lines = [
        f"- [{mem['memory_type']}] {mem['content'][:150]}"
        for mem in active_memories[:20]
    ]
    active_summary = "\n".join(summary_lines) or "(no active memories for this project)"

    prompt_parts = [
        "CANDIDATE TO TRIAGE:\n",
        f" type: {candidate['memory_type']}\n",
        f" project: {candidate.get('project') or '(none)'}\n",
        f" content: {candidate['content']}\n\n",
        f"EXISTING ACTIVE MEMORIES FOR THIS PROJECT:\n{active_summary}\n\n",
        "Return the JSON verdict now.",
    ]
    user_message = "".join(prompt_parts)

    command = ["claude", "-p"]
    command += ["--model", model]
    command += ["--append-system-prompt", TRIAGE_SYSTEM_PROMPT]
    command += ["--disable-slash-commands", user_message]

    try:
        # The sandbox lookup stays inside the try so a failure to create the
        # scratch directory is reported as a subprocess error too.
        completed = subprocess.run(
            command,
            cwd=get_sandbox_cwd(),
            timeout=timeout_s,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
    except subprocess.TimeoutExpired:
        return needs_human("triage model timed out")
    except Exception as exc:
        return needs_human(f"subprocess error: {exc}")

    if completed.returncode != 0:
        return needs_human(f"claude exit {completed.returncode}")

    stdout_text = (completed.stdout or "").strip()
    return parse_verdict(stdout_text)
def parse_verdict(raw):
    """Parse the triage model's reply into a normalized verdict dict.

    Accepts a bare JSON object, one wrapped in a Markdown code fence, or one
    surrounded by stray prose. Anything that cannot be read as a JSON object
    degrades to ``needs_human`` instead of raising, because the result feeds
    an automatic promote/reject decision.

    Args:
        raw: text printed by the triage model (``None`` is treated as empty).

    Returns:
        dict with keys:
            ``verdict``: ``"promote"``, ``"reject"`` or ``"needs_human"``;
                unknown values become ``"needs_human"``.
            ``confidence``: float clamped to [0.0, 1.0]; 0.5 when missing,
                non-numeric, boolean, NaN or infinite; 0.0 when the reply
                could not be parsed at all.
            ``reason``: stripped text, at most 200 characters.
    """

    def unparseable():
        # A fresh dict per call so callers can never mutate a shared value.
        return {
            "verdict": "needs_human",
            "confidence": 0.0,
            "reason": "failed to parse triage output",
        }

    text = (raw or "").strip()
    if text.startswith("```"):
        text = text.strip("`")
        # Drop the rest of the opening fence line (e.g. "json"), but only
        # when it holds no JSON, so "```{..." keeps its first line.
        first_line, newline, remainder = text.partition("\n")
        if newline and "{" not in first_line:
            text = remainder
        text = text.strip()

    # Cut to the outermost braces so prose before or after the object does
    # not break parsing.
    start = text.find("{")
    end = text.rfind("}")
    if start >= 0 and end > start:
        text = text[start:end + 1]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return unparseable()
    # Valid JSON that is not an object (list, string, number) has no fields.
    if not isinstance(parsed, dict):
        return unparseable()

    verdict = str(parsed.get("verdict", "needs_human")).strip().lower()
    if verdict not in {"promote", "reject", "needs_human"}:
        verdict = "needs_human"

    confidence = parsed.get("confidence", 0.5)
    if isinstance(confidence, bool):
        # float(True) is 1.0, which would satisfy the auto-promote gate.
        confidence = 0.5
    else:
        try:
            confidence = float(confidence)
        except (TypeError, ValueError):
            confidence = 0.5
        else:
            if math.isfinite(confidence):
                confidence = max(0.0, min(1.0, confidence))
            else:
                # json.loads accepts NaN/Infinity; min()/max() would clamp
                # NaN and +inf to 1.0, so treat them as "no usable value".
                confidence = 0.5

    reason_value = parsed.get("reason")
    reason = "" if reason_value is None else str(reason_value).strip()[:200]
    return {"verdict": verdict, "confidence": confidence, "reason": reason}
def _content_key(memory):
    """Return a memory's content casefolded with whitespace collapsed."""
    return " ".join(str(memory.get("content") or "").split()).casefold()


def _duplicates_active(candidate, active_memories):
    """Return True when a different active memory has the same content."""
    key = _content_key(candidate)
    if not key:
        return False
    candidate_id = candidate.get("id")
    return any(
        memory.get("id") != candidate_id and _content_key(memory) == key
        for memory in active_memories
    )


def main():
    """Triage every candidate memory and execute the resulting verdicts.

    Fetches up to 100 ``status=candidate`` memories, asks the triage model
    about each one, and then:

    - promotes when the model says promote with confidence of at least
      ``AUTO_PROMOTE_MIN_CONFIDENCE`` and no active memory for the project
      already has the same content;
    - rejects when the model says reject;
    - leaves everything else in the queue as needs_human.

    With ``--dry-run`` the intended actions are printed but no promote or
    reject request is sent. A summary line with the four counters is printed
    at the end; each candidate is counted in exactly one of them.
    """
    parser = argparse.ArgumentParser(description="Auto-triage candidate memories")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--model", default=DEFAULT_MODEL)
    parser.add_argument("--dry-run", action="store_true", help="preview without executing")
    args = parser.parse_args()

    # Fetch candidates
    result = api_get(args.base_url, "/memory?status=candidate&limit=100")
    candidates = result.get("memories", [])
    print(f"candidates: {len(candidates)} model: {args.model} dry_run: {args.dry_run}")
    if not candidates:
        print("queue empty, nothing to triage")
        return

    # Cache active memories per project for dedup
    active_cache = {}
    promoted = rejected = needs_human = errors = 0

    for i, cand in enumerate(candidates, 1):
        project = cand.get("project") or ""
        if project not in active_cache:
            active_cache[project] = fetch_active_memories_for_project(args.base_url, project)
        active = active_cache[project]

        verdict_obj = triage_one(cand, active, args.model, DEFAULT_TIMEOUT_S)
        verdict = verdict_obj["verdict"]
        conf = verdict_obj["confidence"]
        reason = verdict_obj["reason"]
        mid = cand["id"]
        label = f"[{i:2d}/{len(candidates)}] {mid[:8]} [{cand['memory_type']}]"

        auto_promote = verdict == "promote" and conf >= AUTO_PROMOTE_MIN_CONFIDENCE
        if auto_promote and _duplicates_active(cand, active):
            # The trust model only auto-promotes content that is not already
            # active; an exact duplicate is left for a human to resolve.
            auto_promote = False
            verdict = "needs_human"
            reason = f"duplicate of an active memory; model said promote: {reason}"

        if auto_promote:
            if args.dry_run:
                print(f" WOULD PROMOTE {label} conf={conf:.2f} {reason}")
                promoted += 1
            else:
                try:
                    api_post(args.base_url, f"/memory/{mid}/promote")
                except Exception as exc:
                    # Best-effort: one failed call must not abort the batch,
                    # but it must be visible and must not count as promoted.
                    errors += 1
                    print(f" ERROR {label} promote failed: {exc}", file=sys.stderr)
                else:
                    print(f" PROMOTED {label} conf={conf:.2f} {reason}")
                    # Later candidates are checked against this one as well.
                    active.append(cand)
                    promoted += 1
        elif verdict == "reject":
            if args.dry_run:
                print(f" WOULD REJECT {label} conf={conf:.2f} {reason}")
                rejected += 1
            else:
                try:
                    api_post(args.base_url, f"/memory/{mid}/reject")
                except Exception as exc:
                    errors += 1
                    print(f" ERROR {label} reject failed: {exc}", file=sys.stderr)
                else:
                    print(f" REJECTED {label} conf={conf:.2f} {reason}")
                    rejected += 1
        else:
            print(f" NEEDS_HUMAN {label} conf={conf:.2f} {reason}")
            needs_human += 1

    print(f"\npromoted={promoted} rejected={rejected} needs_human={needs_human} errors={errors}")
# Run the triage pass only when the file is executed as a script, so that
# importing the module does not trigger any API or CLI calls.
if __name__ == "__main__":
    main()