"""Host-side LLM batch extraction — pure HTTP client, no atocore imports.

Fetches interactions from the AtoCore API, runs ``claude -p`` locally for
each, and POSTs candidates back. Zero dependency on atocore source or
Python packages — only uses stdlib + the ``claude`` CLI on PATH.

This is necessary because the ``claude`` CLI is on the Dalidou HOST but
not inside the Docker container, and the host's Python doesn't have the
container's dependencies (pydantic_settings, etc.).

Each run resumes from the ``last_extract_batch_run`` watermark stored in
AtoCore project state (project ``atocore``, category ``status``) unless
``--since`` is given explicitly.
"""

from __future__ import annotations

import argparse
import json
import math
import os
import shutil
import subprocess
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any

DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet")
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_LLM_EXTRACTOR_TIMEOUT_S", "90"))
MAX_RESPONSE_CHARS = 8000
MAX_PROMPT_CHARS = 2000
# Interactions whose response is shorter than this are skipped without an
# LLM call.
MIN_RESPONSE_CHARS = 50

MEMORY_TYPES = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"}

SYSTEM_PROMPT = """You extract memory candidates from LLM conversation turns for a personal context engine called AtoCore.

AtoCore is the brain for Atomaste's engineering work. Known projects:
p04-gigabit, p05-interferometer, p06-polisher, atomizer-v2, atocore,
abb-space. Unknown project names — still tag them, the system auto-detects.

Your job is to emit SIGNALS that matter for future context. Be aggressive:
err on the side of capturing useful signal. Triage filters noise downstream.

WHAT TO EMIT (in order of importance):

1. PROJECT ACTIVITY — any mention of a project with context worth remembering:
   - "Schott quote received for ABB-Space" (event + project)
   - "Cédric asked about p06 firmware timing" (stakeholder event)
   - "Still waiting on Zygo lead-time from Nabeel" (blocker status)
   - "p05 vendor decision needs to happen this week" (action item)

2. DECISIONS AND CHOICES — anything that commits to a direction:
   - "Going with Zygo Verifire SV for p05" (decision)
   - "Dropping stitching from primary workflow" (design choice)
   - "USB SSD mandatory, not SD card" (architectural commitment)

3. DURABLE ENGINEERING INSIGHT — earned knowledge that generalizes:
   - "CTE gradient dominates WFE at F/1.2" (materials insight)
   - "Preston model breaks below 5N because contact assumption fails"
   - "m=1 coma NOT correctable by force modulation" (controls insight)
   Test: would a competent engineer NEED experience to know this?
   If it's textbook/google-findable, skip it.

4. STAKEHOLDER AND VENDOR EVENTS:
   - "Email sent to Nabeel 2026-04-13 asking for lead time"
   - "Meeting with Jason on Table 7 next Tuesday"
   - "Starspec wants updated CAD by Friday"

5. PREFERENCES AND ADAPTATIONS that shape how Antoine works:
   - "Antoine prefers OAuth over API keys"
   - "Extraction stays off the capture hot path"

WHAT TO SKIP:

- Pure conversational filler ("ok thanks", "let me check")
- Instructional help content ("run this command", "here's how to...")
- Obvious textbook facts anyone can google in 30 seconds
- Session meta-chatter ("let me commit this", "deploy running")
- Transient system state snapshots ("36 active memories right now")

CANDIDATE TYPES — choose the best fit:

- project — a fact, decision, or event specific to one named project
- knowledge — durable engineering insight (use domain, not project)
- preference — how Antoine works / wants things done
- adaptation — a standing rule or adjustment to behavior
- episodic — a stakeholder event or milestone worth remembering

DOMAINS for knowledge candidates (required when type=knowledge and project is empty):
physics, materials, optics, mechanics, manufacturing, metrology,
controls, software, math, finance, business

TRUST HIERARCHY:

- project-specific: set project to the project id, leave domain empty
- domain knowledge: set domain, leave project empty
- events/activity: use project, type=project or episodic
- one conversation can produce MULTIPLE candidates — emit them all

OUTPUT RULES:

- Each candidate content under 250 characters, stands alone
- Default confidence 0.5. Raise to 0.7 only for ratified/committed claims.
- Raw JSON array, no prose, no markdown fences
- Empty array [] is fine when the conversation has no durable signal

Each element:
{"type": "project|knowledge|preference|adaptation|episodic", "content": "...", "project": "...", "domain": "", "confidence": 0.5}"""

