Codex's formal audit of fb4d55c said GO WITH CONDITIONS. Two P2 findings
to fold in before merge:
1. auto_triage.py:417 still sent PUT {"content": cand["content"]} — the
   suggested-project correction was unreachable even with
   MemoryUpdateRequest.project in place. Changed the body to
   {"project": suggested} so misattribution flags actually retarget the
   memory. Added a regression test that asserts the script source
   contains the new PUT shape, so a future "optimization" can't silently
   undo this.
2. POST /memory/{id}/supersede had no status guard — calling
supersede_memory() delegated to update_memory(status="superseded"),
which would silently flip a candidate to superseded. Mirrored the
invalidate route: get_memory(id) lookup, 404 unknown / 200
already_superseded / 409 wrong-status / 200 superseded.
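For reference, the guard shape — a minimal sketch against a toy in-memory store; the route and store names here are illustrative, not the actual AtoCore source:

```python
# Hypothetical sketch of the supersede status guard (names assumed).
# Mirrors the invalidate route: 404 unknown id, 200 if already
# superseded (idempotent), 409 if not in a supersede-able status,
# 200 on success.
class FakeStore:
    """Toy in-memory stand-in for the real store, just to exercise the guard."""

    def __init__(self, memories):
        self.memories = memories

    def get_memory(self, mid):
        return self.memories.get(mid)

    def update_memory(self, mid, status):
        self.memories[mid]["status"] = status


def supersede_memory_route(memory_id, store):
    mem = store.get_memory(memory_id)
    if mem is None:
        return 404, {"error": "unknown memory id"}
    if mem["status"] == "superseded":
        return 200, {"status": "already_superseded"}
    if mem["status"] != "active":
        return 409, {"error": f"cannot supersede a {mem['status']} memory"}
    store.update_memory(memory_id, status="superseded")
    return 200, {"status": "superseded"}
```

The 200 on already_superseded keeps the route idempotent for retrying callers; the 409 is what stops a candidate from being silently flipped without going through promote/reject.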
Plus a P3 from the same audit: covered the "retarget to project=''
when a global active duplicate exists" case via
test_update_memory_to_empty_project_detects_global_duplicate.
Tests: 581 -> 586 (+5: 3 supersede route + 1 project-empty duplicate +
1 auto_triage caller invariant).
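The caller-invariant test reduces to a substring assertion over the script source. A minimal sketch — the helper name is an illustrative assumption; the real test would feed it the contents of scripts/auto_triage.py:

```python
def source_has_retarget_fix(source: str) -> bool:
    """True if the script source PUTs the suggested project (new shape)
    and no longer PUTs only the content (old, ineffective shape)."""
    return ('{"project": suggested}' in source
            and '{"content": cand["content"]}' not in source)
```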
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
556 lines
24 KiB
Python
"""Auto-triage: LLM second-pass over candidate memories.
|
|
|
|
Fetches all status=candidate memories from the AtoCore API, asks
|
|
a triage model (via claude -p) to classify each as promote / reject /
|
|
needs_human, and executes the verdict via the promote/reject endpoints.
|
|
Only needs_human candidates remain in the queue for manual review.
|
|
|
|
Trust model:
|
|
- Auto-promote: model says promote AND confidence >= 0.8 AND no
|
|
duplicate content in existing active memories
|
|
- Auto-reject: model says reject
|
|
- needs_human: everything else stays in queue
|
|
|
|
Runs host-side (same as batch extraction) because it needs the
|
|
claude CLI. Intended to be called after batch-extract.sh in the
|
|
nightly cron, or manually.
|
|
|
|
Usage:
|
|
|
|
python3 scripts/auto_triage.py --base-url http://localhost:8100
|
|
python3 scripts/auto_triage.py --dry-run # preview without executing
|
|
"""
|
|
|
|
from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.parse
import urllib.request

DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")

# 3-tier escalation config (Phase "Triage Quality")
TIER1_MODEL = os.environ.get("ATOCORE_TRIAGE_MODEL_TIER1",
                             os.environ.get("ATOCORE_TRIAGE_MODEL", "sonnet"))
TIER2_MODEL = os.environ.get("ATOCORE_TRIAGE_MODEL_TIER2", "opus")
# Tier 3: default "discard" (auto-reject uncertain after opus disagrees/wavers),
# alternative "human" routes them to /admin/triage.
TIER3_ACTION = os.environ.get("ATOCORE_TRIAGE_TIER3", "discard").lower()
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_TRIAGE_TIMEOUT_S", "60"))
TIER2_TIMEOUT_S = float(os.environ.get("ATOCORE_TRIAGE_TIER2_TIMEOUT_S", "120"))
AUTO_PROMOTE_MIN_CONFIDENCE = 0.8
# Below this, tier 1 decision is "not confident enough" and we escalate
ESCALATION_CONFIDENCE_THRESHOLD = float(
    os.environ.get("ATOCORE_TRIAGE_ESCALATION_THRESHOLD", "0.75")
)

# Kept for legacy callers that reference DEFAULT_MODEL
DEFAULT_MODEL = TIER1_MODEL
TRIAGE_SYSTEM_PROMPT = """You are a memory triage reviewer for a personal context engine called AtoCore. You review candidate memories extracted from LLM conversations and decide whether each should be promoted to active status, rejected, or flagged for human review.

You will receive:
- The candidate memory content, type, and claimed project
- A list of existing active memories for the same project (to check for duplicates + contradictions)
- Trusted project state entries (curated ground truth — higher trust than memories)
- Known project ids so you can flag misattribution

For each candidate, output exactly one JSON object:

{"verdict": "promote|reject|needs_human|contradicts", "confidence": 0.0-1.0, "reason": "one sentence", "conflicts_with": "id of existing memory if contradicts", "domain_tags": ["tag1","tag2"], "valid_until": null}

DOMAIN TAGS (Phase 3): A lowercase list of 2-5 topical keywords describing
the SUBJECT matter (not the project). This enables cross-project retrieval:
a query about "optics" can pull matches from p04 + p05 + p06.

Good tags are single lowercase words or hyphenated terms. Mix:
- domain keywords (optics, thermal, firmware, materials, controls)
- project tokens when clearly scoped (p04, p05, p06, abb)
- lifecycle/activity words (procurement, design, validation, vendor)

Always emit domain_tags on a promote. For reject, empty list is fine.

VALID_UNTIL (Phase 3): ISO date "YYYY-MM-DD" OR null (permanent).
Set to a near-future date when the candidate is time-bounded:
- Status snapshots ("current blocker is X") → ~2 weeks out
- Scheduled events ("meeting Friday") → event date
- Quotes with expiry → quote expiry date
Leave null for durable decisions, engineering insights, ratified requirements.

Rules:

1. PROMOTE when the candidate states a durable architectural fact, ratified decision, standing rule, or engineering constraint that is NOT already covered by an existing active memory. Confidence should reflect how certain you are this is worth keeping.

2. REJECT when the candidate is:
- A stale point-in-time snapshot ("live SHA is X", "36 active memories")
- An implementation detail too granular to be useful as standalone context
- A planned-but-not-implemented feature description
- A duplicate or near-duplicate of an existing active memory
- A session observation or conversational filler
- A process rule that belongs in DEV-LEDGER.md or AGENTS.md, not memory

3. CONTRADICTS when the candidate *conflicts* with an existing active memory (not a duplicate, but states something that can't both be true). Set `conflicts_with` to the existing memory id. This flags the tension for human review instead of silently rejecting or double-storing. Examples: "Option A selected" vs "Option B selected" for the same decision; "uses material X" vs "uses material Y" for the same component.

4. OPENCLAW-CURATED content (candidate content starts with "From OpenClaw/"): apply a MUCH LOWER bar. OpenClaw's SOUL.md, USER.md, MEMORY.md, MODEL-ROUTING.md, and dated memory/*.md files are ALREADY curated by OpenClaw as canonical continuity. Promote unless clearly wrong or a genuine duplicate. Do NOT reject OpenClaw content as "process rule belongs elsewhere" or "session log" — that's exactly what AtoCore wants to absorb. Session events, project updates, stakeholder notes, and decisions from OpenClaw daily memory files ARE valuable context and should promote.

5. NEEDS_HUMAN when you're genuinely unsure — the candidate might be valuable but you can't tell without domain knowledge. This should be rare (< 20% of candidates). If this is just noise/filler, prefer REJECT with low confidence.

6. PROJECT VALIDATION: The candidate has a "claimed project". You'll see the list of registered project ids. If the claimed project doesn't match any registered id AND the content clearly belongs to a registered project, include "suggested_project": "<correct_id>" in your output so the caller can auto-fix the attribution. If the content is genuinely cross-project or global, leave project empty (suggested_project=""). Misattribution is the #1 pollution source — flag it.

7. TEMPORAL SENSITIVITY: Be aggressive with valid_until for anything that reads like "current state", "right now", "this week", "as of". Stale facts pollute context. When in doubt, set a 2-4 week expiry rather than null.

8. CONFIDENCE GRADING:
- 0.9+: crystal clear durable fact or clear noise
- 0.75-0.9: confident but not cryptographic-certain
- 0.6-0.75: borderline — will escalate to opus for second opinion
- <0.6: genuinely ambiguous — needs human or will be discarded

9. Output ONLY the JSON object. No prose, no markdown, no explanation outside the reason field. Include optional "suggested_project" field when misattribution detected."""


TIER2_SECOND_OPINION_PROMPT = TRIAGE_SYSTEM_PROMPT + """

ESCALATED REVIEW: You are seeing this candidate because the tier-1 (sonnet) reviewer could not decide confidently. You will be shown tier-1's verdict + reason as additional context. Your job is to resolve the uncertainty with more careful thinking. Use your full context window to cross-reference the existing memories. If you ALSO cannot decide with confidence >= 0.8, output verdict="needs_human" with a clear explanation of what information would break the tie. That signal will route to a human (or auto-discard, depending on config)."""
_sandbox_cwd = None


def get_sandbox_cwd():
    global _sandbox_cwd
    if _sandbox_cwd is None:
        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-triage-")
    return _sandbox_cwd


def api_get(base_url, path, timeout=10):
    req = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def api_post(base_url, path, body=None, timeout=10):
    data = json.dumps(body or {}).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}{path}", method="POST",
        headers={"Content-Type": "application/json"}, data=data,
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
def fetch_active_memories_for_project(base_url, project):
    """Fetch active memories for dedup checking."""
    params = "active_only=true&limit=50"
    if project:
        params += f"&project={urllib.parse.quote(project)}"
    result = api_get(base_url, f"/memory?{params}")
    return result.get("memories", [])


def fetch_project_state(base_url, project):
    """Fetch trusted project state for ground-truth context."""
    if not project:
        return []
    try:
        result = api_get(base_url, f"/project/state/{urllib.parse.quote(project)}")
        return result.get("entries", result.get("state", []))
    except Exception:
        return []


def fetch_registered_projects(base_url):
    """Return list of registered project ids + aliases for misattribution check."""
    try:
        result = api_get(base_url, "/projects")
        projects = result.get("projects", [])
        out = {}
        for p in projects:
            pid = p.get("project_id") or p.get("id") or p.get("name")
            if pid:
                out[pid] = p.get("aliases", []) or []
        return out
    except Exception:
        return {}
def build_triage_user_message(candidate, active_memories, project_state, known_projects):
    """Richer context for the triage model: memories + state + project registry."""
    active_summary = "\n".join(
        f"- [{m['memory_type']}] {m['content'][:200]}"
        for m in active_memories[:30]
    ) or "(no active memories for this project)"

    state_summary = ""
    if project_state:
        lines = []
        for e in project_state[:20]:
            cat = e.get("category", "?")
            key = e.get("key", "?")
            val = (e.get("value") or "")[:200]
            lines.append(f"- [{cat}/{key}] {val}")
        state_summary = "\n".join(lines)
    else:
        state_summary = "(no trusted state entries for this project)"

    projects_line = ", ".join(sorted(known_projects.keys())) if known_projects else "(none)"

    return (
        f"CANDIDATE TO TRIAGE:\n"
        f"  type: {candidate['memory_type']}\n"
        f"  claimed project: {candidate.get('project') or '(none)'}\n"
        f"  content: {candidate['content']}\n\n"
        f"REGISTERED PROJECT IDS: {projects_line}\n\n"
        f"TRUSTED PROJECT STATE (ground truth, higher trust than memories):\n{state_summary}\n\n"
        f"EXISTING ACTIVE MEMORIES FOR THIS PROJECT:\n{active_summary}\n\n"
        f"Return the JSON verdict now."
    )
def _call_claude(system_prompt, user_message, model, timeout_s):
    """Shared CLI caller with retry + stderr capture."""
    args = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", system_prompt,
        "--disable-slash-commands",
        user_message,
    ]
    last_error = ""
    for attempt in range(3):
        if attempt > 0:
            time.sleep(2 ** attempt)
        try:
            completed = subprocess.run(
                args, capture_output=True, text=True,
                timeout=timeout_s, cwd=get_sandbox_cwd(),
                encoding="utf-8", errors="replace",
            )
        except subprocess.TimeoutExpired:
            last_error = f"{model} timed out"
            continue
        except Exception as exc:
            last_error = f"subprocess error: {exc}"
            continue

        if completed.returncode == 0:
            return (completed.stdout or "").strip(), None

        stderr = (completed.stderr or "").strip()[:200]
        last_error = f"{model} exit {completed.returncode}: {stderr}" if stderr else f"{model} exit {completed.returncode}"
    return None, last_error
def triage_one(candidate, active_memories, project_state, known_projects, model, timeout_s):
    """Tier-1 triage: ask the cheap model for a verdict."""
    if not shutil.which("claude"):
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "claude CLI not available"}

    user_message = build_triage_user_message(candidate, active_memories, project_state, known_projects)
    raw, err = _call_claude(TRIAGE_SYSTEM_PROMPT, user_message, model, timeout_s)
    if err:
        return {"verdict": "needs_human", "confidence": 0.0, "reason": err}
    return parse_verdict(raw)


def triage_escalation(candidate, tier1_verdict, active_memories, project_state, known_projects, model, timeout_s):
    """Tier-2 escalation: opus sees tier-1's verdict + reasoning, tries again."""
    if not shutil.which("claude"):
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "claude CLI not available"}

    base_msg = build_triage_user_message(candidate, active_memories, project_state, known_projects)
    tier1_context = (
        f"\nTIER-1 REVIEW (sonnet, for your reference):\n"
        f"  verdict: {tier1_verdict.get('verdict')}\n"
        f"  confidence: {tier1_verdict.get('confidence', 0.0):.2f}\n"
        f"  reason: {tier1_verdict.get('reason', '')[:300]}\n\n"
        f"Resolve the uncertainty. If you also can't decide with confidence ≥ 0.8, "
        f"return verdict='needs_human' with a specific explanation of what information "
        f"would break the tie.\n\nReturn the JSON verdict now."
    )
    raw, err = _call_claude(TIER2_SECOND_OPINION_PROMPT, base_msg + tier1_context, model, timeout_s)
    if err:
        return {"verdict": "needs_human", "confidence": 0.0, "reason": f"tier2: {err}"}
    return parse_verdict(raw)
def parse_verdict(raw):
    """Parse the triage model's JSON verdict."""
    text = raw.strip()
    if text.startswith("```"):
        text = text.strip("`")
        nl = text.find("\n")
        if nl >= 0:
            text = text[nl + 1:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

    if not text.lstrip().startswith("{"):
        start = text.find("{")
        end = text.rfind("}")
        if start >= 0 and end > start:
            text = text[start:end + 1]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "failed to parse triage output"}

    verdict = str(parsed.get("verdict", "needs_human")).strip().lower()
    if verdict not in {"promote", "reject", "needs_human", "contradicts"}:
        verdict = "needs_human"

    confidence = parsed.get("confidence", 0.5)
    try:
        confidence = max(0.0, min(1.0, float(confidence)))
    except (TypeError, ValueError):
        confidence = 0.5

    reason = str(parsed.get("reason", "")).strip()[:200]
    conflicts_with = str(parsed.get("conflicts_with", "")).strip()

    # Phase 3: domain tags + expiry
    raw_tags = parsed.get("domain_tags") or []
    if isinstance(raw_tags, str):
        raw_tags = [t.strip() for t in raw_tags.split(",") if t.strip()]
    if not isinstance(raw_tags, list):
        raw_tags = []
    domain_tags = []
    for t in raw_tags[:10]:
        if not isinstance(t, str):
            continue
        tag = t.strip().lower()
        if tag and tag not in domain_tags:
            domain_tags.append(tag)

    valid_until = parsed.get("valid_until")
    if valid_until is None:
        valid_until = ""
    else:
        valid_until = str(valid_until).strip()
        if valid_until.lower() in ("", "null", "none", "permanent"):
            valid_until = ""

    # Triage Quality: project misattribution flag
    suggested_project = str(parsed.get("suggested_project", "")).strip()

    return {
        "verdict": verdict,
        "confidence": confidence,
        "reason": reason,
        "conflicts_with": conflicts_with,
        "domain_tags": domain_tags,
        "valid_until": valid_until,
        "suggested_project": suggested_project,
    }
def _apply_metadata_update(base_url, mid, verdict_obj):
    """Persist tags + valid_until + suggested_project before the promote call."""
    tags = verdict_obj.get("domain_tags") or []
    valid_until = verdict_obj.get("valid_until") or ""
    suggested = verdict_obj.get("suggested_project") or ""

    body = {}
    if tags:
        body["domain_tags"] = tags
    if valid_until:
        body["valid_until"] = valid_until
    if not body and not suggested:
        return

    if body:
        try:
            import urllib.request as _ur
            req = _ur.Request(
                f"{base_url}/memory/{mid}", method="PUT",
                headers={"Content-Type": "application/json"},
                data=json.dumps(body).encode("utf-8"),
            )
            _ur.urlopen(req, timeout=10).read()
        except Exception:
            pass

    # Project auto-fix via direct SQLite update would bypass audit; use PUT if supported.
    # For now we log the suggestion — operator script can apply it in batch.
    if suggested:
        # noop here — handled by caller which tracks suggested_project_fixes
        pass
def process_candidate(cand, base_url, active_cache, state_cache, known_projects, dry_run):
    """Run the 3-tier triage and apply the resulting action.

    Returns (action, note) where action in {promote, reject, discard, human, error}.
    """
    mid = cand["id"]
    project = cand.get("project") or ""
    if project not in active_cache:
        active_cache[project] = fetch_active_memories_for_project(base_url, project)
    if project not in state_cache:
        state_cache[project] = fetch_project_state(base_url, project)

    # === Tier 1 ===
    v1 = triage_one(
        cand, active_cache[project], state_cache[project],
        known_projects, TIER1_MODEL, DEFAULT_TIMEOUT_S,
    )

    # Project misattribution fix: suggested_project surfaces from tier 1.
    # Earlier code POSTed only {"content": cand["content"]}, which left
    # the project field unchanged because MemoryUpdateRequest had no
    # project key and the service signature didn't accept one. Wave 1
    # added project to MemoryUpdateRequest and update_memory(); this
    # caller now actually applies the suggested project.
    suggested = (v1.get("suggested_project") or "").strip()
    if suggested and suggested != project and suggested in known_projects:
        if not dry_run:
            try:
                import urllib.request as _ur
                req = _ur.Request(
                    f"{base_url}/memory/{mid}", method="PUT",
                    headers={"Content-Type": "application/json"},
                    data=json.dumps({"project": suggested}).encode("utf-8"),
                )
                _ur.urlopen(req, timeout=10).read()
            except Exception:
                pass
        print(f"  ↺ misattribution flagged: {project!r} → {suggested!r}")

    # High-confidence tier 1 decision → act
    if v1["verdict"] in ("promote", "reject") and v1["confidence"] >= AUTO_PROMOTE_MIN_CONFIDENCE:
        return _apply_verdict(v1, cand, base_url, active_cache, dry_run, tier="sonnet")

    # Borderline or uncertain → escalate to tier 2 (opus)
    print(f"  ↑ escalating (tier1 verdict={v1['verdict']} conf={v1['confidence']:.2f})")
    v2 = triage_escalation(
        cand, v1, active_cache[project], state_cache[project],
        known_projects, TIER2_MODEL, TIER2_TIMEOUT_S,
    )

    # Tier 2 is confident → act
    if v2["verdict"] in ("promote", "reject") and v2["confidence"] >= AUTO_PROMOTE_MIN_CONFIDENCE:
        return _apply_verdict(v2, cand, base_url, active_cache, dry_run, tier="opus")

    # Tier 3: still uncertain — route per config
    if TIER3_ACTION == "discard":
        reason = f"tier1+tier2 uncertain: {v2.get('reason', '')[:150]}"
        if dry_run:
            return ("discard", reason)
        try:
            api_post(base_url, f"/memory/{mid}/reject")
        except Exception:
            return ("error", reason)
        return ("discard", reason)
    else:
        # "human" — leave in queue for /admin/triage review
        return ("human", v2.get("reason", "no reason")[:200])
def _apply_verdict(verdict_obj, cand, base_url, active_cache, dry_run, tier):
    """Execute the promote/reject action and update metadata."""
    mid = cand["id"]
    verdict = verdict_obj["verdict"]
    conf = verdict_obj["confidence"]
    reason = f"[{tier}] {verdict_obj['reason']}"

    if verdict == "promote":
        if dry_run:
            return ("promote", reason)
        _apply_metadata_update(base_url, mid, verdict_obj)
        try:
            api_post(base_url, f"/memory/{mid}/promote")
            project = cand.get("project") or ""
            if project in active_cache:
                active_cache[project].append(cand)
            return ("promote", reason)
        except Exception as e:
            return ("error", f"promote failed: {e}")
    else:
        if dry_run:
            return ("reject", reason)
        try:
            api_post(base_url, f"/memory/{mid}/reject")
            return ("reject", reason)
        except Exception as e:
            return ("error", f"reject failed: {e}")
def main():
    parser = argparse.ArgumentParser(description="Auto-triage candidate memories (3-tier escalation)")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--dry-run", action="store_true", help="preview without executing")
    parser.add_argument("--max-batches", type=int, default=20,
                        help="Max batches of 100 to process per run")
    parser.add_argument("--no-escalation", action="store_true",
                        help="Disable tier-2 escalation (legacy single-model behavior)")
    args = parser.parse_args()

    seen_ids: set[str] = set()
    active_cache: dict[str, list] = {}
    state_cache: dict[str, list] = {}

    known_projects = fetch_registered_projects(args.base_url)
    print(f"Registered projects: {sorted(known_projects.keys())}")
    print(f"Tier1: {TIER1_MODEL}  Tier2: {TIER2_MODEL}  Tier3: {TIER3_ACTION}  "
          f"escalation_threshold: {ESCALATION_CONFIDENCE_THRESHOLD}")

    counts = {"promote": 0, "reject": 0, "discard": 0, "human": 0, "error": 0}
    batch_num = 0

    while batch_num < args.max_batches:
        batch_num += 1
        result = api_get(args.base_url, "/memory?status=candidate&limit=100")
        all_candidates = result.get("memories", [])
        candidates = [c for c in all_candidates if c["id"] not in seen_ids]

        if not candidates:
            if batch_num == 1:
                print("queue empty, nothing to triage")
            else:
                print(f"\nQueue drained after batch {batch_num - 1}.")
            break

        print(f"\n=== batch {batch_num}: {len(candidates)} candidates  dry_run: {args.dry_run} ===")

        for i, cand in enumerate(candidates, 1):
            if i > 1:
                time.sleep(0.5)
            seen_ids.add(cand["id"])
            mid = cand["id"]
            label = f"[{i:2d}/{len(candidates)}] {mid[:8]} [{cand['memory_type']}]"

            try:
                action, note = process_candidate(
                    cand, args.base_url, active_cache, state_cache,
                    known_projects, args.dry_run,
                )
            except Exception as e:
                action, note = ("error", f"exception: {e}")

            counts[action] = counts.get(action, 0) + 1
            verb = {"promote": "PROMOTED  ", "reject": "REJECTED  ",
                    "discard": "DISCARDED ", "human": "NEEDS_HUM ",
                    "error": "ERROR     "}.get(action, action.upper())
            if args.dry_run and action in ("promote", "reject", "discard"):
                verb = "WOULD " + verb.strip()
            print(f"  {verb} {label}  {note[:120]}")

    print(
        f"\ntotal: promoted={counts['promote']} rejected={counts['reject']} "
        f"discarded={counts['discard']} human={counts['human']} errors={counts['error']} "
        f"batches={batch_num}"
    )


if __name__ == "__main__":
    main()