"""Host-side LLM batch extraction — HTTP client + shared prompt module. Fetches interactions from the AtoCore API, runs ``claude -p`` locally for each, and POSTs candidates back. Uses stdlib + the ``claude`` CLI on PATH, plus the stdlib-only shared prompt/parser module at ``atocore.memory._llm_prompt`` to eliminate prompt/parser drift against the in-container extractor (R12). This is necessary because the ``claude`` CLI is on the Dalidou HOST but not inside the Docker container, and the host's Python doesn't have the container's dependencies (pydantic_settings, etc.) — so we only import the one stdlib-only module, not the full atocore package. """ from __future__ import annotations import argparse import json import os import shutil import subprocess import sys import tempfile import urllib.error import urllib.parse import urllib.request from datetime import datetime, timezone # R12: share the prompt + parser with the in-container extractor so # the two paths can't drift. The imported module is stdlib-only by # design; see src/atocore/memory/_llm_prompt.py. _SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) _SRC_DIR = os.path.abspath(os.path.join(_SCRIPT_DIR, "..", "src")) if _SRC_DIR not in sys.path: sys.path.insert(0, _SRC_DIR) from atocore.memory._llm_prompt import ( # noqa: E402 MEMORY_TYPES, SYSTEM_PROMPT, build_user_message, normalize_candidate_item, parse_llm_json_array, ) DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100") DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet") DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_LLM_EXTRACTOR_TIMEOUT_S", "90")) _sandbox_cwd = None def get_sandbox_cwd(): global _sandbox_cwd if _sandbox_cwd is None: _sandbox_cwd = tempfile.mkdtemp(prefix="ato-llm-extract-") return _sandbox_cwd def api_get(base_url, path, timeout=10): req = urllib.request.Request(f"{base_url}{path}") with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) def api_post(base_url, path, body, timeout=10): data = json.dumps(body).encode("utf-8") req = urllib.request.Request( f"{base_url}{path}", method="POST", headers={"Content-Type": "application/json"}, data=data, ) with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) def get_last_run(base_url): try: state = api_get(base_url, "/project/state/atocore?category=status") for entry in state.get("entries", []): if entry.get("key") == "last_extract_batch_run": return entry["value"] except Exception: pass return None def set_last_run(base_url, timestamp): try: api_post(base_url, "/project/state", { "project": "atocore", "category": "status", "key": "last_extract_batch_run", "value": timestamp, "source": "batch_llm_extract_live.py", }) except Exception: pass _known_projects: set[str] = set() def _load_known_projects(base_url): """Fetch registered project IDs from the API for R9 validation.""" global _known_projects try: data = api_get(base_url, "/projects") _known_projects = {p["id"] for p in data.get("projects", [])} for p in data.get("projects", []): for alias in p.get("aliases", []): _known_projects.add(alias) except Exception: pass def extract_one(prompt, response, project, model, timeout_s): """Run claude -p on one interaction, return parsed candidates.""" if not shutil.which("claude"): return [], "claude_cli_missing" user_message = build_user_message(prompt, response, project) args = [ "claude", "-p", "--model", model, "--append-system-prompt", SYSTEM_PROMPT, "--disable-slash-commands", user_message, ] try: completed = subprocess.run( args, capture_output=True, text=True, timeout=timeout_s, cwd=get_sandbox_cwd(), encoding="utf-8", errors="replace", ) except subprocess.TimeoutExpired: return [], "timeout" except Exception as exc: return [], f"subprocess_error: {exc}" if completed.returncode != 0: return [], f"exit_{completed.returncode}" raw = (completed.stdout or "").strip() return parse_candidates(raw, project), "" def parse_candidates(raw, interaction_project): """Parse model JSON output into candidate dicts. Stripping + per-item normalization come from the shared ``_llm_prompt`` module. Host-side project attribution: interaction scope wins, otherwise keep the model's tag (the API's own R9 registry-check will happen server-side in the container on write; here we preserve the signal instead of dropping it). """ results = [] for item in parse_llm_json_array(raw): normalized = normalize_candidate_item(item) if normalized is None: continue project = interaction_project or normalized["project"] or "" results.append({ "memory_type": normalized["type"], "content": normalized["content"], "project": project, "confidence": normalized["confidence"], }) return results def main(): parser = argparse.ArgumentParser(description="Host-side LLM batch extraction") parser.add_argument("--base-url", default=DEFAULT_BASE_URL) parser.add_argument("--limit", type=int, default=50) parser.add_argument("--since", default=None) parser.add_argument("--model", default=DEFAULT_MODEL) args = parser.parse_args() _load_known_projects(args.base_url) since = args.since or get_last_run(args.base_url) print(f"since={since or '(first run)'} limit={args.limit} model={args.model} known_projects={len(_known_projects)}") params = [f"limit={args.limit}"] if since: params.append(f"since={urllib.parse.quote(since)}") listing = api_get(args.base_url, f"/interactions?{'&'.join(params)}") interaction_summaries = listing.get("interactions", []) print(f"listed {len(interaction_summaries)} interactions") processed = 0 total_candidates = 0 total_persisted = 0 errors = 0 for summary in interaction_summaries: resp_chars = summary.get("response_chars", 0) or 0 if resp_chars < 50: continue iid = summary["id"] try: raw = api_get( args.base_url, f"/interactions/{urllib.parse.quote(iid, safe='')}", ) except Exception as exc: print(f" ! {iid[:8]}: fetch failed: {exc}", file=sys.stderr) errors += 1 continue response_text = raw.get("response", "") or "" if not response_text.strip() or len(response_text) < 50: continue candidates, error = extract_one( prompt=raw.get("prompt", "") or "", response=response_text, project=raw.get("project", "") or "", model=args.model, timeout_s=DEFAULT_TIMEOUT_S, ) if error: print(f" ! {raw['id'][:8]}: {error}", file=sys.stderr) errors += 1 continue processed += 1 total_candidates += len(candidates) for c in candidates: try: api_post(args.base_url, "/memory", { "memory_type": c["memory_type"], "content": c["content"], "project": c["project"], "confidence": c["confidence"], "status": "candidate", }) total_persisted += 1 except urllib.error.HTTPError as exc: if exc.code != 400: errors += 1 except Exception: errors += 1 now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") set_last_run(args.base_url, now) print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}") if __name__ == "__main__": main()