Closes three live-affecting bugs surfaced by the 2026-04-29 Codex review,
all in the memory write/read path. Pre-deploy on Dalidou, the live
discrepancy was dashboard.memories.active=315 vs integrity active=1091.
1. /admin/dashboard counts now SQL-aggregate (no sampling).
New get_memory_count_summary() helper. Dashboard memories.{active,
candidates,by_type,by_project,reinforced,by_status,total} all derive
from full-table SQL, not from a confidence-sorted limit=500 sample.
Post-deploy, the dashboard active count must match the integrity panel.
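The full-table aggregation can be sketched as follows. This is a hypothetical illustration, not the actual AtoCore implementation: the `memories` table name and `status`/`memory_type` columns are assumptions, and only the helper name get_memory_count_summary() comes from the commit.

```python
# Sketch: derive dashboard counts from full-table SQL aggregates instead of
# counting within a confidence-sorted limit=500 sample. Schema is illustrative.
import sqlite3


def get_memory_count_summary(conn: sqlite3.Connection) -> dict:
    """Aggregate counts over the whole table, so active == integrity active."""
    by_status = dict(conn.execute(
        "SELECT status, COUNT(*) FROM memories GROUP BY status"
    ).fetchall())
    by_type = dict(conn.execute(
        "SELECT memory_type, COUNT(*) FROM memories "
        "WHERE status = 'active' GROUP BY memory_type"
    ).fetchall())
    return {
        "active": by_status.get("active", 0),
        "candidates": by_status.get("candidate", 0),
        "by_status": by_status,
        "by_type": by_type,
        "total": sum(by_status.values()),
    }
```

A sampled count undercounts whenever the table exceeds the sample limit; GROUP BY over the whole table cannot drift from the integrity panel's own full scan.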
2. PUT /memory/{id} accepts project; auto-triage now applies it.
Added project to MemoryUpdateRequest and update_memory(), with
resolve_project_name canonicalization, before/after audit, and a
duplicate-active check scoped to the new project. The
scripts/auto_triage.py suggested-project correction now PUTs
{"project": suggested}, so misattribution flags actually retarget the memory.
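The retargeting call can be sketched like this; the endpoint shape and body are from the commit, while the function name build_retarget_request and its arguments are illustrative placeholders.

```python
# Sketch: construct the PUT that auto_triage.py now sends to retarget a
# misattributed memory. Building (not sending) the request keeps it testable.
import json
import urllib.request


def build_retarget_request(base_url: str, memory_id: str,
                           suggested: str) -> urllib.request.Request:
    """Build PUT /memory/{id} with {"project": suggested} as the body."""
    return urllib.request.Request(
        f"{base_url}/memory/{memory_id}",
        method="PUT",
        headers={"Content-Type": "application/json"},
        data=json.dumps({"project": suggested}).encode("utf-8"),
    )
```

Sending it is then just `urllib.request.urlopen(build_retarget_request(...), timeout=10)` against a running instance.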
3. POST /memory/{id}/invalidate uses a direct id lookup.
New get_memory(id) helper. Replaces the old
_get_memories(status="active", limit=1) lookup, which only saw the
highest-confidence active row; active memories outside slot 0 no
longer 404. The same status-guard structure is applied to
POST /memory/{id}/supersede so candidates can't silently flip to
superseded.
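The lookup-plus-guard structure can be sketched with a toy in-memory store (the dict standing in for the real storage layer is purely illustrative; only the get_memory helper name and the status semantics come from the commit).

```python
# Sketch: direct id lookup + status guard. The old path fetched
# status="active", limit=1 and so only ever saw the highest-confidence
# active row; fetching by id sees every row, then the guard decides.

def get_memory(store: dict, memory_id: str):
    """Fetch one memory by id, regardless of its confidence rank."""
    return store.get(memory_id)


def invalidate(store: dict, memory_id: str) -> int:
    """Return an HTTP-like status code for the invalidate transition."""
    mem = get_memory(store, memory_id)
    if mem is None:
        return 404  # truly missing, not just outside slot 0
    if mem["status"] != "active":
        return 409  # guard: candidates can't silently flip state
    mem["status"] = "invalidated"
    return 200
```

The supersede endpoint would share the same shape, changing only the target status.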
14 regression tests added (572 -> 586 locally). Reviewed by Codex twice:
verdict GO on tip 9604c3e.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
556 lines
24 KiB
Python
"""Auto-triage: LLM second-pass over candidate memories.
|
|
|
|
Fetches all status=candidate memories from the AtoCore API, asks
|
|
a triage model (via claude -p) to classify each as promote / reject /
|
|
needs_human, and executes the verdict via the promote/reject endpoints.
|
|
Only needs_human candidates remain in the queue for manual review.
|
|
|
|
Trust model:
|
|
- Auto-promote: model says promote AND confidence >= 0.8 AND no
|
|
duplicate content in existing active memories
|
|
- Auto-reject: model says reject
|
|
- needs_human: everything else stays in queue
|
|
|
|
Runs host-side (same as batch extraction) because it needs the
|
|
claude CLI. Intended to be called after batch-extract.sh in the
|
|
nightly cron, or manually.
|
|
|
|
Usage:
|
|
|
|
python3 scripts/auto_triage.py --base-url http://localhost:8100
|
|
python3 scripts/auto_triage.py --dry-run # preview without executing
|
|
"""
|
|
|
|
from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
import time
import tempfile
import urllib.error
import urllib.parse
import urllib.request

DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")

# 3-tier escalation config (Phase "Triage Quality")
TIER1_MODEL = os.environ.get("ATOCORE_TRIAGE_MODEL_TIER1",
                             os.environ.get("ATOCORE_TRIAGE_MODEL", "sonnet"))
TIER2_MODEL = os.environ.get("ATOCORE_TRIAGE_MODEL_TIER2", "opus")
# Tier 3: default "discard" (auto-reject uncertain after opus disagrees/wavers),
# alternative "human" routes them to /admin/triage.
TIER3_ACTION = os.environ.get("ATOCORE_TRIAGE_TIER3", "discard").lower()
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_TRIAGE_TIMEOUT_S", "60"))
TIER2_TIMEOUT_S = float(os.environ.get("ATOCORE_TRIAGE_TIER2_TIMEOUT_S", "120"))
AUTO_PROMOTE_MIN_CONFIDENCE = 0.8
# Below this, the tier-1 decision is "not confident enough" and we escalate
ESCALATION_CONFIDENCE_THRESHOLD = float(
    os.environ.get("ATOCORE_TRIAGE_ESCALATION_THRESHOLD", "0.75")
)

# Kept for legacy callers that reference DEFAULT_MODEL
DEFAULT_MODEL = TIER1_MODEL

TRIAGE_SYSTEM_PROMPT = """You are a memory triage reviewer for a personal context engine called AtoCore. You review candidate memories extracted from LLM conversations and decide whether each should be promoted to active status, rejected, or flagged for human review.

You will receive:
- The candidate memory content, type, and claimed project
- A list of existing active memories for the same project (to check for duplicates + contradictions)
- Trusted project state entries (curated ground truth — higher trust than memories)
- Known project ids so you can flag misattribution

For each candidate, output exactly one JSON object:

{"verdict": "promote|reject|needs_human|contradicts", "confidence": 0.0-1.0, "reason": "one sentence", "conflicts_with": "id of existing memory if contradicts", "domain_tags": ["tag1","tag2"], "valid_until": null}

DOMAIN TAGS (Phase 3): A lowercase list of 2-5 topical keywords describing
the SUBJECT matter (not the project). This enables cross-project retrieval:
a query about "optics" can pull matches from p04 + p05 + p06.

Good tags are single lowercase words or hyphenated terms. Mix:
- domain keywords (optics, thermal, firmware, materials, controls)
- project tokens when clearly scoped (p04, p05, p06, abb)
- lifecycle/activity words (procurement, design, validation, vendor)

Always emit domain_tags on a promote. For reject, an empty list is fine.

VALID_UNTIL (Phase 3): ISO date "YYYY-MM-DD" OR null (permanent).
Set to a near-future date when the candidate is time-bounded:
- Status snapshots ("current blocker is X") → ~2 weeks out
- Scheduled events ("meeting Friday") → event date
- Quotes with expiry → quote expiry date
Leave null for durable decisions, engineering insights, ratified requirements.

Rules:

1. PROMOTE when the candidate states a durable architectural fact, ratified decision, standing rule, or engineering constraint that is NOT already covered by an existing active memory. Confidence should reflect how certain you are this is worth keeping.

2. REJECT when the candidate is:
- A stale point-in-time snapshot ("live SHA is X", "36 active memories")
- An implementation detail too granular to be useful as standalone context
- A planned-but-not-implemented feature description
- A duplicate or near-duplicate of an existing active memory
- A session observation or conversational filler
- A process rule that belongs in DEV-LEDGER.md or AGENTS.md, not memory

3. CONTRADICTS when the candidate *conflicts* with an existing active memory (not a duplicate, but states something that can't both be true). Set `conflicts_with` to the existing memory id. This flags the tension for human review instead of silently rejecting or double-storing. Examples: "Option A selected" vs "Option B selected" for the same decision; "uses material X" vs "uses material Y" for the same component.

4. OPENCLAW-CURATED content (candidate content starts with "From OpenClaw/"): apply a MUCH LOWER bar. OpenClaw's SOUL.md, USER.md, MEMORY.md, MODEL-ROUTING.md, and dated memory/*.md files are ALREADY curated by OpenClaw as canonical continuity. Promote unless clearly wrong or a genuine duplicate. Do NOT reject OpenClaw content as "process rule belongs elsewhere" or "session log" — that's exactly what AtoCore wants to absorb. Session events, project updates, stakeholder notes, and decisions from OpenClaw daily memory files ARE valuable context and should promote.

5. NEEDS_HUMAN when you're genuinely unsure — the candidate might be valuable but you can't tell without domain knowledge. This should be rare (< 20% of candidates). If this is just noise/filler, prefer REJECT with low confidence.

6. PROJECT VALIDATION: The candidate has a "claimed project". You'll see the list of registered project ids. If the claimed project doesn't match any registered id AND the content clearly belongs to a registered project, include "suggested_project": "<correct_id>" in your output so the caller can auto-fix the attribution. If the content is genuinely cross-project or global, leave project empty (suggested_project=""). Misattribution is the #1 pollution source — flag it.

7. TEMPORAL SENSITIVITY: Be aggressive with valid_until for anything that reads like "current state", "right now", "this week", "as of". Stale facts pollute context. When in doubt, set a 2-4 week expiry rather than null.

8. CONFIDENCE GRADING:
- 0.9+: crystal clear durable fact or clear noise
- 0.75-0.9: confident but not certain
- 0.6-0.75: borderline — will escalate to opus for a second opinion
- <0.6: genuinely ambiguous — needs human or will be discarded

9. Output ONLY the JSON object. No prose, no markdown, no explanation outside the reason field. Include the optional "suggested_project" field when misattribution is detected."""


TIER2_SECOND_OPINION_PROMPT = TRIAGE_SYSTEM_PROMPT + """

ESCALATED REVIEW: You are seeing this candidate because the tier-1 (sonnet) reviewer could not decide confidently. You will be shown tier-1's verdict + reason as additional context. Your job is to resolve the uncertainty with more careful thinking. Use your full context window to cross-reference the existing memories. If you ALSO cannot decide with confidence >= 0.8, output verdict="needs_human" with a clear explanation of what information would break the tie. That signal will route to a human (or auto-discard, depending on config)."""

_sandbox_cwd = None

def get_sandbox_cwd():
    global _sandbox_cwd
    if _sandbox_cwd is None:
        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-triage-")
    return _sandbox_cwd


def api_get(base_url, path, timeout=10):
    req = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def api_post(base_url, path, body=None, timeout=10):
    data = json.dumps(body or {}).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}{path}", method="POST",
        headers={"Content-Type": "application/json"}, data=data,
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))

def fetch_active_memories_for_project(base_url, project):
    """Fetch active memories for dedup checking."""
    params = "active_only=true&limit=50"
    if project:
        params += f"&project={urllib.parse.quote(project)}"
    result = api_get(base_url, f"/memory?{params}")
    return result.get("memories", [])


def fetch_project_state(base_url, project):
    """Fetch trusted project state for ground-truth context."""
    if not project:
        return []
    try:
        result = api_get(base_url, f"/project/state/{urllib.parse.quote(project)}")
        return result.get("entries", result.get("state", []))
    except Exception:
        return []


def fetch_registered_projects(base_url):
    """Return a dict of registered project id -> aliases for the misattribution check."""
    try:
        result = api_get(base_url, "/projects")
        projects = result.get("projects", [])
        out = {}
        for p in projects:
            pid = p.get("project_id") or p.get("id") or p.get("name")
            if pid:
                out[pid] = p.get("aliases", []) or []
        return out
    except Exception:
        return {}

def build_triage_user_message(candidate, active_memories, project_state, known_projects):
    """Richer context for the triage model: memories + state + project registry."""
    active_summary = "\n".join(
        f"- [{m['memory_type']}] {m['content'][:200]}"
        for m in active_memories[:30]
    ) or "(no active memories for this project)"

    if project_state:
        lines = []
        for e in project_state[:20]:
            cat = e.get("category", "?")
            key = e.get("key", "?")
            val = (e.get("value") or "")[:200]
            lines.append(f"- [{cat}/{key}] {val}")
        state_summary = "\n".join(lines)
    else:
        state_summary = "(no trusted state entries for this project)"

    projects_line = ", ".join(sorted(known_projects.keys())) if known_projects else "(none)"

    return (
        f"CANDIDATE TO TRIAGE:\n"
        f"  type: {candidate['memory_type']}\n"
        f"  claimed project: {candidate.get('project') or '(none)'}\n"
        f"  content: {candidate['content']}\n\n"
        f"REGISTERED PROJECT IDS: {projects_line}\n\n"
        f"TRUSTED PROJECT STATE (ground truth, higher trust than memories):\n{state_summary}\n\n"
        f"EXISTING ACTIVE MEMORIES FOR THIS PROJECT:\n{active_summary}\n\n"
        f"Return the JSON verdict now."
    )

def _call_claude(system_prompt, user_message, model, timeout_s):
    """Shared CLI caller with retry + stderr capture."""
    args = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", system_prompt,
        "--disable-slash-commands",
        user_message,
    ]
    last_error = ""
    for attempt in range(3):
        if attempt > 0:
            time.sleep(2 ** attempt)
        try:
            completed = subprocess.run(
                args, capture_output=True, text=True,
                timeout=timeout_s, cwd=get_sandbox_cwd(),
                encoding="utf-8", errors="replace",
            )
        except subprocess.TimeoutExpired:
            last_error = f"{model} timed out"
            continue
        except Exception as exc:
            last_error = f"subprocess error: {exc}"
            continue

        if completed.returncode == 0:
            return (completed.stdout or "").strip(), None

        # Nonzero exit: record the error and retry like the timeout path.
        stderr = (completed.stderr or "").strip()[:200]
        last_error = f"{model} exit {completed.returncode}: {stderr}" if stderr else f"{model} exit {completed.returncode}"

    # All attempts exhausted: report the last error seen.
    return None, last_error

def triage_one(candidate, active_memories, project_state, known_projects, model, timeout_s):
    """Tier-1 triage: ask the cheap model for a verdict."""
    if not shutil.which("claude"):
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "claude CLI not available"}

    user_message = build_triage_user_message(candidate, active_memories, project_state, known_projects)
    raw, err = _call_claude(TRIAGE_SYSTEM_PROMPT, user_message, model, timeout_s)
    if err:
        return {"verdict": "needs_human", "confidence": 0.0, "reason": err}
    return parse_verdict(raw)


def triage_escalation(candidate, tier1_verdict, active_memories, project_state, known_projects, model, timeout_s):
    """Tier-2 escalation: opus sees tier-1's verdict + reasoning, tries again."""
    if not shutil.which("claude"):
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "claude CLI not available"}

    base_msg = build_triage_user_message(candidate, active_memories, project_state, known_projects)
    tier1_context = (
        f"\nTIER-1 REVIEW (sonnet, for your reference):\n"
        f"  verdict: {tier1_verdict.get('verdict')}\n"
        f"  confidence: {tier1_verdict.get('confidence', 0.0):.2f}\n"
        f"  reason: {tier1_verdict.get('reason', '')[:300]}\n\n"
        f"Resolve the uncertainty. If you also can't decide with confidence ≥ 0.8, "
        f"return verdict='needs_human' with a specific explanation of what information "
        f"would break the tie.\n\nReturn the JSON verdict now."
    )
    raw, err = _call_claude(TIER2_SECOND_OPINION_PROMPT, base_msg + tier1_context, model, timeout_s)
    if err:
        return {"verdict": "needs_human", "confidence": 0.0, "reason": f"tier2: {err}"}
    return parse_verdict(raw)

def parse_verdict(raw):
    """Parse the triage model's JSON verdict."""
    text = raw.strip()
    if text.startswith("```"):
        # Strip a markdown code fence (with optional language tag) around the JSON.
        text = text.strip("`")
        nl = text.find("\n")
        if nl >= 0:
            text = text[nl + 1:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

    if not text.lstrip().startswith("{"):
        # Fall back to the outermost {...} span if the model added prose.
        start = text.find("{")
        end = text.rfind("}")
        if start >= 0 and end > start:
            text = text[start:end + 1]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "failed to parse triage output"}

    verdict = str(parsed.get("verdict", "needs_human")).strip().lower()
    if verdict not in {"promote", "reject", "needs_human", "contradicts"}:
        verdict = "needs_human"

    confidence = parsed.get("confidence", 0.5)
    try:
        confidence = max(0.0, min(1.0, float(confidence)))
    except (TypeError, ValueError):
        confidence = 0.5

    reason = str(parsed.get("reason", "")).strip()[:200]
    conflicts_with = str(parsed.get("conflicts_with", "")).strip()

    # Phase 3: domain tags + expiry
    raw_tags = parsed.get("domain_tags") or []
    if isinstance(raw_tags, str):
        raw_tags = [t.strip() for t in raw_tags.split(",") if t.strip()]
    if not isinstance(raw_tags, list):
        raw_tags = []
    domain_tags = []
    for t in raw_tags[:10]:
        if not isinstance(t, str):
            continue
        tag = t.strip().lower()
        if tag and tag not in domain_tags:
            domain_tags.append(tag)

    valid_until = parsed.get("valid_until")
    if valid_until is None:
        valid_until = ""
    else:
        valid_until = str(valid_until).strip()
        if valid_until.lower() in ("", "null", "none", "permanent"):
            valid_until = ""

    # Triage Quality: project misattribution flag
    suggested_project = str(parsed.get("suggested_project", "")).strip()

    return {
        "verdict": verdict,
        "confidence": confidence,
        "reason": reason,
        "conflicts_with": conflicts_with,
        "domain_tags": domain_tags,
        "valid_until": valid_until,
        "suggested_project": suggested_project,
    }

def _apply_metadata_update(base_url, mid, verdict_obj):
    """Persist tags + valid_until + suggested_project before the promote call."""
    tags = verdict_obj.get("domain_tags") or []
    valid_until = verdict_obj.get("valid_until") or ""
    suggested = verdict_obj.get("suggested_project") or ""

    body = {}
    if tags:
        body["domain_tags"] = tags
    if valid_until:
        body["valid_until"] = valid_until
    if not body and not suggested:
        return

    if body:
        try:
            req = urllib.request.Request(
                f"{base_url}/memory/{mid}", method="PUT",
                headers={"Content-Type": "application/json"},
                data=json.dumps(body).encode("utf-8"),
            )
            urllib.request.urlopen(req, timeout=10).read()
        except Exception:
            pass

    # Project auto-fix via direct SQLite update would bypass audit; use PUT if supported.
    # Applying suggested_project is a noop here: the caller (process_candidate)
    # tracks suggested_project_fixes and applies the correction itself.

def process_candidate(cand, base_url, active_cache, state_cache, known_projects, dry_run):
    """Run the 3-tier triage and apply the resulting action.

    Returns (action, note) where action in {promote, reject, discard, human, error}.
    """
    mid = cand["id"]
    project = cand.get("project") or ""
    if project not in active_cache:
        active_cache[project] = fetch_active_memories_for_project(base_url, project)
    if project not in state_cache:
        state_cache[project] = fetch_project_state(base_url, project)

    # === Tier 1 ===
    v1 = triage_one(
        cand, active_cache[project], state_cache[project],
        known_projects, TIER1_MODEL, DEFAULT_TIMEOUT_S,
    )

    # Project misattribution fix: suggested_project surfaces from tier 1.
    # Earlier code POSTed only {"content": cand["content"]}, which left
    # the project field unchanged because MemoryUpdateRequest had no
    # project key and the service signature didn't accept one. Wave 1
    # added project to MemoryUpdateRequest and update_memory(); this
    # caller now actually applies the suggested project.
    suggested = (v1.get("suggested_project") or "").strip()
    if suggested and suggested != project and suggested in known_projects:
        if not dry_run:
            try:
                req = urllib.request.Request(
                    f"{base_url}/memory/{mid}", method="PUT",
                    headers={"Content-Type": "application/json"},
                    data=json.dumps({"project": suggested}).encode("utf-8"),
                )
                urllib.request.urlopen(req, timeout=10).read()
            except Exception:
                pass
        print(f"  ↺ misattribution flagged: {project!r} → {suggested!r}")

    # High-confidence tier 1 decision → act
    if v1["verdict"] in ("promote", "reject") and v1["confidence"] >= AUTO_PROMOTE_MIN_CONFIDENCE:
        return _apply_verdict(v1, cand, base_url, active_cache, dry_run, tier="sonnet")

    # Borderline or uncertain → escalate to tier 2 (opus)
    print(f"  ↑ escalating (tier1 verdict={v1['verdict']} conf={v1['confidence']:.2f})")
    v2 = triage_escalation(
        cand, v1, active_cache[project], state_cache[project],
        known_projects, TIER2_MODEL, TIER2_TIMEOUT_S,
    )

    # Tier 2 is confident → act
    if v2["verdict"] in ("promote", "reject") and v2["confidence"] >= AUTO_PROMOTE_MIN_CONFIDENCE:
        return _apply_verdict(v2, cand, base_url, active_cache, dry_run, tier="opus")

    # Tier 3: still uncertain — route per config
    if TIER3_ACTION == "discard":
        reason = f"tier1+tier2 uncertain: {v2.get('reason', '')[:150]}"
        if dry_run:
            return ("discard", reason)
        try:
            api_post(base_url, f"/memory/{mid}/reject")
        except Exception:
            return ("error", reason)
        return ("discard", reason)
    else:
        # "human" — leave in queue for /admin/triage review
        return ("human", v2.get("reason", "no reason")[:200])

def _apply_verdict(verdict_obj, cand, base_url, active_cache, dry_run, tier):
    """Execute the promote/reject action and update metadata."""
    mid = cand["id"]
    verdict = verdict_obj["verdict"]
    reason = f"[{tier}] {verdict_obj['reason']}"

    if verdict == "promote":
        if dry_run:
            return ("promote", reason)
        _apply_metadata_update(base_url, mid, verdict_obj)
        try:
            api_post(base_url, f"/memory/{mid}/promote")
            project = cand.get("project") or ""
            if project in active_cache:
                active_cache[project].append(cand)
            return ("promote", reason)
        except Exception as e:
            return ("error", f"promote failed: {e}")
    else:
        if dry_run:
            return ("reject", reason)
        try:
            api_post(base_url, f"/memory/{mid}/reject")
            return ("reject", reason)
        except Exception as e:
            return ("error", f"reject failed: {e}")

def main():
    parser = argparse.ArgumentParser(description="Auto-triage candidate memories (3-tier escalation)")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--dry-run", action="store_true", help="preview without executing")
    parser.add_argument("--max-batches", type=int, default=20,
                        help="Max batches of 100 to process per run")
    parser.add_argument("--no-escalation", action="store_true",
                        help="Disable tier-2 escalation (legacy single-model behavior)")
    args = parser.parse_args()

    seen_ids: set[str] = set()
    active_cache: dict[str, list] = {}
    state_cache: dict[str, list] = {}

    known_projects = fetch_registered_projects(args.base_url)
    print(f"Registered projects: {sorted(known_projects.keys())}")
    print(f"Tier1: {TIER1_MODEL}  Tier2: {TIER2_MODEL}  Tier3: {TIER3_ACTION}  "
          f"escalation_threshold: {ESCALATION_CONFIDENCE_THRESHOLD}")

    counts = {"promote": 0, "reject": 0, "discard": 0, "human": 0, "error": 0}
    batch_num = 0

    while batch_num < args.max_batches:
        batch_num += 1
        result = api_get(args.base_url, "/memory?status=candidate&limit=100")
        all_candidates = result.get("memories", [])
        candidates = [c for c in all_candidates if c["id"] not in seen_ids]

        if not candidates:
            if batch_num == 1:
                print("queue empty, nothing to triage")
            else:
                print(f"\nQueue drained after batch {batch_num - 1}.")
            break

        print(f"\n=== batch {batch_num}: {len(candidates)} candidates  dry_run: {args.dry_run} ===")

        for i, cand in enumerate(candidates, 1):
            if i > 1:
                time.sleep(0.5)
            seen_ids.add(cand["id"])
            mid = cand["id"]
            label = f"[{i:2d}/{len(candidates)}] {mid[:8]} [{cand['memory_type']}]"

            try:
                action, note = process_candidate(
                    cand, args.base_url, active_cache, state_cache,
                    known_projects, args.dry_run,
                )
            except Exception as e:
                action, note = ("error", f"exception: {e}")

            counts[action] = counts.get(action, 0) + 1
            verb = {"promote": "PROMOTED  ", "reject": "REJECTED  ",
                    "discard": "DISCARDED ", "human": "NEEDS_HUM ",
                    "error": "ERROR     "}.get(action, action.upper())
            if args.dry_run and action in ("promote", "reject", "discard"):
                verb = "WOULD " + verb.strip()
            print(f"  {verb} {label}  {note[:120]}")

    print(
        f"\ntotal: promoted={counts['promote']} rejected={counts['reject']} "
        f"discarded={counts['discard']} human={counts['human']} errors={counts['error']} "
        f"batches={batch_num}"
    )


if __name__ == "__main__":
    main()