# Lazily-created scratch directory used as the cwd for ``claude -p``, so the
# CLI does not run in the caller's working directory.
_sandbox_cwd: str | None = None


def get_sandbox_cwd() -> str:
    """Return the scratch directory used as ``claude``'s cwd, creating it on first use."""
    global _sandbox_cwd
    if _sandbox_cwd is None:
        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-llm-extract-")
    return _sandbox_cwd


def _cleanup_sandbox_cwd() -> None:
    """Remove the scratch directory, if one was created, and forget it."""
    global _sandbox_cwd
    if _sandbox_cwd is not None:
        shutil.rmtree(_sandbox_cwd, ignore_errors=True)
        _sandbox_cwd = None


def api_get(base_url: str, path: str, timeout: float = 10) -> Any:
    """GET ``base_url + path`` and return the decoded JSON body.

    Raises ``urllib.error.URLError`` (incl. ``HTTPError``) on transport/HTTP
    failures and ``json.JSONDecodeError`` on a non-JSON body.
    """
    req = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def api_post(base_url: str, path: str, body: Any, timeout: float = 10) -> Any:
    """POST ``body`` as JSON to ``base_url + path`` and return the decoded JSON reply.

    Raises the same exceptions as :func:`api_get`.
    """
    data = json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}{path}",
        method="POST",
        headers={"Content-Type": "application/json"},
        data=data,
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def get_last_run(base_url: str) -> str | None:
    """Return the stored ``last_extract_batch_run`` watermark, or None.

    Best-effort: any failure (API down, unexpected payload) is reported on
    stderr and treated as "no previous run".
    """
    try:
        state = api_get(base_url, "/project/state/atocore?category=status")
        for entry in state.get("entries", []):
            if entry.get("key") == "last_extract_batch_run":
                return entry["value"]
    except Exception as exc:
        print(f"  ! could not read last run watermark: {exc}", file=sys.stderr)
    return None


def set_last_run(base_url: str, timestamp: str) -> None:
    """Store ``timestamp`` as the ``last_extract_batch_run`` watermark.

    Best-effort: failures are reported on stderr rather than raised, so a
    finished batch is never turned into a crash by the bookkeeping write.
    """
    try:
        api_post(base_url, "/project/state", {
            "project": "atocore",
            "category": "status",
            "key": "last_extract_batch_run",
            "value": timestamp,
            "source": "batch_llm_extract_live.py",
        })
    except Exception as exc:
        print(f"  ! could not store last run watermark: {exc}", file=sys.stderr)


# Registered project IDs and aliases; filled by _load_known_projects().
_known_projects: set[str] = set()


def _load_known_projects(base_url: str) -> None:
    """Fetch registered project IDs (and their aliases) for R9 validation.

    The module-level set is replaced only when the whole payload was read
    successfully; on failure it keeps its previous contents and a warning
    is printed on stderr.
    """
    global _known_projects
    try:
        data = api_get(base_url, "/projects")
        known: set[str] = set()
        for p in data.get("projects", []):
            known.add(p["id"])
            known.update(p.get("aliases", []))
    except Exception as exc:
        print(f"  ! could not load known projects: {exc}", file=sys.stderr)
        return
    _known_projects = known


