Both scripts now: - Retry up to 3x with 2s/4s exponential backoff on transient failures (rate limits, capacity spikes) - Capture claude CLI stderr in the error message (200 char cap) instead of just the exit code — diagnostics actually useful now - Sleep 0.5s between calls to avoid bursting the backend Context: last batch run hit 100% failure in triage (every call exit 1) after 40% failure in extraction. claude CLI worked fine immediately after, so the failures were capacity/rate-limit transients. With retry + pacing these batches should complete cleanly now. 439 candidates are already in the queue waiting for triage. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
285 lines
11 KiB
Python
285 lines
11 KiB
Python
"""Auto-triage: LLM second-pass over candidate memories.

Fetches status=candidate memories from the AtoCore API (up to 100 per
run), asks a triage model (via claude -p) to classify each as promote /
reject / needs_human, and executes the verdict via the promote/reject
endpoints. Only needs_human candidates remain in the queue for manual
review.

Trust model:
- Auto-promote: model says promote AND confidence >= 0.8 AND no
  duplicate content in existing active memories
- Auto-reject: model says reject
- needs_human: everything else (including "contradicts" verdicts)
  stays in queue

Runs host-side (same as batch extraction) because it needs the
claude CLI. Intended to be called after batch-extract.sh in the
nightly cron, or manually.

Usage:

    python3 scripts/auto_triage.py --base-url http://localhost:8100
    python3 scripts/auto_triage.py --dry-run  # preview without executing
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import tempfile
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
# Runtime defaults. Each one can be overridden through the environment
# variable named in the lookup; the second argument is the fallback.
DEFAULT_BASE_URL = os.getenv("ATOCORE_BASE_URL", "http://localhost:8100")
DEFAULT_MODEL = os.getenv("ATOCORE_TRIAGE_MODEL", "sonnet")
# Per-call timeout (seconds) handed to the claude subprocess.
DEFAULT_TIMEOUT_S = float(os.getenv("ATOCORE_TRIAGE_TIMEOUT_S", "60"))

# A "promote" verdict is only executed automatically at or above this
# model-reported confidence; lower-confidence promotes stay in the queue.
AUTO_PROMOTE_MIN_CONFIDENCE = 0.8
|
|
|
|
# System prompt appended to every claude call made by the triage step.
# It defines the four verdicts and the single-JSON-object output contract
# that the verdict parser relies on. The text must not contain stray
# layout characters: everything inside the literal is sent to the model.
TRIAGE_SYSTEM_PROMPT = """You are a memory triage reviewer for a personal context engine called AtoCore. You review candidate memories extracted from LLM conversations and decide whether each should be promoted to active status, rejected, or flagged for human review.

You will receive:
- The candidate memory content and type
- A list of existing active memories for the same project (to check for duplicates)

For each candidate, output exactly one JSON object:

{"verdict": "promote|reject|needs_human|contradicts", "confidence": 0.0-1.0, "reason": "one sentence", "conflicts_with": "id of existing memory if contradicts"}

Rules:

1. PROMOTE when the candidate states a durable architectural fact, ratified decision, standing rule, or engineering constraint that is NOT already covered by an existing active memory. Confidence should reflect how certain you are this is worth keeping.

2. REJECT when the candidate is:
- A stale point-in-time snapshot ("live SHA is X", "36 active memories")
- An implementation detail too granular to be useful as standalone context
- A planned-but-not-implemented feature description
- A duplicate or near-duplicate of an existing active memory
- A session observation or conversational filler
- A process rule that belongs in DEV-LEDGER.md or AGENTS.md, not memory

