Adds structural metadata that the LLM triage was already implicitly
reasoning about ("stale snapshot" → reject). Phase 3 captures that
reasoning as fields so it can DRIVE retrieval, not just rejection.
Schema (src/atocore/models/database.py):
- domain_tags TEXT DEFAULT '[]': JSON array of lowercase topic keywords
- valid_until DATETIME: ISO date; null = permanent
- idx_memories_valid_until index for efficient expiry queries
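The shape of that migration can be sketched as plain SQLite DDL (the `memories` table name and the `migrate_phase3` helper are assumptions for illustration; the real schema lives in database.py):

```python
import sqlite3

def migrate_phase3(conn: sqlite3.Connection) -> None:
    """Add Phase 3 columns + the expiry index, idempotently."""
    cols = {row[1] for row in conn.execute("PRAGMA table_info(memories)")}
    if "domain_tags" not in cols:
        # JSON array of lowercase topic keywords; '[]' when untagged
        conn.execute("ALTER TABLE memories ADD COLUMN domain_tags TEXT DEFAULT '[]'")
    if "valid_until" not in cols:
        # ISO date; NULL = permanent
        conn.execute("ALTER TABLE memories ADD COLUMN valid_until DATETIME")
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_memories_valid_until "
        "ON memories (valid_until)"
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE memories (id TEXT PRIMARY KEY, content TEXT)")
migrate_phase3(conn)
```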
Memory service (src/atocore/memory/service.py):
- Memory dataclass gains domain_tags + valid_until
- create_memory, update_memory accept/persist both
- _row_to_memory safely reads both (JSON-decode + null handling)
- _normalize_tags helper: lowercase, dedup, strip, cap at 10
- get_memories_for_context filters expired (valid_until < today UTC)
- _rank_memories_for_query adds tag-boost: memories whose domain_tags
appear as substrings in query text rank higher (tertiary key after
content-overlap density + absolute overlap, before confidence)
LLM extractor (_llm_prompt.py → llm-0.5.0):
- SYSTEM_PROMPT documents domain_tags (2-5 keywords) + valid_until
(time-bounded facts get expiry dates; durable facts stay null)
- normalize_candidate_item parses both fields from model output with
graceful fallback for string/null/missing
LLM triage (scripts/auto_triage.py):
- TRIAGE_SYSTEM_PROMPT documents same two fields
- parse_verdict extracts them from verdict JSON
- On promote: PUT /memory/{id} with tags + valid_until BEFORE
POST /memory/{id}/promote, so active memories carry them
API (src/atocore/api/routes.py):
- MemoryCreateRequest: adds domain_tags, valid_until
- MemoryUpdateRequest: adds domain_tags, valid_until, memory_type
- GET /memory response exposes domain_tags + valid_until + created_at
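The update-request shape might look like this (sketched with a plain dataclass; routes.py presumably uses its existing request-model framework, and any field beyond those listed above is an assumption):

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class MemoryUpdateRequest:
    # All optional: a PUT only touches the fields the caller sends.
    content: Optional[str] = None
    memory_type: Optional[str] = None
    domain_tags: Optional[List[str]] = None
    valid_until: Optional[str] = None  # "YYYY-MM-DD" or None = permanent
```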
Triage UI (src/atocore/engineering/triage_ui.py):
- Renders existing tags as colored badges
- Adds inline text field for tags (comma-separated) + date picker for
valid_until on every candidate card
- Save&Promote button persists edits via PUT then promotes
- Plain Promote (and Y shortcut) also saves tags/expiry if edited
Wiki (src/atocore/engineering/wiki.py):
- Search now matches memory content OR domain_tags
- Search results render tags as clickable badges linking to
/wiki/search?q=<tag> for cross-project navigation
- valid_until shown as amber "valid until YYYY-MM-DD" hint
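The content-or-tags match can be sketched as (`memory_matches` is an illustrative name, not the actual wiki.py function):

```python
import json

def memory_matches(memory: dict, q: str) -> bool:
    """Search hit when the query appears in content OR any domain tag."""
    needle = q.strip().lower()
    if needle in (memory.get("content") or "").lower():
        return True
    tags = memory.get("domain_tags") or []
    if isinstance(tags, str):  # tolerate the raw JSON-encoded column value
        tags = json.loads(tags or "[]")
    return any(needle in t for t in tags)
```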
Tests: 303 → 308 (5 new for Phase 3 behavior):
- test_create_memory_with_tags_and_valid_until
- test_create_memory_normalizes_tags
- test_update_memory_sets_tags_and_valid_until
- test_get_memories_for_context_excludes_expired
- test_context_builder_tag_boost_orders_results
Deferred (explicitly): temporal_scope enum, source_refs memory graph,
HDBSCAN clustering, memory detail wiki page, backfill of existing
actives. See docs/MASTER-BRAIN-PLAN.md.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
"""Auto-triage: LLM second-pass over candidate memories.
|
|
|
|
Fetches all status=candidate memories from the AtoCore API, asks
|
|
a triage model (via claude -p) to classify each as promote / reject /
|
|
needs_human, and executes the verdict via the promote/reject endpoints.
|
|
Only needs_human candidates remain in the queue for manual review.
|
|
|
|
Trust model:
|
|
- Auto-promote: model says promote AND confidence >= 0.8 AND no
|
|
duplicate content in existing active memories
|
|
- Auto-reject: model says reject
|
|
- needs_human: everything else stays in queue
|
|
|
|
Runs host-side (same as batch extraction) because it needs the
|
|
claude CLI. Intended to be called after batch-extract.sh in the
|
|
nightly cron, or manually.
|
|
|
|
Usage:
|
|
|
|
python3 scripts/auto_triage.py --base-url http://localhost:8100
|
|
python3 scripts/auto_triage.py --dry-run # preview without executing
|
|
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.parse
import urllib.request

DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
DEFAULT_MODEL = os.environ.get("ATOCORE_TRIAGE_MODEL", "sonnet")
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_TRIAGE_TIMEOUT_S", "60"))
AUTO_PROMOTE_MIN_CONFIDENCE = 0.8

TRIAGE_SYSTEM_PROMPT = """You are a memory triage reviewer for a personal context engine called AtoCore. You review candidate memories extracted from LLM conversations and decide whether each should be promoted to active status, rejected, or flagged for human review.

You will receive:
- The candidate memory content and type
- A list of existing active memories for the same project (to check for duplicates)

For each candidate, output exactly one JSON object:

{"verdict": "promote|reject|needs_human|contradicts", "confidence": 0.0-1.0, "reason": "one sentence", "conflicts_with": "id of existing memory if contradicts", "domain_tags": ["tag1","tag2"], "valid_until": null}

DOMAIN TAGS (Phase 3): A lowercase list of 2-5 topical keywords describing
the SUBJECT matter (not the project). This enables cross-project retrieval:
a query about "optics" can pull matches from p04 + p05 + p06.

Good tags are single lowercase words or hyphenated terms. Mix:
- domain keywords (optics, thermal, firmware, materials, controls)
- project tokens when clearly scoped (p04, p05, p06, abb)
- lifecycle/activity words (procurement, design, validation, vendor)

Always emit domain_tags on a promote. For reject, an empty list is fine.

VALID_UNTIL (Phase 3): ISO date "YYYY-MM-DD" OR null (permanent).
Set to a near-future date when the candidate is time-bounded:
- Status snapshots ("current blocker is X") → ~2 weeks out
- Scheduled events ("meeting Friday") → event date
- Quotes with expiry → quote expiry date
Leave null for durable decisions, engineering insights, ratified requirements.

Rules:

1. PROMOTE when the candidate states a durable architectural fact, ratified decision, standing rule, or engineering constraint that is NOT already covered by an existing active memory. Confidence should reflect how certain you are this is worth keeping.

2. REJECT when the candidate is:
- A stale point-in-time snapshot ("live SHA is X", "36 active memories")
- An implementation detail too granular to be useful as standalone context
- A planned-but-not-implemented feature description
- A duplicate or near-duplicate of an existing active memory
- A session observation or conversational filler
- A process rule that belongs in DEV-LEDGER.md or AGENTS.md, not memory

3. CONTRADICTS when the candidate *conflicts* with an existing active memory (not a duplicate, but states something that can't both be true). Set `conflicts_with` to the existing memory id. This flags the tension for human review instead of silently rejecting or double-storing. Examples: "Option A selected" vs "Option B selected" for the same decision; "uses material X" vs "uses material Y" for the same component.

4. OPENCLAW-CURATED content (candidate content starts with "From OpenClaw/"): apply a MUCH LOWER bar. OpenClaw's SOUL.md, USER.md, MEMORY.md, MODEL-ROUTING.md, and dated memory/*.md files are ALREADY curated by OpenClaw as canonical continuity. Promote unless clearly wrong or a genuine duplicate. Do NOT reject OpenClaw content as "process rule belongs elsewhere" or "session log" — that's exactly what AtoCore wants to absorb. Session events, project updates, stakeholder notes, and decisions from OpenClaw daily memory files ARE valuable context and should promote.

5. NEEDS_HUMAN when you're genuinely unsure — the candidate might be valuable but you can't tell without domain knowledge. This should be rare (< 20% of candidates).

6. Output ONLY the JSON object. No prose, no markdown, no explanation outside the reason field."""

_sandbox_cwd = None


def get_sandbox_cwd():
    """Scratch working directory for the claude subprocess (created once)."""
    global _sandbox_cwd
    if _sandbox_cwd is None:
        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-triage-")
    return _sandbox_cwd

def api_get(base_url, path, timeout=10):
    req = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))

def api_post(base_url, path, body=None, timeout=10):
    data = json.dumps(body or {}).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}{path}", method="POST",
        headers={"Content-Type": "application/json"}, data=data,
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))

def fetch_active_memories_for_project(base_url, project):
    """Fetch active memories for dedup checking."""
    params = "active_only=true&limit=50"
    if project:
        params += f"&project={urllib.parse.quote(project)}"
    result = api_get(base_url, f"/memory?{params}")
    return result.get("memories", [])

def triage_one(candidate, active_memories, model, timeout_s):
    """Ask the triage model to classify one candidate."""
    if not shutil.which("claude"):
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "claude CLI not available"}

    active_summary = "\n".join(
        f"- [{m['memory_type']}] {m['content'][:150]}"
        for m in active_memories[:20]
    ) or "(no active memories for this project)"

    user_message = (
        f"CANDIDATE TO TRIAGE:\n"
        f"  type: {candidate['memory_type']}\n"
        f"  project: {candidate.get('project') or '(none)'}\n"
        f"  content: {candidate['content']}\n\n"
        f"EXISTING ACTIVE MEMORIES FOR THIS PROJECT:\n{active_summary}\n\n"
        f"Return the JSON verdict now."
    )

    args = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", TRIAGE_SYSTEM_PROMPT,
        "--disable-slash-commands",
        user_message,
    ]

    # Retry with exponential backoff on transient failures (rate limits etc.)
    last_error = ""
    for attempt in range(3):
        if attempt > 0:
            time.sleep(2 ** attempt)  # 2s, 4s
        try:
            completed = subprocess.run(
                args, capture_output=True, text=True,
                timeout=timeout_s, cwd=get_sandbox_cwd(),
                encoding="utf-8", errors="replace",
            )
        except subprocess.TimeoutExpired:
            last_error = "triage model timed out"
            continue
        except Exception as exc:
            last_error = f"subprocess error: {exc}"
            continue

        if completed.returncode == 0:
            raw = (completed.stdout or "").strip()
            return parse_verdict(raw)

        # Capture stderr for diagnostics (truncate to 200 chars)
        stderr = (completed.stderr or "").strip()[:200]
        last_error = f"claude exit {completed.returncode}: {stderr}" if stderr else f"claude exit {completed.returncode}"

    return {"verdict": "needs_human", "confidence": 0.0, "reason": last_error}

def parse_verdict(raw):
    """Parse the triage model's JSON verdict."""
    text = raw.strip()
    if text.startswith("```"):
        # Strip a markdown code fence (and its language tag line) if present.
        text = text.strip("`")
        nl = text.find("\n")
        if nl >= 0:
            text = text[nl + 1:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

    if not text.lstrip().startswith("{"):
        # Salvage the outermost JSON object from surrounding prose.
        start = text.find("{")
        end = text.rfind("}")
        if start >= 0 and end > start:
            text = text[start:end + 1]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {"verdict": "needs_human", "confidence": 0.0, "reason": "failed to parse triage output"}

    verdict = str(parsed.get("verdict", "needs_human")).strip().lower()
    if verdict not in {"promote", "reject", "needs_human", "contradicts"}:
        verdict = "needs_human"

    confidence = parsed.get("confidence", 0.5)
    try:
        confidence = max(0.0, min(1.0, float(confidence)))
    except (TypeError, ValueError):
        confidence = 0.5

    reason = str(parsed.get("reason", "")).strip()[:200]
    # `or ""` guards against an explicit null, which str() would turn into "None".
    conflicts_with = str(parsed.get("conflicts_with") or "").strip()

    # Phase 3: domain tags + expiry
    raw_tags = parsed.get("domain_tags") or []
    if isinstance(raw_tags, str):
        raw_tags = [t.strip() for t in raw_tags.split(",") if t.strip()]
    if not isinstance(raw_tags, list):
        raw_tags = []
    domain_tags = []
    for t in raw_tags[:10]:
        if not isinstance(t, str):
            continue
        tag = t.strip().lower()
        if tag and tag not in domain_tags:
            domain_tags.append(tag)

    valid_until = parsed.get("valid_until")
    if valid_until is None:
        valid_until = ""
    else:
        valid_until = str(valid_until).strip()
        if valid_until.lower() in ("", "null", "none", "permanent"):
            valid_until = ""

    return {
        "verdict": verdict,
        "confidence": confidence,
        "reason": reason,
        "conflicts_with": conflicts_with,
        "domain_tags": domain_tags,
        "valid_until": valid_until,
    }

def main():
    parser = argparse.ArgumentParser(description="Auto-triage candidate memories")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--model", default=DEFAULT_MODEL)
    parser.add_argument("--dry-run", action="store_true", help="preview without executing")
    parser.add_argument("--max-batches", type=int, default=20,
                        help="Max batches of 100 to process per run (default 20 = 2000 candidates)")
    args = parser.parse_args()

    # Track IDs we've already seen so needs_human items don't re-process
    # every batch (they stay in the candidate table until a human reviews).
    seen_ids: set[str] = set()
    active_cache: dict[str, list] = {}
    promoted = rejected = needs_human = errors = 0
    batch_num = 0

    while batch_num < args.max_batches:
        batch_num += 1

        result = api_get(args.base_url, "/memory?status=candidate&limit=100")
        all_candidates = result.get("memories", [])
        # Filter out already-seen (needs_human from prior batch in same run)
        candidates = [c for c in all_candidates if c["id"] not in seen_ids]

        if not candidates:
            if batch_num == 1:
                print("queue empty, nothing to triage")
            else:
                print(f"\nQueue drained after batch {batch_num - 1}.")
            break

        print(f"\n=== batch {batch_num}: {len(candidates)} candidates  model: {args.model}  dry_run: {args.dry_run} ===")

        for i, cand in enumerate(candidates, 1):
            if i > 1:
                time.sleep(0.5)

            seen_ids.add(cand["id"])
            project = cand.get("project") or ""
            if project not in active_cache:
                active_cache[project] = fetch_active_memories_for_project(args.base_url, project)

            verdict_obj = triage_one(cand, active_cache[project], args.model, DEFAULT_TIMEOUT_S)
            verdict = verdict_obj["verdict"]
            conf = verdict_obj["confidence"]
            reason = verdict_obj["reason"]
            conflicts_with = verdict_obj.get("conflicts_with", "")

            mid = cand["id"]
            label = f"[{i:2d}/{len(candidates)}] {mid[:8]} [{cand['memory_type']}]"

            if verdict == "promote" and conf >= AUTO_PROMOTE_MIN_CONFIDENCE:
                if args.dry_run:
                    print(f"  WOULD PROMOTE {label} conf={conf:.2f} {reason}")
                    promoted += 1
                else:
                    # Phase 3: update tags + valid_until on the candidate
                    # before promoting, so the active memory carries them.
                    tags = verdict_obj.get("domain_tags") or []
                    valid_until = verdict_obj.get("valid_until") or ""
                    if tags or valid_until:
                        try:
                            body = json.dumps({
                                "domain_tags": tags,
                                "valid_until": valid_until,
                            }).encode("utf-8")
                            req = urllib.request.Request(
                                f"{args.base_url}/memory/{mid}", method="PUT",
                                headers={"Content-Type": "application/json"}, data=body,
                            )
                            urllib.request.urlopen(req, timeout=10).read()
                        except Exception:
                            pass  # non-fatal; promote anyway
                    try:
                        api_post(args.base_url, f"/memory/{mid}/promote")
                        print(f"  PROMOTED {label} conf={conf:.2f} {reason}")
                        active_cache[project].append(cand)
                        promoted += 1
                    except Exception:
                        errors += 1
            elif verdict == "reject":
                if args.dry_run:
                    print(f"  WOULD REJECT {label} conf={conf:.2f} {reason}")
                    rejected += 1
                else:
                    try:
                        api_post(args.base_url, f"/memory/{mid}/reject")
                        print(f"  REJECTED {label} conf={conf:.2f} {reason}")
                        rejected += 1
                    except Exception:
                        errors += 1
            elif verdict == "contradicts":
                print(f"  CONTRADICTS {label} vs {conflicts_with[:8] if conflicts_with else '?'} {reason}")
                needs_human += 1
            else:
                print(f"  NEEDS_HUMAN {label} conf={conf:.2f} {reason}")
                needs_human += 1

    print(f"\ntotal: promoted={promoted} rejected={rejected} needs_human={needs_human} errors={errors} batches={batch_num}")


if __name__ == "__main__":
    main()