"""Auto-triage: LLM second-pass over candidate memories. Fetches all status=candidate memories from the AtoCore API, asks a triage model (via claude -p) to classify each as promote / reject / needs_human, and executes the verdict via the promote/reject endpoints. Only needs_human candidates remain in the queue for manual review. Trust model: - Auto-promote: model says promote AND confidence >= 0.8 AND no duplicate content in existing active memories - Auto-reject: model says reject - needs_human: everything else stays in queue Runs host-side (same as batch extraction) because it needs the claude CLI. Intended to be called after batch-extract.sh in the nightly cron, or manually. Usage: python3 scripts/auto_triage.py --base-url http://localhost:8100 python3 scripts/auto_triage.py --dry-run # preview without executing """ from __future__ import annotations import argparse import json import os import shutil import subprocess import sys import time import tempfile import urllib.error import urllib.parse import urllib.request DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100") DEFAULT_MODEL = os.environ.get("ATOCORE_TRIAGE_MODEL", "sonnet") DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_TRIAGE_TIMEOUT_S", "60")) AUTO_PROMOTE_MIN_CONFIDENCE = 0.8 TRIAGE_SYSTEM_PROMPT = """You are a memory triage reviewer for a personal context engine called AtoCore. You review candidate memories extracted from LLM conversations and decide whether each should be promoted to active status, rejected, or flagged for human review. You will receive: - The candidate memory content and type - A list of existing active memories for the same project (to check for duplicates) For each candidate, output exactly one JSON object: {"verdict": "promote|reject|needs_human|contradicts", "confidence": 0.0-1.0, "reason": "one sentence", "conflicts_with": "id of existing memory if contradicts", "domain_tags": ["tag1","tag2"], "valid_until": null} DOMAIN TAGS (Phase 3): A lowercase list of 2-5 topical keywords describing the SUBJECT matter (not the project). This enables cross-project retrieval: a query about "optics" can pull matches from p04 + p05 + p06. Good tags are single lowercase words or hyphenated terms. Mix: - domain keywords (optics, thermal, firmware, materials, controls) - project tokens when clearly scoped (p04, p05, p06, abb) - lifecycle/activity words (procurement, design, validation, vendor) Always emit domain_tags on a promote. For reject, empty list is fine. VALID_UNTIL (Phase 3): ISO date "YYYY-MM-DD" OR null (permanent). Set to a near-future date when the candidate is time-bounded: - Status snapshots ("current blocker is X") → ~2 weeks out - Scheduled events ("meeting Friday") → event date - Quotes with expiry → quote expiry date Leave null for durable decisions, engineering insights, ratified requirements. Rules: 1. PROMOTE when the candidate states a durable architectural fact, ratified decision, standing rule, or engineering constraint that is NOT already covered by an existing active memory. Confidence should reflect how certain you are this is worth keeping. 2. REJECT when the candidate is: - A stale point-in-time snapshot ("live SHA is X", "36 active memories") - An implementation detail too granular to be useful as standalone context - A planned-but-not-implemented feature description - A duplicate or near-duplicate of an existing active memory - A session observation or conversational filler - A process rule that belongs in DEV-LEDGER.md or AGENTS.md, not memory 3. CONTRADICTS when the candidate *conflicts* with an existing active memory (not a duplicate, but states something that can't both be true). Set `conflicts_with` to the existing memory id. This flags the tension for human review instead of silently rejecting or double-storing. Examples: "Option A selected" vs "Option B selected" for the same decision; "uses material X" vs "uses material Y" for the same component. 4. OPENCLAW-CURATED content (candidate content starts with "From OpenClaw/"): apply a MUCH LOWER bar. OpenClaw's SOUL.md, USER.md, MEMORY.md, MODEL-ROUTING.md, and dated memory/*.md files are ALREADY curated by OpenClaw as canonical continuity. Promote unless clearly wrong or a genuine duplicate. Do NOT reject OpenClaw content as "process rule belongs elsewhere" or "session log" — that's exactly what AtoCore wants to absorb. Session events, project updates, stakeholder notes, and decisions from OpenClaw daily memory files ARE valuable context and should promote. 5. NEEDS_HUMAN when you're genuinely unsure — the candidate might be valuable but you can't tell without domain knowledge. This should be rare (< 20% of candidates). 6. Output ONLY the JSON object. No prose, no markdown, no explanation outside the reason field.""" _sandbox_cwd = None def get_sandbox_cwd(): global _sandbox_cwd if _sandbox_cwd is None: _sandbox_cwd = tempfile.mkdtemp(prefix="ato-triage-") return _sandbox_cwd def api_get(base_url, path, timeout=10): req = urllib.request.Request(f"{base_url}{path}") with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) def api_post(base_url, path, body=None, timeout=10): data = json.dumps(body or {}).encode("utf-8") req = urllib.request.Request( f"{base_url}{path}", method="POST", headers={"Content-Type": "application/json"}, data=data, ) with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) def fetch_active_memories_for_project(base_url, project): """Fetch active memories for dedup checking.""" params = "active_only=true&limit=50" if project: params += f"&project={urllib.parse.quote(project)}" result = api_get(base_url, f"/memory?{params}") return result.get("memories", []) def triage_one(candidate, active_memories, model, timeout_s): """Ask the triage model to classify one candidate.""" if not shutil.which("claude"): return {"verdict": "needs_human", "confidence": 0.0, "reason": "claude CLI not available"} active_summary = "\n".join( f"- [{m['memory_type']}] {m['content'][:150]}" for m in active_memories[:20] ) or "(no active memories for this project)" user_message = ( f"CANDIDATE TO TRIAGE:\n" f" type: {candidate['memory_type']}\n" f" project: {candidate.get('project') or '(none)'}\n" f" content: {candidate['content']}\n\n" f"EXISTING ACTIVE MEMORIES FOR THIS PROJECT:\n{active_summary}\n\n" f"Return the JSON verdict now." ) args = [ "claude", "-p", "--model", model, "--append-system-prompt", TRIAGE_SYSTEM_PROMPT, "--disable-slash-commands", user_message, ] # Retry with exponential backoff on transient failures (rate limits etc) last_error = "" for attempt in range(3): if attempt > 0: time.sleep(2 ** attempt) # 2s, 4s try: completed = subprocess.run( args, capture_output=True, text=True, timeout=timeout_s, cwd=get_sandbox_cwd(), encoding="utf-8", errors="replace", ) except subprocess.TimeoutExpired: last_error = "triage model timed out" continue except Exception as exc: last_error = f"subprocess error: {exc}" continue if completed.returncode == 0: raw = (completed.stdout or "").strip() return parse_verdict(raw) # Capture stderr for diagnostics (truncate to 200 chars) stderr = (completed.stderr or "").strip()[:200] last_error = f"claude exit {completed.returncode}: {stderr}" if stderr else f"claude exit {completed.returncode}" return {"verdict": "needs_human", "confidence": 0.0, "reason": last_error} def parse_verdict(raw): """Parse the triage model's JSON verdict.""" text = raw.strip() if text.startswith("```"): text = text.strip("`") nl = text.find("\n") if nl >= 0: text = text[nl + 1:] if text.endswith("```"): text = text[:-3] text = text.strip() if not text.lstrip().startswith("{"): start = text.find("{") end = text.rfind("}") if start >= 0 and end > start: text = text[start:end + 1] try: parsed = json.loads(text) except json.JSONDecodeError: return {"verdict": "needs_human", "confidence": 0.0, "reason": "failed to parse triage output"} verdict = str(parsed.get("verdict", "needs_human")).strip().lower() if verdict not in {"promote", "reject", "needs_human", "contradicts"}: verdict = "needs_human" confidence = parsed.get("confidence", 0.5) try: confidence = max(0.0, min(1.0, float(confidence))) except (TypeError, ValueError): confidence = 0.5 reason = str(parsed.get("reason", "")).strip()[:200] conflicts_with = str(parsed.get("conflicts_with", "")).strip() # Phase 3: domain tags + expiry raw_tags = parsed.get("domain_tags") or [] if isinstance(raw_tags, str): raw_tags = [t.strip() for t in raw_tags.split(",") if t.strip()] if not isinstance(raw_tags, list): raw_tags = [] domain_tags = [] for t in raw_tags[:10]: if not isinstance(t, str): continue tag = t.strip().lower() if tag and tag not in domain_tags: domain_tags.append(tag) valid_until = parsed.get("valid_until") if valid_until is None: valid_until = "" else: valid_until = str(valid_until).strip() if valid_until.lower() in ("", "null", "none", "permanent"): valid_until = "" return { "verdict": verdict, "confidence": confidence, "reason": reason, "conflicts_with": conflicts_with, "domain_tags": domain_tags, "valid_until": valid_until, } def main(): parser = argparse.ArgumentParser(description="Auto-triage candidate memories") parser.add_argument("--base-url", default=DEFAULT_BASE_URL) parser.add_argument("--model", default=DEFAULT_MODEL) parser.add_argument("--dry-run", action="store_true", help="preview without executing") parser.add_argument("--max-batches", type=int, default=20, help="Max batches of 100 to process per run (default 20 = 2000 candidates)") args = parser.parse_args() # Track IDs we've already seen so needs_human items don't re-process # every batch (they stay in the candidate table until a human reviews). seen_ids: set[str] = set() active_cache: dict[str, list] = {} promoted = rejected = needs_human = errors = 0 batch_num = 0 while batch_num < args.max_batches: batch_num += 1 result = api_get(args.base_url, "/memory?status=candidate&limit=100") all_candidates = result.get("memories", []) # Filter out already-seen (needs_human from prior batch in same run) candidates = [c for c in all_candidates if c["id"] not in seen_ids] if not candidates: if batch_num == 1: print("queue empty, nothing to triage") else: print(f"\nQueue drained after batch {batch_num-1}.") break print(f"\n=== batch {batch_num}: {len(candidates)} candidates model: {args.model} dry_run: {args.dry_run} ===") for i, cand in enumerate(candidates, 1): if i > 1: time.sleep(0.5) seen_ids.add(cand["id"]) project = cand.get("project") or "" if project not in active_cache: active_cache[project] = fetch_active_memories_for_project(args.base_url, project) verdict_obj = triage_one(cand, active_cache[project], args.model, DEFAULT_TIMEOUT_S) verdict = verdict_obj["verdict"] conf = verdict_obj["confidence"] reason = verdict_obj["reason"] conflicts_with = verdict_obj.get("conflicts_with", "") mid = cand["id"] label = f"[{i:2d}/{len(candidates)}] {mid[:8]} [{cand['memory_type']}]" if verdict == "promote" and conf >= AUTO_PROMOTE_MIN_CONFIDENCE: if args.dry_run: print(f" WOULD PROMOTE {label} conf={conf:.2f} {reason}") else: # Phase 3: update tags + valid_until on the candidate # before promoting, so the active memory carries them. tags = verdict_obj.get("domain_tags") or [] valid_until = verdict_obj.get("valid_until") or "" if tags or valid_until: try: import urllib.request as _ur body = json.dumps({ "domain_tags": tags, "valid_until": valid_until, }).encode("utf-8") req = _ur.Request( f"{args.base_url}/memory/{mid}", method="PUT", headers={"Content-Type": "application/json"}, data=body, ) _ur.urlopen(req, timeout=10).read() except Exception: pass # non-fatal; promote anyway try: api_post(args.base_url, f"/memory/{mid}/promote") print(f" PROMOTED {label} conf={conf:.2f} {reason}") active_cache[project].append(cand) except Exception: errors += 1 promoted += 1 elif verdict == "reject": if args.dry_run: print(f" WOULD REJECT {label} conf={conf:.2f} {reason}") else: try: api_post(args.base_url, f"/memory/{mid}/reject") print(f" REJECTED {label} conf={conf:.2f} {reason}") except Exception: errors += 1 rejected += 1 elif verdict == "contradicts": print(f" CONTRADICTS {label} vs {conflicts_with[:8] if conflicts_with else '?'} {reason}") needs_human += 1 else: print(f" NEEDS_HUMAN {label} conf={conf:.2f} {reason}") needs_human += 1 print(f"\ntotal: promoted={promoted} rejected={rejected} needs_human={needs_human} errors={errors} batches={batch_num}") if __name__ == "__main__": main()