3. CONTRADICTS when the candidate *conflicts* with an existing active memory (not a duplicate, but states something that can't both be true). Set `conflicts_with` to the existing memory id. This flags the tension for human review instead of silently rejecting or double-storing. Examples: "Option A selected" vs "Option B selected" for the same decision; "uses material X" vs "uses material Y" for the same component.

4. OPENCLAW-CURATED content (candidate content starts with "From OpenClaw/"): apply a MUCH LOWER bar. OpenClaw's SOUL.md, USER.md, MEMORY.md, MODEL-ROUTING.md, and dated memory/*.md files are ALREADY curated by OpenClaw as canonical continuity. Promote unless clearly wrong or a genuine duplicate. Do NOT reject OpenClaw content as "process rule belongs elsewhere" or "session log" — that's exactly what AtoCore wants to absorb. Session events, project updates, stakeholder notes, and decisions from OpenClaw daily memory files ARE valuable context and should promote.

5. NEEDS_HUMAN when you're genuinely unsure — the candidate might be valuable but you can't tell without domain knowledge. This should be rare (< 20% of candidates).

6. Output ONLY the JSON object. No prose, no markdown, no explanation outside the reason field."""
|
|
|
|
# Lazily-created scratch directory shared by every claude call in this
# process; stays None until get_sandbox_cwd() is first called.
_sandbox_cwd = None


def get_sandbox_cwd():
    """Return the scratch directory used as cwd for claude CLI calls.

    The directory is created with ``tempfile.mkdtemp`` on the first call
    and the same path is returned on every later call in this process.
    Its removal is registered with ``atexit`` so that repeated runs do not
    leave one ``ato-triage-*`` directory behind per invocation.

    Returns:
        The path of the process-wide sandbox directory.
    """
    global _sandbox_cwd
    if _sandbox_cwd is None:
        # Function-scope import: atexit is only needed on first creation.
        import atexit

        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-triage-")
        # Best-effort cleanup at interpreter exit; errors are ignored so a
        # vanished directory never turns a finished run into a failure.
        atexit.register(shutil.rmtree, _sandbox_cwd, ignore_errors=True)
    return _sandbox_cwd
|
|
|
|
|
|
def api_get(base_url, path, timeout=10):
    """Issue a GET to ``base_url + path`` and return the decoded JSON body.

    Args:
        base_url: Prefix of the URL; concatenated with ``path`` as-is.
        path: Remainder of the URL, including any query string.
        timeout: Passed to ``urllib.request.urlopen``.

    Returns:
        The response body, read fully, decoded as UTF-8 and parsed as JSON.

    Errors raised by ``urlopen`` or ``json.loads`` are not caught here.
    """
    url = f"{base_url}{path}"
    request = urllib.request.Request(url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw_body = response.read()
    return json.loads(raw_body.decode("utf-8"))
|
|
|
|
|
|
def api_post(base_url, path, body=None, timeout=10):
    """POST a JSON body to ``base_url + path`` and return the decoded JSON reply.

    Args:
        base_url: Prefix of the URL; concatenated with ``path`` as-is.
        path: Remainder of the URL.
        body: JSON-serializable payload. Any falsy value (``None``, ``{}``,
            ``[]`` ...) is sent as an empty JSON object.
        timeout: Passed to ``urllib.request.urlopen``.

    Returns:
        The response body, read fully, decoded as UTF-8 and parsed as JSON.

    Errors raised by ``urlopen`` or ``json.loads`` are not caught here.
    """
    payload = body if body else {}
    request = urllib.request.Request(
        f"{base_url}{path}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw_body = response.read()
    return json.loads(raw_body.decode("utf-8"))
|
|
|
|
|
|
def fetch_active_memories_for_project(base_url, project):
    """Return the active memories the API reports for ``project``.

    Requests ``/memory`` with ``active_only=true&limit=50``; the
    ``project`` filter is added (percent-quoted) only when ``project`` is
    truthy. The result is used as duplicate-check context for triage.

    Returns:
        The ``"memories"`` list from the response, or ``[]`` when the key
        is absent.
    """
    query_parts = ["active_only=true", "limit=50"]
    if project:
        query_parts.append(f"project={urllib.parse.quote(project)}")
    response = api_get(base_url, "/memory?" + "&".join(query_parts))
    return response.get("memories", [])
|
|
|
|
|
|
def triage_one(candidate, active_memories, model, timeout_s):
    """Ask the triage model to classify one candidate.

    Builds a user message from the candidate and up to 20 existing active
    memories, runs ``claude -p`` in the sandbox directory, and parses the
    model's reply with ``parse_verdict``.

    The call is attempted up to 3 times, sleeping 2s and then 4s before the
    retries, so transient failures (timeouts, non-zero exits) do not
    immediately fail the candidate.

    Args:
        candidate: Mapping with ``memory_type`` and ``content`` keys and an
            optional ``project`` key.
        active_memories: Existing memories (mappings with ``memory_type``
            and ``content``; ``id`` is shown to the model when present).
        model: Value passed to the CLI's ``--model`` option.
        timeout_s: Per-attempt subprocess timeout in seconds.

    Returns:
        The dict produced by ``parse_verdict`` on success. If the CLI is
        missing or every attempt fails, a ``needs_human`` verdict with
        confidence 0.0 whose ``reason`` carries the last error.
    """
    if not shutil.which("claude"):
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "claude CLI not available"}

    # Each existing memory is listed together with its id. The system
    # prompt asks the model to name the conflicting memory's id in
    # `conflicts_with`, which it can only do if the id is in the prompt.
    active_summary = "\n".join(
        f"- [{m['memory_type']}] (id={m.get('id', '?')}) {m['content'][:150]}"
        for m in active_memories[:20]
    ) or "(no active memories for this project)"

    user_message = (
        f"CANDIDATE TO TRIAGE:\n"
        f" type: {candidate['memory_type']}\n"
        f" project: {candidate.get('project') or '(none)'}\n"
        f" content: {candidate['content']}\n\n"
        f"EXISTING ACTIVE MEMORIES FOR THIS PROJECT:\n{active_summary}\n\n"
        f"Return the JSON verdict now."
    )

    args = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", TRIAGE_SYSTEM_PROMPT,
        "--disable-slash-commands",
        user_message,
    ]

    # Retry with exponential backoff on transient failures (rate limits etc)
    last_error = ""
    for attempt in range(3):
        if attempt > 0:
            time.sleep(2 ** attempt)  # 2s, 4s
        try:
            completed = subprocess.run(
                args, capture_output=True, text=True,
                timeout=timeout_s, cwd=get_sandbox_cwd(),
                encoding="utf-8", errors="replace",
            )
        except subprocess.TimeoutExpired:
            last_error = "triage model timed out"
            continue
        except Exception as exc:
            # Anything else from launching the process is treated like a
            # transient failure: remember it and try again.
            last_error = f"subprocess error: {exc}"
            continue

        if completed.returncode == 0:
            raw = (completed.stdout or "").strip()
            return parse_verdict(raw)

        # Capture stderr for diagnostics (truncate to 200 chars)
        stderr = (completed.stderr or "").strip()[:200]
        if stderr:
            last_error = f"claude exit {completed.returncode}: {stderr}"
        else:
            last_error = f"claude exit {completed.returncode}"

    return {"verdict": "needs_human", "confidence": 0.0, "reason": last_error}
|
|
|
|
|
|
def parse_verdict(raw):
    """Parse the triage model's JSON verdict into a normalized dict.

    The outermost ``{...}`` span of ``raw`` is parsed as JSON, so markdown
    code fences or prose around the object are tolerated.

    Args:
        raw: Text printed by the triage model.

    Returns:
        On success a dict with keys:
          - ``verdict``: one of ``promote``, ``reject``, ``needs_human``,
            ``contradicts`` (anything else becomes ``needs_human``)
          - ``confidence``: float clamped to [0.0, 1.0]; 0.5 when missing,
            not numeric, or NaN
          - ``reason``: at most 200 characters
          - ``conflicts_with``: string, empty when absent or null
        When the text is not valid JSON, or is valid JSON but not an
        object, a ``needs_human`` dict with confidence 0.0 and an
        explanatory ``reason`` (no ``conflicts_with`` key).
    """
    text = raw.strip()

    # Isolate the outermost {...}. This drops ``` fences and any text the
    # model added before or after the object.
    start = text.find("{")
    end = text.rfind("}")
    if start >= 0 and end > start:
        text = text[start:end + 1]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "failed to parse triage output"}

    # Valid JSON that is not an object (list, string, number) has no
    # fields to read; route it to a human instead of crashing on .get().
    if not isinstance(parsed, dict):
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "triage output is not a JSON object"}

    verdict = str(parsed.get("verdict", "needs_human")).strip().lower()
    if verdict not in {"promote", "reject", "needs_human", "contradicts"}:
        verdict = "needs_human"

    try:
        confidence = float(parsed.get("confidence", 0.5))
    except (TypeError, ValueError, OverflowError):
        confidence = 0.5
    # NaN compares unequal to itself. Without this check min()/max() would
    # clamp NaN to 1.0 and turn an unusable score into an auto-promote.
    if confidence != confidence:
        confidence = 0.5
    confidence = max(0.0, min(1.0, confidence))

    # `or ""` maps JSON null to an empty string instead of the text "None".
    reason = str(parsed.get("reason") or "").strip()[:200]
    conflicts_with = str(parsed.get("conflicts_with") or "").strip()
    return {
        "verdict": verdict,
        "confidence": confidence,
        "reason": reason,
        "conflicts_with": conflicts_with,
    }
|
|
|
|
|
|
def _post_verdict(base_url, mid, action, label):
    """POST ``/memory/{mid}/{action}``; return True on success, else report and return False."""
    try:
        api_post(base_url, f"/memory/{mid}/{action}")
    except Exception as exc:
        # Per-candidate boundary: one failed call must not abort the batch,
        # but the cause is printed so the failure can be diagnosed.
        print(f" ERROR {label} {action} failed: {exc}")
        return False
    return True


def main():
    """CLI entry point: triage the candidate queue and apply the verdicts.

    Fetches up to 100 ``status=candidate`` memories, asks ``triage_one``
    for a verdict on each, and then:
      - promotes when the verdict is ``promote`` with confidence at or
        above ``AUTO_PROMOTE_MIN_CONFIDENCE``
      - rejects when the verdict is ``reject``
      - leaves everything else (including ``contradicts``) in the queue

    With ``--dry-run`` nothing is posted; the intended action is printed
    and counted instead. A summary line with the counters is printed at
    the end. ``promoted`` and ``rejected`` count only actions that
    succeeded (or would be taken in a dry run); failed API calls are
    counted in ``errors``.
    """
    parser = argparse.ArgumentParser(description="Auto-triage candidate memories")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--model", default=DEFAULT_MODEL)
    parser.add_argument("--dry-run", action="store_true", help="preview without executing")
    args = parser.parse_args()

    # Fetch candidates
    result = api_get(args.base_url, "/memory?status=candidate&limit=100")
    candidates = result.get("memories", [])
    print(f"candidates: {len(candidates)} model: {args.model} dry_run: {args.dry_run}")

    if not candidates:
        print("queue empty, nothing to triage")
        return

    # Cache active memories per project for dedup
    active_cache = {}
    promoted = rejected = needs_human = contradicts = errors = 0

    for i, cand in enumerate(candidates, 1):
        # Light rate-limit pacing: 0.5s between triage calls so a burst
        # doesn't overwhelm the claude CLI's backend. With ~60s per call
        # this is negligible overhead but avoids the "all-failed" pattern
        # we saw on large batches.
        if i > 1:
            time.sleep(0.5)

        project = cand.get("project") or ""
        if project not in active_cache:
            active_cache[project] = fetch_active_memories_for_project(args.base_url, project)

        verdict_obj = triage_one(cand, active_cache[project], args.model, DEFAULT_TIMEOUT_S)
        verdict = verdict_obj["verdict"]
        conf = verdict_obj["confidence"]
        reason = verdict_obj["reason"]
        conflicts_with = verdict_obj.get("conflicts_with", "")

        mid = cand["id"]
        label = f"[{i:2d}/{len(candidates)}] {mid[:8]} [{cand['memory_type']}]"

        if verdict == "promote" and conf >= AUTO_PROMOTE_MIN_CONFIDENCE:
            if args.dry_run:
                print(f" WOULD PROMOTE {label} conf={conf:.2f} {reason}")
                promoted += 1
            elif _post_verdict(args.base_url, mid, "promote", label):
                print(f" PROMOTED {label} conf={conf:.2f} {reason}")
                # Later candidates of the same project must see this one
                # as an existing memory, or duplicates within one batch
                # would all be promoted.
                active_cache[project].append(cand)
                promoted += 1
            else:
                errors += 1
        elif verdict == "reject":
            if args.dry_run:
                print(f" WOULD REJECT {label} conf={conf:.2f} {reason}")
                rejected += 1
            elif _post_verdict(args.base_url, mid, "reject", label):
                print(f" REJECTED {label} conf={conf:.2f} {reason}")
                rejected += 1
            else:
                errors += 1
        elif verdict == "contradicts":
            # Leave the candidate in the queue and report the conflict.
            # This is conservative: we don't silently merge or reject when
            # sources disagree.
            print(f" CONTRADICTS {label} vs {conflicts_with[:8] if conflicts_with else '?'} {reason}")
            contradicts += 1
            needs_human += 1
        else:
            print(f" NEEDS_HUMAN {label} conf={conf:.2f} {reason}")
            needs_human += 1

    print(f"\npromoted={promoted} rejected={rejected} needs_human={needs_human} errors={errors}")
    # Contradictions are included in needs_human above; the separate line
    # keeps the summary line's format unchanged.
    if contradicts:
        print(f"contradictions flagged: {contradicts}")
|
|
|
|
|
|
# Script entry point: run the triage pass only when executed directly,
# never on import. main() returns None, so the exit status stays 0.
if __name__ == "__main__":
    sys.exit(main())
|