def extract_one(
    prompt: str,
    response: str,
    project: str,
    model: str,
    timeout_s: float,
) -> tuple[list[dict[str, Any]], str]:
    """Run ``claude -p`` on one interaction and parse its candidates.

    Args:
        prompt: The interaction's user prompt (truncated to MAX_PROMPT_CHARS).
        response: The assistant response (truncated to MAX_RESPONSE_CHARS).
        project: The interaction's project scope; may be empty.
        model: Model name passed to ``claude --model``.
        timeout_s: Seconds before the subprocess is killed.

    Returns:
        ``(candidates, error)``. ``error`` is ``""`` on success; otherwise it
        is a short code (``"claude_cli_missing"``, ``"timeout"``,
        ``"exit_<rc>"`` or ``"subprocess_error: ..."``) and ``candidates``
        is empty.
    """
    if not shutil.which("claude"):
        return [], "claude_cli_missing"

    prompt_excerpt = prompt[:MAX_PROMPT_CHARS]
    response_excerpt = response[:MAX_RESPONSE_CHARS]
    user_message = (
        f"PROJECT HINT (may be empty): {project}\n\n"
        f"USER PROMPT:\n{prompt_excerpt}\n\n"
        f"ASSISTANT RESPONSE:\n{response_excerpt}\n\n"
        "Return the JSON array now."
    )
    args = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", SYSTEM_PROMPT,
        "--disable-slash-commands",
        user_message,
    ]
    try:
        completed = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=timeout_s,
            cwd=get_sandbox_cwd(),
            encoding="utf-8",
            errors="replace",
        )
    except subprocess.TimeoutExpired:
        return [], "timeout"
    except Exception as exc:
        return [], f"subprocess_error: {exc}"

    if completed.returncode != 0:
        return [], f"exit_{completed.returncode}"

    raw = (completed.stdout or "").strip()
    return parse_candidates(raw, project), ""


def _clamp_confidence(value: Any, default: float = 0.5) -> float:
    """Coerce a model-supplied confidence to a float in [0, 1].

    Non-numeric values fall back to ``default``. Non-finite values also fall
    back: ``json.loads`` accepts ``NaN``/``Infinity``, and clamping NaN with
    ``max(0, min(1, nan))`` would silently yield the maximum, 1.0.
    """
    try:
        conf = float(value)
    except (TypeError, ValueError):
        return default
    if not math.isfinite(conf):
        return default
    return max(0.0, min(1.0, conf))


def parse_candidates(raw: str, interaction_project: str) -> list[dict[str, Any]]:
    """Parse the model's JSON output into candidate dicts.

    Tolerates a surrounding markdown fence (with or without an info string
    such as ``json``) and prose around the array. Items with an unknown
    ``type`` or empty ``content`` are dropped.

    Args:
        raw: Raw stdout of the model.
        interaction_project: Project scope of the source interaction; when
            non-empty it overrides whatever project the model tagged.

    Returns:
        A list of ``{"memory_type", "content", "project", "confidence"}``
        dicts; empty when nothing parseable was found.
    """
    text = raw.strip()
    if text.startswith("```"):
        # Drop the fence backticks only; an info string like "json" left in
        # front of the array is skipped by the bracket extraction below.
        text = text.strip("`").strip()
    if not text or text == "[]":
        return []
    if not text.startswith("["):
        start = text.find("[")
        end = text.rfind("]")
        if start >= 0 and end > start:
            text = text[start:end + 1]
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return []
    if not isinstance(parsed, list):
        return []

    results: list[dict[str, Any]] = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        mem_type = str(item.get("type") or "").strip().lower()
        content = str(item.get("content") or "").strip()
        # Validate before the domain prefix is added, otherwise empty content
        # would turn into a non-empty "[domain] " memory.
        if mem_type not in MEMORY_TYPES or not content:
            continue
        model_project = str(item.get("project") or "").strip()
        domain = str(item.get("domain") or "").strip().lower()

        # R9 trust hierarchy: interaction scope always wins when set.
        # For unscoped interactions, keep the model's project tag even if
        # unregistered — the system will detect new projects/leads.
        project = interaction_project or model_project or ""

        # Domain knowledge: embed tag in content for cross-project retrieval.
        if domain and not project:
            content = f"[{domain}] {content}"

        results.append({
            "memory_type": mem_type,
            "content": content[:1000],
            "project": project,
            "confidence": _clamp_confidence(item.get("confidence", 0.5)),
        })
    return results


