Makes human triage sustainable. Before: command-line-only review,
auto-triage stopped after 100 candidates/run. Now:
1. Web UI at /admin/triage
   - Lists all pending candidates with inline promote/reject/edit
   - Edit content in-place before promoting (PUT /memory/{id})
   - Change type via dropdown
   - Keyboard shortcuts: Y=promote, N=reject, E=edit, S=scroll-next
   - Cards fade out after action, queue count updates live
   - Zero JS framework: vanilla fetch + event delegation
2. auto_triage.py drains queue
   - Loops up to 20 batches (default) of 100 candidates each
   - Tracks seen IDs so needs_human items don't reprocess
   - Exits cleanly when queue empty
   - Nightly cron naturally drains everything
3. Dashboard + wiki surface triage queue
   - Dashboard /admin/dashboard: new "triage" section with pending
     count + /admin/triage URL + warning/notice severity levels
   - Wiki homepage: prominent callout "N candidates awaiting triage —
     review now" linking to /admin/triage, styled with triage-warning
     (>50) or triage-notice (>20) CSS
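The severity thresholds in item 3 can be sketched as a small pure function. This is an illustration only: the function name `triage_severity` is hypothetical, and the commit does not show how the dashboard or wiki actually computes the class. Only the thresholds (>50, >20) and the CSS class names come from the text above.

```python
from typing import Optional

# Thresholds taken from the commit message; the constant names are assumptions.
WARNING_THRESHOLD = 50
NOTICE_THRESHOLD = 20


def triage_severity(pending: int) -> Optional[str]:
    """Return the CSS class for a pending-candidate count, or None if no callout styling applies."""
    if pending > WARNING_THRESHOLD:
        return "triage-warning"
    if pending > NOTICE_THRESHOLD:
        return "triage-notice"
    return None


if __name__ == "__main__":
    for count in (5, 21, 50, 51):
        print(count, triage_severity(count))
```

Both comparisons are strict, so a queue of exactly 50 is still a notice and a queue of exactly 20 gets no styling.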
Pattern: human intervenes only when AI can't decide. The UI makes
that intervention fast (20 candidates in 60 seconds). Nightly
auto-triage drains the queue so the human queue stays bounded.
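The drain pattern described above (bounded batches plus a seen-ID set) can be sketched without the API or the model. This is a simplified illustration, not the script itself: `drain_queue`, `fetch_batch`, `decide`, and the in-memory `queue` are hypothetical stand-ins for the HTTP calls and the claude CLI.

```python
def drain_queue(fetch_batch, decide, max_batches=20):
    """Process batches until no unseen candidates remain or max_batches is hit."""
    seen = set()  # IDs already triaged in this run (needs_human items stay queued)
    counts = {"promote": 0, "reject": 0, "needs_human": 0}
    for _ in range(max_batches):
        fresh = [c for c in fetch_batch() if c["id"] not in seen]
        if not fresh:
            break  # everything returned was already seen: treat the queue as drained
        for cand in fresh:
            seen.add(cand["id"])
            counts[decide(cand)] += 1
    return counts


def demo():
    # 250 fake candidates; the verdict is derived from the index for repeatability.
    queue = [{"id": f"m{i}", "kind": i % 3} for i in range(250)]

    def fetch_batch(limit=100):
        # Stand-in for GET /memory?status=candidate&limit=100
        return queue[:limit]

    def decide(cand):
        verdict = ("promote", "reject", "needs_human")[cand["kind"]]
        if verdict != "needs_human":
            queue.remove(cand)  # promoted/rejected items leave the candidate queue
        return verdict

    return drain_queue(fetch_batch, decide), len(queue)


if __name__ == "__main__":
    print(demo())
```

One property of this pattern worth noting: because every fetch returns the head of the queue, the loop stops early if the first 100 entries are all already-seen needs_human items, even when unseen candidates sit behind them.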
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
291 lines
12 KiB
Python
"""Auto-triage: LLM second-pass over candidate memories.

Fetches all status=candidate memories from the AtoCore API, asks
a triage model (via claude -p) to classify each as promote / reject /
needs_human, and executes the verdict via the promote/reject endpoints.
Only needs_human candidates remain in the queue for manual review.

Trust model:
- Auto-promote: model says promote AND confidence >= 0.8 AND no
  duplicate content in existing active memories
- Auto-reject: model says reject
- needs_human: everything else, including contradicts verdicts,
  stays in queue

Runs host-side (same as batch extraction) because it needs the
claude CLI. Intended to be called after batch-extract.sh in the
nightly cron, or manually.

Usage:

    python3 scripts/auto_triage.py --base-url http://localhost:8100
    python3 scripts/auto_triage.py --dry-run  # preview without executing
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.parse
import urllib.request

DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
DEFAULT_MODEL = os.environ.get("ATOCORE_TRIAGE_MODEL", "sonnet")
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_TRIAGE_TIMEOUT_S", "60"))
AUTO_PROMOTE_MIN_CONFIDENCE = 0.8

TRIAGE_SYSTEM_PROMPT = """You are a memory triage reviewer for a personal context engine called AtoCore. You review candidate memories extracted from LLM conversations and decide whether each should be promoted to active status, rejected, or flagged for human review.

You will receive:
- The candidate memory content and type
- A list of existing active memories for the same project (to check for duplicates)

For each candidate, output exactly one JSON object:

{"verdict": "promote|reject|needs_human|contradicts", "confidence": 0.0-1.0, "reason": "one sentence", "conflicts_with": "id of existing memory if contradicts"}

Rules:

1. PROMOTE when the candidate states a durable architectural fact, ratified decision, standing rule, or engineering constraint that is NOT already covered by an existing active memory. Confidence should reflect how certain you are this is worth keeping.

2. REJECT when the candidate is:
   - A stale point-in-time snapshot ("live SHA is X", "36 active memories")
   - An implementation detail too granular to be useful as standalone context
   - A planned-but-not-implemented feature description
   - A duplicate or near-duplicate of an existing active memory
   - A session observation or conversational filler
   - A process rule that belongs in DEV-LEDGER.md or AGENTS.md, not memory

3. CONTRADICTS when the candidate *conflicts* with an existing active memory (not a duplicate, but states something that can't both be true). Set `conflicts_with` to the existing memory id. This flags the tension for human review instead of silently rejecting or double-storing. Examples: "Option A selected" vs "Option B selected" for the same decision; "uses material X" vs "uses material Y" for the same component.

4. OPENCLAW-CURATED content (candidate content starts with "From OpenClaw/"): apply a MUCH LOWER bar. OpenClaw's SOUL.md, USER.md, MEMORY.md, MODEL-ROUTING.md, and dated memory/*.md files are ALREADY curated by OpenClaw as canonical continuity. Promote unless clearly wrong or a genuine duplicate. Do NOT reject OpenClaw content as "process rule belongs elsewhere" or "session log" — that's exactly what AtoCore wants to absorb. Session events, project updates, stakeholder notes, and decisions from OpenClaw daily memory files ARE valuable context and should promote.

5. NEEDS_HUMAN when you're genuinely unsure — the candidate might be valuable but you can't tell without domain knowledge. This should be rare (< 20% of candidates).

6. Output ONLY the JSON object. No prose, no markdown, no explanation outside the reason field."""


_sandbox_cwd = None


def get_sandbox_cwd():
    """Return a per-process temp directory used as cwd for the claude CLI."""
    global _sandbox_cwd
    if _sandbox_cwd is None:
        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-triage-")
    return _sandbox_cwd


def api_get(base_url, path, timeout=10):
    """GET base_url + path and return the decoded JSON body."""
    req = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def api_post(base_url, path, body=None, timeout=10):
    """POST a JSON body (default {}) to base_url + path and return the decoded JSON reply."""
    data = json.dumps(body or {}).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}{path}", method="POST",
        headers={"Content-Type": "application/json"}, data=data,
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def fetch_active_memories_for_project(base_url, project):
    """Fetch active memories for dedup checking."""
    params = "active_only=true&limit=50"
    if project:
        params += f"&project={urllib.parse.quote(project)}"
    result = api_get(base_url, f"/memory?{params}")
    return result.get("memories", [])


def triage_one(candidate, active_memories, model, timeout_s):
    """Ask the triage model to classify one candidate."""
    if not shutil.which("claude"):
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "claude CLI not available"}

    # Include each memory's id so the model can fill in `conflicts_with`
    # for a contradicts verdict, as the system prompt asks it to.
    active_summary = "\n".join(
        f"- {m.get('id', '?')} [{m['memory_type']}] {m['content'][:150]}"
        for m in active_memories[:20]
    ) or "(no active memories for this project)"

    user_message = (
        f"CANDIDATE TO TRIAGE:\n"
        f" type: {candidate['memory_type']}\n"
        f" project: {candidate.get('project') or '(none)'}\n"
        f" content: {candidate['content']}\n\n"
        f"EXISTING ACTIVE MEMORIES FOR THIS PROJECT:\n{active_summary}\n\n"
        f"Return the JSON verdict now."
    )

    args = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", TRIAGE_SYSTEM_PROMPT,
        "--disable-slash-commands",
        user_message,
    ]

    # Retry with exponential backoff on transient failures (rate limits etc)
    last_error = ""
    for attempt in range(3):
        if attempt > 0:
            time.sleep(2 ** attempt)  # 2s, 4s
        try:
            completed = subprocess.run(
                args, capture_output=True, text=True,
                timeout=timeout_s, cwd=get_sandbox_cwd(),
                encoding="utf-8", errors="replace",
            )
        except subprocess.TimeoutExpired:
            last_error = "triage model timed out"
            continue
        except Exception as exc:
            last_error = f"subprocess error: {exc}"
            continue

        if completed.returncode == 0:
            raw = (completed.stdout or "").strip()
            return parse_verdict(raw)

        # Capture stderr for diagnostics (truncate to 200 chars)
        stderr = (completed.stderr or "").strip()[:200]
        last_error = f"claude exit {completed.returncode}: {stderr}" if stderr else f"claude exit {completed.returncode}"

    # All attempts failed: leave the candidate for a human.
    return {"verdict": "needs_human", "confidence": 0.0, "reason": last_error}


def parse_verdict(raw):
    """Parse the triage model's JSON verdict."""
    text = raw.strip()
    if text.startswith("```"):
        # Strip the surrounding code fence, then drop the language tag line.
        text = text.strip("`")
        nl = text.find("\n")
        if nl >= 0:
            text = text[nl + 1:]
        text = text.strip()

    # Tolerate stray prose around the object by slicing from the first "{"
    # to the last "}".
    if not text.lstrip().startswith("{"):
        start = text.find("{")
        end = text.rfind("}")
        if start >= 0 and end > start:
            text = text[start:end + 1]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "failed to parse triage output"}

    # Valid JSON that is not an object (e.g. a list) has no .get().
    if not isinstance(parsed, dict):
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "triage output was not a JSON object"}

    verdict = str(parsed.get("verdict", "needs_human")).strip().lower()
    if verdict not in {"promote", "reject", "needs_human", "contradicts"}:
        verdict = "needs_human"

    confidence = parsed.get("confidence", 0.5)
    try:
        confidence = max(0.0, min(1.0, float(confidence)))
    except (TypeError, ValueError):
        confidence = 0.5

    reason = str(parsed.get("reason", "")).strip()[:200]
    conflicts_with = str(parsed.get("conflicts_with", "")).strip()
    return {
        "verdict": verdict,
        "confidence": confidence,
        "reason": reason,
        "conflicts_with": conflicts_with,
    }


def main():
    parser = argparse.ArgumentParser(description="Auto-triage candidate memories")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--model", default=DEFAULT_MODEL)
    parser.add_argument("--dry-run", action="store_true", help="preview without executing")
    parser.add_argument("--max-batches", type=int, default=20,
                        help="Max batches of 100 to process per run (default 20 = 2000 candidates)")
    args = parser.parse_args()

    # Track IDs we've already seen so needs_human items don't re-process
    # every batch (they stay in the candidate table until a human reviews).
    seen_ids: set[str] = set()
    active_cache: dict[str, list] = {}
    promoted = rejected = needs_human = errors = 0
    batch_num = 0

    while batch_num < args.max_batches:
        batch_num += 1

        result = api_get(args.base_url, "/memory?status=candidate&limit=100")
        all_candidates = result.get("memories", [])
        # Filter out already-seen (needs_human from prior batch in same run)
        candidates = [c for c in all_candidates if c["id"] not in seen_ids]

        if not candidates:
            if batch_num == 1:
                print("queue empty, nothing to triage")
            else:
                print(f"\nQueue drained after batch {batch_num-1}.")
            break

        print(f"\n=== batch {batch_num}: {len(candidates)} candidates model: {args.model} dry_run: {args.dry_run} ===")

        for i, cand in enumerate(candidates, 1):
            if i > 1:
                time.sleep(0.5)  # pace calls to the triage model

            seen_ids.add(cand["id"])
            project = cand.get("project") or ""
            if project not in active_cache:
                active_cache[project] = fetch_active_memories_for_project(args.base_url, project)

            verdict_obj = triage_one(cand, active_cache[project], args.model, DEFAULT_TIMEOUT_S)
            verdict = verdict_obj["verdict"]
            conf = verdict_obj["confidence"]
            reason = verdict_obj["reason"]
            conflicts_with = verdict_obj.get("conflicts_with", "")

            mid = cand["id"]
            label = f"[{i:2d}/{len(candidates)}] {mid[:8]} [{cand['memory_type']}]"

            if verdict == "promote" and conf >= AUTO_PROMOTE_MIN_CONFIDENCE:
                if args.dry_run:
                    print(f" WOULD PROMOTE {label} conf={conf:.2f} {reason}")
                else:
                    try:
                        api_post(args.base_url, f"/memory/{mid}/promote")
                        print(f" PROMOTED {label} conf={conf:.2f} {reason}")
                        # Later candidates should see this memory when checking for duplicates.
                        active_cache[project].append(cand)
                    except Exception as exc:
                        # Report the failure and do not count it as promoted.
                        print(f" ERROR promoting {label}: {exc}", file=sys.stderr)
                        errors += 1
                        continue
                promoted += 1
            elif verdict == "reject":
                if args.dry_run:
                    print(f" WOULD REJECT {label} conf={conf:.2f} {reason}")
                else:
                    try:
                        api_post(args.base_url, f"/memory/{mid}/reject")
                        print(f" REJECTED {label} conf={conf:.2f} {reason}")
                    except Exception as exc:
                        # Report the failure and do not count it as rejected.
                        print(f" ERROR rejecting {label}: {exc}", file=sys.stderr)
                        errors += 1
                        continue
                rejected += 1
            elif verdict == "contradicts":
                # Contradictions are never auto-resolved; they stay queued for a human.
                print(f" CONTRADICTS {label} vs {conflicts_with[:8] if conflicts_with else '?'} {reason}")
                needs_human += 1
            else:
                # needs_human, or promote below the confidence threshold
                print(f" NEEDS_HUMAN {label} conf={conf:.2f} {reason}")
                needs_human += 1

    print(f"\ntotal: promoted={promoted} rejected={rejected} needs_human={needs_human} errors={errors} batches={batch_num}")


if __name__ == "__main__":
    main()