"""Host-side LLM batch extraction — pure HTTP client, no atocore imports. Fetches interactions from the AtoCore API, runs ``claude -p`` locally for each, and POSTs candidates back. Zero dependency on atocore source or Python packages — only uses stdlib + the ``claude`` CLI on PATH. This is necessary because the ``claude`` CLI is on the Dalidou HOST but not inside the Docker container, and the host's Python doesn't have the container's dependencies (pydantic_settings, etc.). """ from __future__ import annotations import argparse import json import os import shutil import subprocess import sys import tempfile import urllib.error import urllib.parse import urllib.request from datetime import datetime, timezone DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100") DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet") DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_LLM_EXTRACTOR_TIMEOUT_S", "90")) MAX_RESPONSE_CHARS = 8000 MAX_PROMPT_CHARS = 2000 MEMORY_TYPES = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"} SYSTEM_PROMPT = """You extract durable memory candidates from LLM conversation turns for a personal context engine called AtoCore. AtoCore stores two kinds of knowledge: A. PROJECT-SPECIFIC: applied decisions, constraints, and architecture for a named project (p04-gigabit, p05-interferometer, p06-polisher, atomizer-v2, atocore). These stay scoped to one project. B. DOMAIN KNOWLEDGE: generalizable engineering insight that was EARNED through project work and is reusable across projects. Tag these with a domain instead of a project. THE CRITICAL BAR FOR DOMAIN KNOWLEDGE: Only extract insight that took real effort to discover. The test: "Would a competent engineer need experience to know this, or could they find it in 30 seconds on Google?" If they can look it up, do NOT extract it. EXTRACT (earned insight): - "At F/1.2, Zerodur CTE gradient across the blank is the second-largest WFE contributor after gravity sag" - "Preston removal rate model breaks down below 5N applied force because the contact assumption fails" - "For swing-arm polishing, m=1 (coma) is NOT correctable by force modulation (score 0.09)" DO NOT EXTRACT (common knowledge): - "Zerodur CTE is 0.05 ppm/K" (textbook value) - "FEA uses finite elements to discretize continuous domains" (definition) - "Python is a programming language" (obvious) Rules: 1. Only surface durable claims. Skip transient status, instructional guidance, troubleshooting, ephemeral recommendations, session recaps. 2. A candidate is durable when a reader coming back in two weeks would still need to know it. 3. Each candidate must stand alone in one sentence under 200 characters. 4. Type must be one of: project, knowledge, preference, adaptation. 5. For project-specific claims, set ``project`` to the project id. 6. For generalizable domain insight, set ``project`` to empty and set ``domain`` to one of: physics, materials, optics, mechanics, manufacturing, metrology, controls, software, math, finance. 7. When one conversation produces BOTH a project-specific fact AND a generalizable principle, emit BOTH as separate candidates. 8. Return [] on most turns. The bar is high. Empty is correct and expected. 9. Confidence 0.5 default. Raise to 0.6 only for unambiguous committed claims. 10. Output a raw JSON array only. No prose, no markdown fences. Each array element: {"type": "project|knowledge|preference|adaptation", "content": "...", "project": "...", "domain": "", "confidence": 0.5} Use ``project`` for project-scoped candidates. Use ``domain`` for cross-project knowledge. Never set both.""" _sandbox_cwd = None def get_sandbox_cwd(): global _sandbox_cwd if _sandbox_cwd is None: _sandbox_cwd = tempfile.mkdtemp(prefix="ato-llm-extract-") return _sandbox_cwd def api_get(base_url, path, timeout=10): req = urllib.request.Request(f"{base_url}{path}") with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) def api_post(base_url, path, body, timeout=10): data = json.dumps(body).encode("utf-8") req = urllib.request.Request( f"{base_url}{path}", method="POST", headers={"Content-Type": "application/json"}, data=data, ) with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) def get_last_run(base_url): try: state = api_get(base_url, "/project/state/atocore?category=status") for entry in state.get("entries", []): if entry.get("key") == "last_extract_batch_run": return entry["value"] except Exception: pass return None def set_last_run(base_url, timestamp): try: api_post(base_url, "/project/state", { "project": "atocore", "category": "status", "key": "last_extract_batch_run", "value": timestamp, "source": "batch_llm_extract_live.py", }) except Exception: pass _known_projects: set[str] = set() def _load_known_projects(base_url): """Fetch registered project IDs from the API for R9 validation.""" global _known_projects try: data = api_get(base_url, "/projects") _known_projects = {p["id"] for p in data.get("projects", [])} for p in data.get("projects", []): for alias in p.get("aliases", []): _known_projects.add(alias) except Exception: pass def extract_one(prompt, response, project, model, timeout_s): """Run claude -p on one interaction, return parsed candidates.""" if not shutil.which("claude"): return [], "claude_cli_missing" prompt_excerpt = prompt[:MAX_PROMPT_CHARS] response_excerpt = response[:MAX_RESPONSE_CHARS] user_message = ( f"PROJECT HINT (may be empty): {project}\n\n" f"USER PROMPT:\n{prompt_excerpt}\n\n" f"ASSISTANT RESPONSE:\n{response_excerpt}\n\n" "Return the JSON array now." ) args = [ "claude", "-p", "--model", model, "--append-system-prompt", SYSTEM_PROMPT, "--disable-slash-commands", user_message, ] try: completed = subprocess.run( args, capture_output=True, text=True, timeout=timeout_s, cwd=get_sandbox_cwd(), encoding="utf-8", errors="replace", ) except subprocess.TimeoutExpired: return [], "timeout" except Exception as exc: return [], f"subprocess_error: {exc}" if completed.returncode != 0: return [], f"exit_{completed.returncode}" raw = (completed.stdout or "").strip() return parse_candidates(raw, project), "" def parse_candidates(raw, interaction_project): """Parse model JSON output into candidate dicts.""" text = raw.strip() if text.startswith("```"): text = text.strip("`") nl = text.find("\n") if nl >= 0: text = text[nl + 1:] if text.endswith("```"): text = text[:-3] text = text.strip() if not text or text == "[]": return [] if not text.lstrip().startswith("["): start = text.find("[") end = text.rfind("]") if start >= 0 and end > start: text = text[start:end + 1] try: parsed = json.loads(text) except json.JSONDecodeError: return [] if not isinstance(parsed, list): return [] results = [] for item in parsed: if not isinstance(item, dict): continue mem_type = str(item.get("type") or "").strip().lower() content = str(item.get("content") or "").strip() model_project = str(item.get("project") or "").strip() domain = str(item.get("domain") or "").strip().lower() # R9 trust hierarchy: interaction scope always wins when set. if interaction_project: project = interaction_project elif model_project and model_project in _known_projects: project = model_project else: project = "" # Domain knowledge: embed tag in content for cross-project retrieval if domain and not project: content = f"[{domain}] {content}" conf = item.get("confidence", 0.5) if mem_type not in MEMORY_TYPES or not content: continue try: conf = max(0.0, min(1.0, float(conf))) except (TypeError, ValueError): conf = 0.5 results.append({ "memory_type": mem_type, "content": content[:1000], "project": project, "confidence": conf, }) return results def main(): parser = argparse.ArgumentParser(description="Host-side LLM batch extraction") parser.add_argument("--base-url", default=DEFAULT_BASE_URL) parser.add_argument("--limit", type=int, default=50) parser.add_argument("--since", default=None) parser.add_argument("--model", default=DEFAULT_MODEL) args = parser.parse_args() _load_known_projects(args.base_url) since = args.since or get_last_run(args.base_url) print(f"since={since or '(first run)'} limit={args.limit} model={args.model} known_projects={len(_known_projects)}") params = [f"limit={args.limit}"] if since: params.append(f"since={urllib.parse.quote(since)}") listing = api_get(args.base_url, f"/interactions?{'&'.join(params)}") interaction_summaries = listing.get("interactions", []) print(f"listed {len(interaction_summaries)} interactions") processed = 0 total_candidates = 0 total_persisted = 0 errors = 0 for summary in interaction_summaries: resp_chars = summary.get("response_chars", 0) or 0 if resp_chars < 50: continue iid = summary["id"] try: raw = api_get( args.base_url, f"/interactions/{urllib.parse.quote(iid, safe='')}", ) except Exception as exc: print(f" ! {iid[:8]}: fetch failed: {exc}", file=sys.stderr) errors += 1 continue response_text = raw.get("response", "") or "" if not response_text.strip() or len(response_text) < 50: continue candidates, error = extract_one( prompt=raw.get("prompt", "") or "", response=response_text, project=raw.get("project", "") or "", model=args.model, timeout_s=DEFAULT_TIMEOUT_S, ) if error: print(f" ! {raw['id'][:8]}: {error}", file=sys.stderr) errors += 1 continue processed += 1 total_candidates += len(candidates) for c in candidates: try: api_post(args.base_url, "/memory", { "memory_type": c["memory_type"], "content": c["content"], "project": c["project"], "confidence": c["confidence"], "status": "candidate", }) total_persisted += 1 except urllib.error.HTTPError as exc: if exc.code != 400: errors += 1 except Exception: errors += 1 now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") set_last_run(args.base_url, now) print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}") if __name__ == "__main__": main()