def _persist_candidates(base_url: str, candidates: list[dict[str, Any]]) -> tuple[int, int]:
    """POST each candidate to ``/memory`` with ``status="candidate"``.

    Returns ``(persisted, errors)``. HTTP 400 replies are neither persisted
    nor counted as errors. NOTE(review): presumably the API answers 400 for
    duplicate/invalid candidates — confirm against the server.
    """
    persisted = 0
    errors = 0
    for c in candidates:
        try:
            api_post(base_url, "/memory", {
                "memory_type": c["memory_type"],
                "content": c["content"],
                "project": c["project"],
                "confidence": c["confidence"],
                "status": "candidate",
            })
        except urllib.error.HTTPError as exc:
            if exc.code != 400:
                errors += 1
        except Exception:
            errors += 1
        else:
            persisted += 1
    return persisted, errors


def main() -> None:
    """List new interactions, extract candidates with ``claude``, persist them.

    Exits with status 1, without touching the watermark, when the ``claude``
    CLI is not on PATH. The watermark is set to the time the run started
    (before listing), and is left unchanged when at least one interaction
    failed and none was processed successfully, so those get retried.
    """
    parser = argparse.ArgumentParser(description="Host-side LLM batch extraction")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--limit", type=int, default=50)
    parser.add_argument("--since", default=None)
    parser.add_argument("--model", default=DEFAULT_MODEL)
    args = parser.parse_args()

    # Fail fast: without the CLI every extraction errors out, and advancing
    # the watermark afterwards would silently skip all of these interactions.
    if not shutil.which("claude"):
        print(
            "claude CLI not found on PATH; aborting without advancing the watermark",
            file=sys.stderr,
        )
        sys.exit(1)

    _load_known_projects(args.base_url)
    since = args.since or get_last_run(args.base_url)
    print(
        f"since={since or '(first run)'} limit={args.limit} "
        f"model={args.model} known_projects={len(_known_projects)}"
    )

    # Capture the watermark BEFORE listing: interactions created while this
    # run is busy extracting (up to DEFAULT_TIMEOUT_S per call) must still be
    # picked up next time. At worst a few get listed again next run.
    run_started = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    params = [f"limit={args.limit}"]
    if since:
        params.append(f"since={urllib.parse.quote(since)}")
    listing = api_get(args.base_url, f"/interactions?{'&'.join(params)}")
    interaction_summaries = listing.get("interactions", [])
    print(f"listed {len(interaction_summaries)} interactions")
    if len(interaction_summaries) >= args.limit > 0:
        print(
            f"  ! listing hit --limit={args.limit}; interactions beyond the "
            "limit may be skipped once the watermark advances",
            file=sys.stderr,
        )

    processed = 0
    total_candidates = 0
    total_persisted = 0
    errors = 0
    failed_interactions = 0

    try:
        for summary in interaction_summaries:
            if (summary.get("response_chars", 0) or 0) < MIN_RESPONSE_CHARS:
                continue
            iid = summary["id"]
            try:
                raw = api_get(
                    args.base_url,
                    f"/interactions/{urllib.parse.quote(iid, safe='')}",
                )
            except Exception as exc:
                print(f"  ! {iid[:8]}: fetch failed: {exc}", file=sys.stderr)
                errors += 1
                failed_interactions += 1
                continue

            response_text = raw.get("response", "") or ""
            if not response_text.strip() or len(response_text) < MIN_RESPONSE_CHARS:
                continue

            candidates, error = extract_one(
                prompt=raw.get("prompt", "") or "",
                response=response_text,
                project=raw.get("project", "") or "",
                model=args.model,
                timeout_s=DEFAULT_TIMEOUT_S,
            )
            if error:
                print(f"  ! {iid[:8]}: {error}", file=sys.stderr)
                errors += 1
                failed_interactions += 1
                continue

            processed += 1
            total_candidates += len(candidates)
            persisted, persist_errors = _persist_candidates(args.base_url, candidates)
            total_persisted += persisted
            errors += persist_errors
    finally:
        _cleanup_sandbox_cwd()

    if failed_interactions and not processed:
        # Every attempted interaction failed (e.g. CLI auth/outage): keep the
        # old watermark so they are retried instead of silently dropped.
        print(
            "  ! no interaction was extracted successfully; keeping the previous watermark",
            file=sys.stderr,
        )
    else:
        set_last_run(args.base_url, run_started)

    print(
        f"processed={processed} candidates={total_candidates} "
        f"persisted={total_persisted} errors={errors}"
    )


if __name__ == "__main__":
    main()