# Source: ATOCore/scripts/batch_llm_extract_live.py
# (Python, 301 lines, 11 KiB — copied from the repository's web file view.)

"""Host-side LLM batch extraction — pure HTTP client, no atocore imports.
Fetches interactions from the AtoCore API, runs ``claude -p`` locally
for each, and POSTs candidates back. Zero dependency on atocore source
or Python packages; it uses only the stdlib + the ``claude`` CLI on PATH.
This is necessary because the ``claude`` CLI is on the Dalidou HOST
but not inside the Docker container, and the host's Python doesn't
have the container's dependencies (pydantic_settings, etc.).
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
# Base URL of the AtoCore HTTP API; override with ATOCORE_BASE_URL.
DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
# Model name handed to ``claude --model``; override with
# ATOCORE_LLM_EXTRACTOR_MODEL.
DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet")
# Timeout, in seconds, for each ``claude`` subprocess call; override with
# ATOCORE_LLM_EXTRACTOR_TIMEOUT_S.
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_LLM_EXTRACTOR_TIMEOUT_S", "90"))
# Truncation limits applied to the assistant response / user prompt before
# they are embedded in the message sent to the model.
MAX_RESPONSE_CHARS = 8000
MAX_PROMPT_CHARS = 2000
# Memory types accepted by parse_candidates(); candidates with any other
# type are dropped.
MEMORY_TYPES = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"}
# System prompt appended to every ``claude -p`` call (via
# ``--append-system-prompt``). It asks for a raw JSON array of candidates.
# NOTE(review): rule 4 restricts the model to four types (project, knowledge,
# preference, adaptation) while MEMORY_TYPES also accepts "identity" and
# "episodic" — confirm whether that asymmetry is intentional.
SYSTEM_PROMPT = """You extract durable memory candidates from LLM conversation turns for a personal context engine called AtoCore.
Your job is to read one user prompt plus the assistant's response and decide which durable facts, decisions, preferences, architectural rules, or project invariants should be remembered across future sessions.
Rules:
1. Only surface durable claims. Skip transient status ("deploy is still running"), instructional guidance ("here is how to run the command"), troubleshooting tactics, ephemeral recommendations ("merge this PR now"), and session recaps.
2. A candidate is durable when a reader coming back in two weeks would still need to know it. Architectural choices, named rules, ratified decisions, invariants, procurement commitments, and project-level constraints qualify. Conversational fillers and step-by-step instructions do not.
3. Each candidate must stand alone. Rewrite the claim in one sentence under 200 characters with enough context that a reader without the conversation understands it.
4. Each candidate must have a type from this closed set: project, knowledge, preference, adaptation.
5. If the conversation is clearly scoped to a project (p04-gigabit, p05-interferometer, p06-polisher, atocore), set ``project`` to that id. Otherwise leave ``project`` empty.
6. If the response makes no durable claim, return an empty list. It is correct and expected to return [] on most conversational turns.
7. Confidence should be 0.5 by default so human review workload is honest. Raise to 0.6 only when the response states the claim in an unambiguous, committed form (e.g. "the decision is X", "the selected approach is Y", "X is non-negotiable").
8. Output must be a raw JSON array and nothing else. No prose before or after. No markdown fences. No explanations.
Each array element has exactly this shape:
{"type": "project|knowledge|preference|adaptation", "content": "...", "project": "...", "confidence": 0.5}
Return [] when there is nothing to extract."""
# Lazily-created scratch directory shared by every ``claude`` invocation in
# this process (see get_sandbox_cwd()).
_sandbox_cwd = None


def get_sandbox_cwd():
    """Return the scratch directory used as ``cwd`` for ``claude`` calls.

    The directory is created with ``tempfile.mkdtemp`` on the first call and
    the same path is returned on every later call. It is removed again when
    the interpreter exits, so repeated batch runs do not accumulate stale
    ``ato-llm-extract-*`` directories in the temp folder.

    Returns:
        Absolute path of the sandbox directory.
    """
    global _sandbox_cwd
    if _sandbox_cwd is None:
        # Local import: ``atexit`` is only needed here and is not part of the
        # file's top-level import block.
        import atexit

        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-llm-extract-")
        # Best-effort cleanup; ignore_errors keeps shutdown quiet if the
        # directory has already been removed.
        atexit.register(shutil.rmtree, _sandbox_cwd, ignore_errors=True)
    return _sandbox_cwd
def api_get(base_url, path, timeout=10):
    """Issue a GET for ``base_url + path`` and return the decoded JSON body.

    Args:
        base_url: Scheme/host prefix of the API.
        path: Path (and optional query string) appended verbatim.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        The response body parsed as UTF-8 JSON.

    Errors from ``urllib`` or ``json`` are not caught here.
    """
    url = f"{base_url}{path}"
    request = urllib.request.Request(url)
    with urllib.request.urlopen(request, timeout=timeout) as reply:
        payload = reply.read()
    return json.loads(payload.decode("utf-8"))
def api_post(base_url, path, body, timeout=10):
    """POST ``body`` as JSON to ``base_url + path`` and return the JSON reply.

    Args:
        base_url: Scheme/host prefix of the API.
        path: Path appended verbatim to ``base_url``.
        body: JSON-serialisable payload.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        The response body parsed as UTF-8 JSON.

    Errors from ``urllib`` or ``json`` are not caught here.
    """
    encoded = json.dumps(body).encode("utf-8")
    request = urllib.request.Request(
        f"{base_url}{path}",
        data=encoded,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as reply:
        payload = reply.read()
    return json.loads(payload.decode("utf-8"))
def get_last_run(base_url):
    """Return the stored ``last_extract_batch_run`` value, or ``None``.

    Reads the ``status`` category of the ``atocore`` project state and looks
    for the entry whose key is ``last_extract_batch_run``.

    The lookup is best-effort: any failure (API unreachable, unexpected
    payload shape, ...) yields ``None`` so the caller treats this as a first
    run, but the failure is reported on stderr instead of being swallowed
    silently.
    """
    try:
        state = api_get(base_url, "/project/state/atocore?category=status")
        for entry in state.get("entries", []):
            if entry.get("key") == "last_extract_batch_run":
                return entry.get("value")
    except Exception as exc:  # best-effort boundary: never abort the batch
        print(f"  ! could not read last_extract_batch_run: {exc}", file=sys.stderr)
    return None
def set_last_run(base_url, timestamp):
    """Store ``timestamp`` as the ``last_extract_batch_run`` status entry.

    The write is best-effort and never raises. A failure is reported on
    stderr, because an unsaved value means the next run will start from the
    previous one and look at the same interactions again.
    """
    payload = {
        "project": "atocore", "category": "status",
        "key": "last_extract_batch_run", "value": timestamp,
        "source": "batch_llm_extract_live.py",
    }
    try:
        api_post(base_url, "/project/state", payload)
    except Exception as exc:  # best-effort boundary: never abort the batch
        print(f"  ! could not store last_extract_batch_run: {exc}", file=sys.stderr)
# Registered project ids and aliases; filled by _load_known_projects() and
# consulted by parse_candidates() when validating a model-supplied project.
_known_projects: set[str] = set()


def _load_known_projects(base_url):
    """Fetch registered project IDs from the API for R9 validation.

    Collects every project's ``id`` plus all of its ``aliases`` from
    ``GET /projects`` into the module-level ``_known_projects`` set.

    The set is built locally and swapped in only after the whole payload was
    read, so a failure part-way through cannot leave a half-populated
    registry. On failure the previous contents are kept and a warning is
    written to stderr (with an empty registry, model-supplied projects on
    unscoped interactions are dropped by parse_candidates()).
    """
    global _known_projects
    try:
        data = api_get(base_url, "/projects")
        known = set()
        for proj in data.get("projects", []):
            known.add(proj["id"])
            # ``or []`` tolerates an explicit ``"aliases": null``.
            known.update(proj.get("aliases") or [])
    except Exception as exc:  # best-effort boundary: never abort the batch
        print(f"  ! could not load known projects: {exc}", file=sys.stderr)
        return
    _known_projects = known
def extract_one(prompt, response, project, model, timeout_s):
    """Run ``claude -p`` on one interaction and return parsed candidates.

    Args:
        prompt: User prompt text; truncated to ``MAX_PROMPT_CHARS``.
        response: Assistant response text; truncated to
            ``MAX_RESPONSE_CHARS``.
        project: Project the interaction is scoped to ("" if none). Passed to
            the model as a hint and to ``parse_candidates``.
        model: Value for the CLI's ``--model`` option.
        timeout_s: Subprocess timeout in seconds.

    Returns:
        ``(candidates, error)``. ``error`` is ``""`` on success; otherwise
        ``candidates`` is empty and ``error`` is one of
        ``"claude_cli_missing"``, ``"timeout"``, ``"subprocess_error: ..."``
        or ``"exit_<returncode>"``.
    """
    # Resolve the executable once and launch exactly that path. Re-resolving
    # the bare name inside subprocess can disagree with shutil.which (e.g.
    # ``claude.cmd`` shims on Windows are found by which() but not by a bare
    # ``"claude"`` argv[0]).
    claude_bin = shutil.which("claude")
    if not claude_bin:
        return [], "claude_cli_missing"

    prompt_excerpt = prompt[:MAX_PROMPT_CHARS]
    response_excerpt = response[:MAX_RESPONSE_CHARS]
    user_message = (
        f"PROJECT HINT (may be empty): {project}\n\n"
        f"USER PROMPT:\n{prompt_excerpt}\n\n"
        f"ASSISTANT RESPONSE:\n{response_excerpt}\n\n"
        "Return the JSON array now."
    )
    args = [
        claude_bin, "-p",
        "--model", model,
        "--append-system-prompt", SYSTEM_PROMPT,
        "--disable-slash-commands",
        user_message,
    ]
    try:
        completed = subprocess.run(
            args, capture_output=True, text=True,
            timeout=timeout_s, cwd=get_sandbox_cwd(),
            encoding="utf-8", errors="replace",
        )
    except subprocess.TimeoutExpired:
        return [], "timeout"
    except Exception as exc:
        return [], f"subprocess_error: {exc}"

    if completed.returncode != 0:
        return [], f"exit_{completed.returncode}"

    raw = (completed.stdout or "").strip()
    return parse_candidates(raw, project), ""
def _strip_code_fence(text):
    """Remove a surrounding Markdown code fence (```lang ... ```), if any."""
    if not text.startswith("```"):
        return text
    body = text.strip("`")
    first_line, newline, rest = body.partition("\n")
    # The first line is a language tag ("json") or empty — unless the array
    # already starts on the fence line, in which case it must be kept.
    if newline and "[" not in first_line:
        body = rest
    return body.strip()


def _load_json_array(text):
    """Return the first JSON array decodable from *text*, or ``None``."""
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, list):
        return parsed
    # Fall back to scanning: tolerate prose before/after the array, or an
    # array wrapped inside another JSON value.
    decoder = json.JSONDecoder()
    start = text.find("[")
    while start >= 0:
        try:
            parsed, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, list):
            return parsed
        start = text.find("[", start + 1)
    return None


def _coerce_confidence(value, default=0.5):
    """Clamp *value* to [0.0, 1.0]; fall back to *default* if not a number."""
    try:
        conf = float(value)
    except (TypeError, ValueError, OverflowError):
        return default
    if conf != conf:  # NaN (json.loads accepts the literal) is not a score
        return default
    return max(0.0, min(1.0, conf))


def parse_candidates(raw, interaction_project):
    """Parse model JSON output into candidate dicts.

    Args:
        raw: Text printed by the model. Expected to be a JSON array, but a
            Markdown code fence and surrounding prose are tolerated.
        interaction_project: Project the interaction is scoped to ("" if
            none).

    Returns:
        A list of ``{"memory_type", "content", "project", "confidence"}``
        dicts. Elements that are not objects, have a type outside
        ``MEMORY_TYPES`` or have empty content are skipped. Content is
        truncated to 1000 characters and confidence is clamped to
        ``[0.0, 1.0]`` (0.5 when missing or not numeric). Unparseable output
        yields ``[]``.
    """
    text = _strip_code_fence(raw.strip())
    if not text or text == "[]":
        return []

    parsed = _load_json_array(text)
    if parsed is None:
        return []

    results = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        mem_type = str(item.get("type") or "").strip().lower()
        content = str(item.get("content") or "").strip()
        if mem_type not in MEMORY_TYPES or not content:
            continue
        model_project = str(item.get("project") or "").strip()
        # R9 trust hierarchy: interaction scope always wins when set.
        # Model project only used for unscoped interactions + registered check.
        if interaction_project:
            project = interaction_project
        elif model_project and model_project in _known_projects:
            project = model_project
        else:
            project = ""
        results.append({
            "memory_type": mem_type,
            "content": content[:1000],
            "project": project,
            "confidence": _coerce_confidence(item.get("confidence", 0.5)),
        })
    return results
def main():
    """Command-line entry point: extract candidates for recent interactions.

    Steps:
      1. Load the known-project registry and determine the ``since``
         watermark (``--since`` or the stored ``last_extract_batch_run``).
      2. List interactions from the API, fetch each one in full, and run
         ``extract_one`` on those with a response of at least 50 characters.
      3. POST every candidate to ``/memory`` with status ``candidate``.
         HTTP 400 replies are not counted as errors.
      4. Store the new watermark and print a summary line.
    """
    parser = argparse.ArgumentParser(description="Host-side LLM batch extraction")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--limit", type=int, default=50)
    parser.add_argument("--since", default=None)
    parser.add_argument("--model", default=DEFAULT_MODEL)
    args = parser.parse_args()

    _load_known_projects(args.base_url)
    since = args.since or get_last_run(args.base_url)
    print(f"since={since or '(first run)'} limit={args.limit} model={args.model} known_projects={len(_known_projects)}")

    params = [f"limit={args.limit}"]
    if since:
        params.append(f"since={urllib.parse.quote(since)}")

    # Take the watermark BEFORE listing. A run can last many minutes (one
    # LLM call per interaction); stamping the end time instead would make the
    # next run skip every interaction recorded while this one was working.
    run_started = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    listing = api_get(args.base_url, f"/interactions?{'&'.join(params)}")
    interaction_summaries = listing.get("interactions", [])
    print(f"listed {len(interaction_summaries)} interactions")

    processed = 0
    total_candidates = 0
    total_persisted = 0
    errors = 0

    for summary in interaction_summaries:
        # Cheap pre-filter on the listing so short replies are never fetched.
        resp_chars = summary.get("response_chars", 0) or 0
        if resp_chars < 50:
            continue
        iid = summary["id"]
        try:
            raw = api_get(
                args.base_url,
                f"/interactions/{urllib.parse.quote(iid, safe='')}",
            )
        except Exception as exc:
            print(f"  ! {iid[:8]}: fetch failed: {exc}", file=sys.stderr)
            errors += 1
            continue

        response_text = raw.get("response", "") or ""
        if not response_text.strip() or len(response_text) < 50:
            continue

        candidates, error = extract_one(
            prompt=raw.get("prompt", "") or "",
            response=response_text,
            project=raw.get("project", "") or "",
            model=args.model,
            timeout_s=DEFAULT_TIMEOUT_S,
        )
        if error:
            # Use the id from the listing: the detail payload is not
            # guaranteed to repeat it.
            print(f"  ! {iid[:8]}: {error}", file=sys.stderr)
            errors += 1
            continue

        processed += 1
        total_candidates += len(candidates)
        for c in candidates:
            try:
                api_post(args.base_url, "/memory", {
                    "memory_type": c["memory_type"],
                    "content": c["content"],
                    "project": c["project"],
                    "confidence": c["confidence"],
                    "status": "candidate",
                })
                total_persisted += 1
            except urllib.error.HTTPError as exc:
                # NOTE(review): 400 is deliberately not an error here —
                # presumably the API's reply for a duplicate; confirm.
                if exc.code != 400:
                    errors += 1
            except Exception:
                errors += 1

    # NOTE(review): if the listing was truncated by --limit, advancing the
    # watermark may skip the unlisted remainder — depends on how the API
    # orders /interactions; confirm.
    set_last_run(args.base_url, run_started)
    print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}")


if __name__ == "__main__":
    main()