fix: pure-stdlib host-side extraction script (no atocore imports)

The host Python on Dalidou lacks pydantic_settings and other
container-only deps. Refactored batch_llm_extract_live.py to be
a standalone HTTP client + subprocess wrapper using only stdlib.
Duplicates the system prompt and JSON parser from extractor_llm.py
rather than importing them — acceptable duplication since this
is a deployment adapter, not a library.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-12 10:57:18 -04:00
parent cd0fd390a8
commit 8af8af90d0

View File

@@ -1,23 +1,12 @@
"""Host-side LLM batch extraction against a live AtoCore instance.
"""Host-side LLM batch extraction — pure HTTP client, no atocore imports.
Fetches recent interactions from the AtoCore API, runs the LLM
extractor locally (requires ``claude`` CLI on PATH), and POSTs
candidates back to the API as ``status=candidate``.
Fetches interactions from the AtoCore API, runs ``claude -p`` locally
for each, and POSTs candidates back. Zero dependency on atocore source
or Python packages — only uses stdlib + the ``claude`` CLI on PATH.
This script runs on the HOST (not inside the Docker container)
because the ``claude`` CLI is installed host-side. The container's
``/admin/extract-batch`` endpoint can't use LLM mode because
``shutil.which("claude")`` returns None inside the container.
Tracks last-run timestamp via project state so re-runs auto-resume.
Usage (manual):
python3 scripts/batch_llm_extract_live.py --base-url http://localhost:8100
Usage (cron, via wrapper):
bash deploy/dalidou/batch-extract.sh
This is necessary because the ``claude`` CLI is on the Dalidou HOST
but not inside the Docker container, and the host's Python doesn't
have the container's dependencies (pydantic_settings, etc.).
"""
from __future__ import annotations
@@ -25,42 +14,71 @@ from __future__ import annotations
import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
# Make src/ importable
_REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_REPO_ROOT / "src"))
from atocore.interactions.service import Interaction # noqa: E402
from atocore.memory.extractor_llm import extract_candidates_llm # noqa: E402
from datetime import datetime, timezone
# Runtime defaults — every one overridable via environment so the cron
# wrapper (deploy/dalidou/batch-extract.sh) can tune without code changes.
DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
# NOTE(review): DEFAULT_TIMEOUT looks like a leftover from the pre-refactor
# version (API helpers now default timeout=10 inline) — confirm before removing.
DEFAULT_TIMEOUT = int(os.environ.get("ATOCORE_TIMEOUT_SECONDS", "10"))
# Model alias passed to `claude --model`, and wall-clock cap per CLI call.
DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet")
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_LLM_EXTRACTOR_TIMEOUT_S", "90"))
# Truncation limits applied to the conversation text before prompting the model.
MAX_RESPONSE_CHARS = 8000
MAX_PROMPT_CHARS = 2000
# Closed set accepted by parse_candidates(); anything else is dropped.
# NOTE(review): SYSTEM_PROMPT (rule 4) only asks the model for four of these
# types — identity/episodic are accepted here but never requested. Confirm intent.
MEMORY_TYPES = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"}
# System prompt for the `claude -p` extraction call. Deliberately duplicated
# from atocore.memory.extractor_llm: this host-side script must stay
# stdlib-only and cannot import atocore. Keep the two copies in sync manually.
SYSTEM_PROMPT = """You extract durable memory candidates from LLM conversation turns for a personal context engine called AtoCore.
Your job is to read one user prompt plus the assistant's response and decide which durable facts, decisions, preferences, architectural rules, or project invariants should be remembered across future sessions.
Rules:
1. Only surface durable claims. Skip transient status ("deploy is still running"), instructional guidance ("here is how to run the command"), troubleshooting tactics, ephemeral recommendations ("merge this PR now"), and session recaps.
2. A candidate is durable when a reader coming back in two weeks would still need to know it. Architectural choices, named rules, ratified decisions, invariants, procurement commitments, and project-level constraints qualify. Conversational fillers and step-by-step instructions do not.
3. Each candidate must stand alone. Rewrite the claim in one sentence under 200 characters with enough context that a reader without the conversation understands it.
4. Each candidate must have a type from this closed set: project, knowledge, preference, adaptation.
5. If the conversation is clearly scoped to a project (p04-gigabit, p05-interferometer, p06-polisher, atocore), set ``project`` to that id. Otherwise leave ``project`` empty.
6. If the response makes no durable claim, return an empty list. It is correct and expected to return [] on most conversational turns.
7. Confidence should be 0.5 by default so human review workload is honest. Raise to 0.6 only when the response states the claim in an unambiguous, committed form (e.g. "the decision is X", "the selected approach is Y", "X is non-negotiable").
8. Output must be a raw JSON array and nothing else. No prose before or after. No markdown fences. No explanations.
Each array element has exactly this shape:
{"type": "project|knowledge|preference|adaptation", "content": "...", "project": "...", "confidence": 0.5}
Return [] when there is nothing to extract."""
_sandbox_cwd = None  # memoized scratch directory for claude runs; created lazily by get_sandbox_cwd()
def api_get(base_url: str, path: str, timeout: int = DEFAULT_TIMEOUT) -> dict:
def get_sandbox_cwd():
    """Return a process-wide scratch directory for ``claude`` subprocess runs.

    Created once on first use (guard-clause memoization via the module-level
    ``_sandbox_cwd``) so every CLI invocation shares the same isolated cwd.
    """
    global _sandbox_cwd
    if _sandbox_cwd is not None:
        return _sandbox_cwd
    _sandbox_cwd = tempfile.mkdtemp(prefix="ato-llm-extract-")
    return _sandbox_cwd
def api_get(base_url, path, timeout=10):
    """GET ``{base_url}{path}`` and return the decoded JSON response body.

    Propagates urllib errors (HTTPError/URLError) to the caller.
    """
    request = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        payload = response.read()
    return json.loads(payload.decode("utf-8"))
def api_post(base_url: str, path: str, body: dict, timeout: int = DEFAULT_TIMEOUT) -> dict:
def api_post(base_url, path, body, timeout=10):
    """POST ``body`` as JSON to ``{base_url}{path}``; return the decoded JSON reply.

    Propagates urllib errors (HTTPError/URLError) — callers such as the
    /memory persist loop catch HTTPError explicitly.
    """
    # Fix: the previous text had both old and new Request(...) argument lines
    # merged together, repeating method=/headers=/data= and placing a positional
    # argument after keywords — a SyntaxError. Keep the compact (new) form only.
    data = json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}{path}", method="POST",
        headers={"Content-Type": "application/json"}, data=data,
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
def get_last_run(base_url: str) -> str | None:
def get_last_run(base_url):
try:
state = api_get(base_url, "/project/state/atocore?category=status")
for entry in state.get("entries", []):
@@ -71,66 +89,150 @@ def get_last_run(base_url: str) -> str | None:
return None
def set_last_run(base_url: str, timestamp: str) -> None:
def set_last_run(base_url, timestamp):
    """Best-effort: record ``timestamp`` as the last-run marker in project state.

    The marker is only used to auto-resume the next batch run; losing it just
    means re-scanning some interactions, so any failure here is swallowed
    rather than aborting the batch.
    """
    # Fix: the previous text carried duplicated dict keys ("project",
    # "category", "key", "value" each appeared twice) — merged old/new
    # formatting of the same payload. Keep the compact form once.
    try:
        api_post(base_url, "/project/state", {
            "project": "atocore", "category": "status",
            "key": "last_extract_batch_run", "value": timestamp,
            "source": "batch_llm_extract_live.py",
        })
    except Exception:
        pass  # deliberate: state tracking must never fail the run
def fetch_interactions(base_url: str, since: str | None, limit: int) -> list[dict]:
    """Fetch up to ``limit`` interactions, optionally only those after ``since``."""
    query_parts = [f"limit={limit}"]
    if since:
        # since is a timestamp string; quote it for safe URL embedding.
        query_parts.append(f"since={urllib.parse.quote(since)}")
    payload = api_get(base_url, "/interactions?" + "&".join(query_parts))
    return payload.get("interactions", [])
def extract_one(prompt, response, project, model, timeout_s):
    """Run ``claude -p`` on one interaction and return ``(candidates, error)``.

    ``error`` is "" on success; on any failure (missing CLI, timeout, crash,
    non-zero exit) candidates is [] and ``error`` names the cause.
    """
    if shutil.which("claude") is None:
        return [], "claude_cli_missing"
    # Truncate both sides of the conversation before building the prompt.
    user_message = (
        f"PROJECT HINT (may be empty): {project}\n\n"
        f"USER PROMPT:\n{prompt[:MAX_PROMPT_CHARS]}\n\n"
        f"ASSISTANT RESPONSE:\n{response[:MAX_RESPONSE_CHARS]}\n\n"
        "Return the JSON array now."
    )
    cmd = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", SYSTEM_PROMPT,
        "--no-session-persistence",
        "--disable-slash-commands",
        user_message,
    ]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout_s,
            cwd=get_sandbox_cwd(),  # isolate the CLI from the repo checkout
            encoding="utf-8",
            errors="replace",
        )
    except subprocess.TimeoutExpired:
        return [], "timeout"
    except Exception as exc:
        return [], f"subprocess_error: {exc}"
    if proc.returncode != 0:
        return [], f"exit_{proc.returncode}"
    return parse_candidates((proc.stdout or "").strip(), project), ""
def main() -> int:
def parse_candidates(raw, interaction_project):
    """Turn the model's raw stdout into a list of validated candidate dicts.

    Tolerates markdown fences and surrounding prose despite prompt rule 8;
    anything unparseable, non-list, or failing validation yields [] / is
    skipped. ``interaction_project`` backfills an empty per-item project.
    """
    text = raw.strip()
    # Unwrap a ```...``` fence if the model ignored the raw-JSON rule.
    if text.startswith("```"):
        text = text.strip("`")
        newline_at = text.find("\n")
        if newline_at >= 0:
            text = text[newline_at + 1:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()
    if not text or text == "[]":
        return []
    # If prose surrounds the array, slice out the outermost [...] span.
    if not text.lstrip().startswith("["):
        lo, hi = text.find("["), text.rfind("]")
        if lo >= 0 and hi > lo:
            text = text[lo:hi + 1]
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return []
    if not isinstance(parsed, list):
        return []
    candidates = []
    for entry in parsed:
        if not isinstance(entry, dict):
            continue
        mem_type = str(entry.get("type") or "").strip().lower()
        content = str(entry.get("content") or "").strip()
        project = str(entry.get("project") or "").strip()
        if not project and interaction_project:
            project = interaction_project
        if mem_type not in MEMORY_TYPES or not content:
            continue
        # Clamp confidence into [0, 1]; default to 0.5 on junk values.
        try:
            confidence = min(1.0, max(0.0, float(entry.get("confidence", 0.5))))
        except (TypeError, ValueError):
            confidence = 0.5
        candidates.append({
            "memory_type": mem_type,
            "content": content[:1000],
            "project": project,
            "confidence": confidence,
        })
    return candidates
def main():
parser = argparse.ArgumentParser(description="Host-side LLM batch extraction")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--limit", type=int, default=50)
parser.add_argument("--since", default=None, help="override last-run timestamp")
parser.add_argument("--since", default=None)
parser.add_argument("--model", default=DEFAULT_MODEL)
args = parser.parse_args()
since = args.since or get_last_run(args.base_url)
print(f"fetching interactions since={since or '(first run)'} limit={args.limit}")
print(f"since={since or '(first run)'} limit={args.limit} model={args.model}")
raw_interactions = fetch_interactions(args.base_url, since, args.limit)
print(f"fetched {len(raw_interactions)} interactions")
params = [f"limit={args.limit}"]
if since:
params.append(f"since={urllib.parse.quote(since)}")
raw_interactions = api_get(args.base_url, f"/interactions?{'&'.join(params)}")
interactions = raw_interactions.get("interactions", [])
print(f"fetched {len(interactions)} interactions")
processed = 0
total_candidates = 0
total_persisted = 0
errors = 0
for raw in raw_interactions:
for raw in interactions:
response_text = raw.get("response", "") or ""
if not response_text.strip():
if not response_text.strip() or len(response_text) < 50:
continue
interaction = Interaction(
id=raw["id"],
candidates, error = extract_one(
prompt=raw.get("prompt", "") or "",
response=response_text,
response_summary=raw.get("response_summary", "") or "",
project=raw.get("project", "") or "",
client=raw.get("client", "") or "",
session_id=raw.get("session_id", "") or "",
created_at=raw.get("created_at", "") or "",
model=args.model,
timeout_s=DEFAULT_TIMEOUT_S,
)
try:
candidates = extract_candidates_llm(interaction)
except Exception as exc:
print(f" ! extraction error on {interaction.id[:8]}: {exc}")
if error:
print(f" ! {raw['id'][:8]}: {error}", file=sys.stderr)
errors += 1
continue
@@ -140,27 +242,24 @@ def main() -> int:
for c in candidates:
try:
api_post(args.base_url, "/memory", {
"memory_type": c.memory_type,
"content": c.content,
"project": c.project,
"confidence": c.confidence,
"memory_type": c["memory_type"],
"content": c["content"],
"project": c["project"],
"confidence": c["confidence"],
"status": "candidate",
})
total_persisted += 1
except urllib.error.HTTPError as exc:
if exc.code != 400: # 400 = duplicate, skip silently
if exc.code != 400:
errors += 1
except Exception:
errors += 1
from datetime import datetime, timezone
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
set_last_run(args.base_url, now)
print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
main()