Batch 3, Days 1-3. The core R9 failure was Case F: when the model returned a registered project DIFFERENT from the interaction's known scope, the old code trusted the model because the project was registered. A p06-polisher interaction could silently produce a p04-gigabit candidate. New rule (trust hierarchy): 1. Interaction scope always wins when set (cases A, C, E, F) 2. Model project used only for unscoped interactions AND only when it resolves to a registered project (cases D, G) 3. Empty string when both are empty or unregistered (case B) The rule is: interaction.project is the strongest signal because it comes from the capture hook's project detection, which runs before the LLM ever sees the content. The model's project guess is only useful when the capture hook had no project context. 7 case tests (A-G) cover every combination of model/interaction project state. Pre-existing tests updated for the new behavior. Host-side script mirrors the same hierarchy using _known_projects fetched from GET /projects at startup. Test count: 286 -> 290 (+4 net, 7 new R9 cases, 3 old tests consolidated). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
301 lines
11 KiB
Python
301 lines
11 KiB
Python
"""Host-side LLM batch extraction — pure HTTP client, no atocore imports.
|
|
|
|
Fetches interactions from the AtoCore API, runs ``claude -p`` locally
|
|
for each, and POSTs candidates back. Zero dependency on atocore source
|
|
or Python packages — only uses stdlib + the ``claude`` CLI on PATH.
|
|
|
|
This is necessary because the ``claude`` CLI is on the Dalidou HOST
|
|
but not inside the Docker container, and the host's Python doesn't
|
|
have the container's dependencies (pydantic_settings, etc.).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
|
|
# API endpoint and model selection, overridable via environment variables.
DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet")
# Per-interaction wall-clock budget for the ``claude -p`` subprocess, seconds.
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_LLM_EXTRACTOR_TIMEOUT_S", "90"))
# Truncation limits applied to interaction text before building the prompt,
# keeping the CLI argument size bounded for very large interactions.
MAX_RESPONSE_CHARS = 8000
MAX_PROMPT_CHARS = 2000

# Closed set of memory types accepted from the model; candidates with any
# other type are dropped during parsing. NOTE(review): this is a superset of
# the four types the system prompt asks for ("identity" and "episodic" are
# tolerated here but never requested) — confirm that is intentional.
MEMORY_TYPES = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"}
|
|
|
|
SYSTEM_PROMPT = """You extract durable memory candidates from LLM conversation turns for a personal context engine called AtoCore.
|
|
|
|
Your job is to read one user prompt plus the assistant's response and decide which durable facts, decisions, preferences, architectural rules, or project invariants should be remembered across future sessions.
|
|
|
|
Rules:
|
|
|
|
1. Only surface durable claims. Skip transient status ("deploy is still running"), instructional guidance ("here is how to run the command"), troubleshooting tactics, ephemeral recommendations ("merge this PR now"), and session recaps.
|
|
2. A candidate is durable when a reader coming back in two weeks would still need to know it. Architectural choices, named rules, ratified decisions, invariants, procurement commitments, and project-level constraints qualify. Conversational fillers and step-by-step instructions do not.
|
|
3. Each candidate must stand alone. Rewrite the claim in one sentence under 200 characters with enough context that a reader without the conversation understands it.
|
|
4. Each candidate must have a type from this closed set: project, knowledge, preference, adaptation.
|
|
5. If the conversation is clearly scoped to a project (p04-gigabit, p05-interferometer, p06-polisher, atocore), set ``project`` to that id. Otherwise leave ``project`` empty.
|
|
6. If the response makes no durable claim, return an empty list. It is correct and expected to return [] on most conversational turns.
|
|
7. Confidence should be 0.5 by default so human review workload is honest. Raise to 0.6 only when the response states the claim in an unambiguous, committed form (e.g. "the decision is X", "the selected approach is Y", "X is non-negotiable").
|
|
8. Output must be a raw JSON array and nothing else. No prose before or after. No markdown fences. No explanations.
|
|
|
|
Each array element has exactly this shape:
|
|
|
|
{"type": "project|knowledge|preference|adaptation", "content": "...", "project": "...", "confidence": 0.5}
|
|
|
|
Return [] when there is nothing to extract."""
|
|
|
|
# Lazily-created scratch directory used as the cwd for claude subprocesses,
# so the CLI never runs inside a real project checkout.
_sandbox_cwd = None


def get_sandbox_cwd():
    """Return a dedicated temporary directory, creating it on first use.

    The same directory is reused for the lifetime of the process.
    """
    global _sandbox_cwd
    if _sandbox_cwd is not None:
        return _sandbox_cwd
    _sandbox_cwd = tempfile.mkdtemp(prefix="ato-llm-extract-")
    return _sandbox_cwd
|
|
|
|
|
|
def api_get(base_url, path, timeout=10):
    """GET ``base_url + path`` and return the decoded JSON body.

    Raises whatever ``urllib`` raises on network/HTTP failure; callers
    that want best-effort semantics wrap this themselves.
    """
    request = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        payload = response.read().decode("utf-8")
    return json.loads(payload)
|
|
|
|
|
|
def api_post(base_url, path, body, timeout=10):
    """POST ``body`` as JSON to ``base_url + path``; return the decoded reply.

    Network/HTTP errors propagate to the caller.
    """
    encoded = json.dumps(body).encode("utf-8")
    request = urllib.request.Request(
        f"{base_url}{path}",
        data=encoded,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
|
|
|
|
|
|
def get_last_run(base_url):
    """Return the stored ``last_extract_batch_run`` watermark, or None.

    Best-effort: any API or parsing failure is treated as "no previous
    run", which simply makes the next listing unbounded.
    """
    try:
        state = api_get(base_url, "/project/state/atocore?category=status")
        matches = [
            entry for entry in state.get("entries", [])
            if entry.get("key") == "last_extract_batch_run"
        ]
        if matches:
            return matches[0]["value"]
    except Exception:
        pass
    return None
|
|
|
|
|
|
def set_last_run(base_url, timestamp):
    """Persist the batch watermark to project state; failures are ignored.

    Losing the write only means the next run re-lists some interactions,
    so best-effort is acceptable here.
    """
    payload = {
        "project": "atocore",
        "category": "status",
        "key": "last_extract_batch_run",
        "value": timestamp,
        "source": "batch_llm_extract_live.py",
    }
    try:
        api_post(base_url, "/project/state", payload)
    except Exception:
        pass
|
|
|
|
|
|
# Registered project IDs and aliases, populated once at startup and consulted
# by parse_candidates for the R9 trust hierarchy.
_known_projects: set[str] = set()


def _load_known_projects(base_url):
    """Fetch registered project IDs (plus aliases) from the API for R9 validation.

    Builds the set in a single pass over the /projects payload and assigns
    it atomically, so a mid-iteration failure can never leave
    ``_known_projects`` partially populated. Best-effort: on any failure the
    set is left unchanged (normally empty), which makes the model-project
    fallback in parse_candidates reject everything — the safe default.
    """
    global _known_projects
    try:
        data = api_get(base_url, "/projects")
        known: set[str] = set()
        for project in data.get("projects", []):
            known.add(project["id"])
            known.update(project.get("aliases", []))
        _known_projects = known
    except Exception:
        pass
|
|
|
|
|
|
def extract_one(prompt, response, project, model, timeout_s):
    """Run ``claude -p`` on one interaction and return ``(candidates, error)``.

    ``error`` is "" on success; on any failure the candidate list is empty
    and ``error`` carries a short machine-readable reason.
    """
    if not shutil.which("claude"):
        return [], "claude_cli_missing"

    # Truncated excerpts keep the CLI argument within a sane size.
    user_message = (
        f"PROJECT HINT (may be empty): {project}\n\n"
        f"USER PROMPT:\n{prompt[:MAX_PROMPT_CHARS]}\n\n"
        f"ASSISTANT RESPONSE:\n{response[:MAX_RESPONSE_CHARS]}\n\n"
        "Return the JSON array now."
    )

    cmd = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", SYSTEM_PROMPT,
        "--disable-slash-commands",
        user_message,
    ]

    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout_s,
            cwd=get_sandbox_cwd(),
            encoding="utf-8",
            errors="replace",
        )
    except subprocess.TimeoutExpired:
        return [], "timeout"
    except Exception as exc:
        return [], f"subprocess_error: {exc}"

    if proc.returncode != 0:
        return [], f"exit_{proc.returncode}"

    output = (proc.stdout or "").strip()
    return parse_candidates(output, project), ""
|
|
|
|
|
|
def parse_candidates(raw, interaction_project, *, known_projects=None, memory_types=None):
    """Parse model JSON output into candidate dicts.

    Parameters
    ----------
    raw : str
        Raw model stdout; may be wrapped in markdown fences or surrounded
        by stray prose despite rule 8 of the system prompt.
    interaction_project : str
        Project detected by the capture hook ("" when unscoped).
    known_projects : set[str] | None
        Registered project IDs; defaults to the module-level
        ``_known_projects`` loaded at startup. Injectable for testing.
    memory_types : set[str] | None
        Accepted memory types; defaults to module-level ``MEMORY_TYPES``.
        Injectable for testing.

    Returns a list of dicts with keys ``memory_type``, ``content``,
    ``project``, ``confidence``. Malformed output yields [] rather than
    raising, so one bad model response never aborts the batch.
    """
    if known_projects is None:
        known_projects = _known_projects
    if memory_types is None:
        memory_types = MEMORY_TYPES

    text = raw.strip()
    # Strip a leading/trailing markdown fence (``` or ```json). strip("`")
    # removes all backticks at both ends, so only the language-tag first
    # line can remain to be dropped.
    if text.startswith("```"):
        text = text.strip("`")
        newline = text.find("\n")
        if newline >= 0:
            text = text[newline + 1:]
        text = text.strip()

    if not text or text == "[]":
        return []

    # Salvage the outermost [...] span when the model wrapped the array in
    # prose despite the "raw JSON only" instruction.
    if not text.startswith("["):
        start = text.find("[")
        end = text.rfind("]")
        if start >= 0 and end > start:
            text = text[start:end + 1]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return []

    if not isinstance(parsed, list):
        return []

    results = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        mem_type = str(item.get("type") or "").strip().lower()
        content = str(item.get("content") or "").strip()
        if mem_type not in memory_types or not content:
            continue

        # R9 trust hierarchy: interaction scope always wins when set
        # (cases A, C, E, F). The model's project is used only for
        # unscoped interactions AND only when it is registered (D, G);
        # otherwise the candidate stays unscoped (B / unregistered).
        model_project = str(item.get("project") or "").strip()
        if interaction_project:
            project = interaction_project
        elif model_project and model_project in known_projects:
            project = model_project
        else:
            project = ""

        # Clamp confidence into [0, 1]; non-numeric values fall back to
        # the honest-review default of 0.5.
        conf = item.get("confidence", 0.5)
        try:
            conf = max(0.0, min(1.0, float(conf)))
        except (TypeError, ValueError):
            conf = 0.5

        results.append({
            "memory_type": mem_type,
            "content": content[:1000],
            "project": project,
            "confidence": conf,
        })
    return results
|
|
|
|
|
|
def main() -> None:
    """Entry point: list recent interactions, extract candidates, POST them back.

    Flow:
      1. Load registered projects (R9 validation) and the last-run watermark.
      2. List interactions since the watermark (or the --since override).
      3. For each interaction with a non-trivial response, run ``claude -p``
         and POST every parsed candidate to /memory with status="candidate".
      4. Persist a fresh watermark, best-effort.
    """
    parser = argparse.ArgumentParser(description="Host-side LLM batch extraction")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--limit", type=int, default=50)
    parser.add_argument("--since", default=None)
    parser.add_argument("--model", default=DEFAULT_MODEL)
    args = parser.parse_args()

    # Must run before any parse_candidates call so the R9 registered-project
    # check has data; on failure the set stays empty (model projects rejected).
    _load_known_projects(args.base_url)
    since = args.since or get_last_run(args.base_url)
    print(f"since={since or '(first run)'} limit={args.limit} model={args.model} known_projects={len(_known_projects)}")

    params = [f"limit={args.limit}"]
    if since:
        params.append(f"since={urllib.parse.quote(since)}")
    listing = api_get(args.base_url, f"/interactions?{'&'.join(params)}")
    interaction_summaries = listing.get("interactions", [])
    print(f"listed {len(interaction_summaries)} interactions")

    processed = 0
    total_candidates = 0
    total_persisted = 0
    errors = 0

    for summary in interaction_summaries:
        # Cheap pre-filter on the summary before paying for the detail fetch.
        resp_chars = summary.get("response_chars", 0) or 0
        if resp_chars < 50:
            continue
        iid = summary["id"]
        try:
            raw = api_get(
                args.base_url,
                f"/interactions/{urllib.parse.quote(iid, safe='')}",
            )
        except Exception as exc:
            print(f" ! {iid[:8]}: fetch failed: {exc}", file=sys.stderr)
            errors += 1
            continue
        # Re-check against the full record; the summary count may disagree.
        response_text = raw.get("response", "") or ""
        if not response_text.strip() or len(response_text) < 50:
            continue

        candidates, error = extract_one(
            prompt=raw.get("prompt", "") or "",
            response=response_text,
            project=raw.get("project", "") or "",
            model=args.model,
            timeout_s=DEFAULT_TIMEOUT_S,
        )

        if error:
            print(f" ! {raw['id'][:8]}: {error}", file=sys.stderr)
            errors += 1
            continue

        processed += 1
        total_candidates += len(candidates)

        for c in candidates:
            try:
                api_post(args.base_url, "/memory", {
                    "memory_type": c["memory_type"],
                    "content": c["content"],
                    "project": c["project"],
                    "confidence": c["confidence"],
                    "status": "candidate",
                })
                total_persisted += 1
            except urllib.error.HTTPError as exc:
                # HTTP 400 is deliberately not counted as an error —
                # presumably a server-side rejection such as a duplicate or
                # validation failure (confirm against the /memory endpoint).
                if exc.code != 400:
                    errors += 1
            except Exception:
                errors += 1

    # The watermark advances even when individual items failed: failures are
    # reported above but not retried on the next run.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    set_last_run(args.base_url, now)

    print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|