Extraction prompt rewritten for signal-aggressive mode. The old prompt
rewarded silence ("durable insight only, empty is correct") which
caused quiet failures — real project signal (Schott quotes arriving,
stakeholder events, blockers) was dropped as "not architectural enough".
New prompt explicitly lists what to emit:
1. Project activity (mentions with context — quote received, blocker,
action item)
2. Decisions and choices (architectural commitments, vendor selection)
3. Durable engineering insight (earned knowledge, generalizable)
4. Stakeholder and vendor events (emails sent, meetings scheduled)
5. Preferences and adaptations (how Antoine works)
Philosophy shift: "capture more signal, let triage filter noise"
replaces "extract only durable architectural facts". Auto-triage
already rejects noise well, so moving the filter downstream gives us
visibility into weak signals without polluting active memory.
Added 'episodic' to the candidate types list to support time-anchored
stakeholder events (emails sent, meetings, milestones).
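
Concretely, a single turn under the new prompt might come back as an array
like the sketch below (illustrative only — the candidates echo the examples
baked into the prompt, and the validation mirrors the script's MEMORY_TYPES
gate and the 250-char output rule):

```python
import json

# Hypothetical model output for one turn; nothing here is real data.
raw = """[
  {"type": "project", "content": "Schott quote received for ABB-Space", "project": "abb-space", "domain": "", "confidence": 0.5},
  {"type": "episodic", "content": "Email sent to Nabeel 2026-04-13 asking for lead time", "project": "abb-space", "domain": "", "confidence": 0.5},
  {"type": "knowledge", "content": "CTE gradient dominates WFE at F/1.2", "project": "", "domain": "materials", "confidence": 0.7}
]"""

ALLOWED = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"}
candidates = json.loads(raw)
for c in candidates:
    assert c["type"] in ALLOWED, c["type"]
    assert len(c["content"]) <= 250  # output rule: stands alone, under 250 chars
print([c["type"] for c in candidates])  # → ['project', 'episodic', 'knowledge']
```

One turn yields multiple candidates across categories — the "emit them all"
rule in the trust hierarchy, rather than one summary fact per conversation.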
LLM_EXTRACTOR_VERSION bumped to llm-0.4.0.
Also: cron-backup.sh now runs POST /ingest/sources before extraction
so new PKM files flow in automatically. Fail-open, non-blocking.
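
The cron hook reduces to a sketch like this (the POST /ingest/sources path
comes from the change itself; the curl flags and variable names are
assumptions):

```shell
BASE_URL="${ATOCORE_BASE_URL:-http://localhost:8100}"
# Fail-open: an unreachable API must never block the extraction pass that follows.
curl -fsS --max-time 10 -X POST "$BASE_URL/ingest/sources" >/dev/null 2>&1 || true
echo "ingest trigger attempted (fail-open)"
```

The `|| true` is the whole point — ingest is best-effort, and the rest of
cron-backup.sh runs regardless of the API's state.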
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
360 lines
12 KiB
Python
"""Host-side LLM batch extraction — pure HTTP client, no atocore imports.

Fetches interactions from the AtoCore API, runs ``claude -p`` locally
for each, and POSTs candidates back. Zero dependency on atocore source
or Python packages — only uses stdlib + the ``claude`` CLI on PATH.

This is necessary because the ``claude`` CLI is on the Dalidou HOST
but not inside the Docker container, and the host's Python doesn't
have the container's dependencies (pydantic_settings, etc.).
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone

DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet")
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_LLM_EXTRACTOR_TIMEOUT_S", "90"))
MAX_RESPONSE_CHARS = 8000
MAX_PROMPT_CHARS = 2000

MEMORY_TYPES = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"}

SYSTEM_PROMPT = """You extract memory candidates from LLM conversation turns for a personal context engine called AtoCore.

AtoCore is the brain for Atomaste's engineering work. Known projects:
p04-gigabit, p05-interferometer, p06-polisher, atomizer-v2, atocore,
abb-space. Unknown project names — still tag them, the system auto-detects.

Your job is to emit SIGNALS that matter for future context. Be aggressive:
err on the side of capturing useful signal. Triage filters noise downstream.

WHAT TO EMIT (in order of importance):

1. PROJECT ACTIVITY — any mention of a project with context worth remembering:
- "Schott quote received for ABB-Space" (event + project)
- "Cédric asked about p06 firmware timing" (stakeholder event)
- "Still waiting on Zygo lead-time from Nabeel" (blocker status)
- "p05 vendor decision needs to happen this week" (action item)

2. DECISIONS AND CHOICES — anything that commits to a direction:
- "Going with Zygo Verifire SV for p05" (decision)
- "Dropping stitching from primary workflow" (design choice)
- "USB SSD mandatory, not SD card" (architectural commitment)

3. DURABLE ENGINEERING INSIGHT — earned knowledge that generalizes:
- "CTE gradient dominates WFE at F/1.2" (materials insight)
- "Preston model breaks below 5N because contact assumption fails"
- "m=1 coma NOT correctable by force modulation" (controls insight)
Test: would a competent engineer NEED experience to know this?
If it's textbook/google-findable, skip it.

4. STAKEHOLDER AND VENDOR EVENTS:
- "Email sent to Nabeel 2026-04-13 asking for lead time"
- "Meeting with Jason on Table 7 next Tuesday"
- "Starspec wants updated CAD by Friday"

5. PREFERENCES AND ADAPTATIONS that shape how Antoine works:
- "Antoine prefers OAuth over API keys"
- "Extraction stays off the capture hot path"

WHAT TO SKIP:

- Pure conversational filler ("ok thanks", "let me check")
- Instructional help content ("run this command", "here's how to...")
- Obvious textbook facts anyone can google in 30 seconds
- Session meta-chatter ("let me commit this", "deploy running")
- Transient system state snapshots ("36 active memories right now")

CANDIDATE TYPES — choose the best fit:

- project — a fact, decision, or event specific to one named project
- knowledge — durable engineering insight (use domain, not project)
- preference — how Antoine works / wants things done
- adaptation — a standing rule or adjustment to behavior
- episodic — a stakeholder event or milestone worth remembering

DOMAINS for knowledge candidates (required when type=knowledge and project is empty):
physics, materials, optics, mechanics, manufacturing, metrology,
controls, software, math, finance, business

TRUST HIERARCHY:

- project-specific: set project to the project id, leave domain empty
- domain knowledge: set domain, leave project empty
- events/activity: use project, type=project or episodic
- one conversation can produce MULTIPLE candidates — emit them all

OUTPUT RULES:

- Each candidate content under 250 characters, stands alone
- Default confidence 0.5. Raise to 0.7 only for ratified/committed claims.
- Raw JSON array, no prose, no markdown fences
- Empty array [] is fine when the conversation has no durable signal

Each element:
{"type": "project|knowledge|preference|adaptation|episodic", "content": "...", "project": "...", "domain": "", "confidence": 0.5}"""


_sandbox_cwd = None


def get_sandbox_cwd():
    global _sandbox_cwd
    if _sandbox_cwd is None:
        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-llm-extract-")
    return _sandbox_cwd


def api_get(base_url, path, timeout=10):
    req = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def api_post(base_url, path, body, timeout=10):
    data = json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}{path}", method="POST",
        headers={"Content-Type": "application/json"}, data=data,
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def get_last_run(base_url):
    try:
        state = api_get(base_url, "/project/state/atocore?category=status")
        for entry in state.get("entries", []):
            if entry.get("key") == "last_extract_batch_run":
                return entry["value"]
    except Exception:
        pass
    return None


def set_last_run(base_url, timestamp):
    try:
        api_post(base_url, "/project/state", {
            "project": "atocore", "category": "status",
            "key": "last_extract_batch_run", "value": timestamp,
            "source": "batch_llm_extract_live.py",
        })
    except Exception:
        pass


_known_projects: set[str] = set()


def _load_known_projects(base_url):
    """Fetch registered project IDs from the API for R9 validation."""
    global _known_projects
    try:
        data = api_get(base_url, "/projects")
        _known_projects = {p["id"] for p in data.get("projects", [])}
        for p in data.get("projects", []):
            for alias in p.get("aliases", []):
                _known_projects.add(alias)
    except Exception:
        pass


def extract_one(prompt, response, project, model, timeout_s):
    """Run claude -p on one interaction, return parsed candidates."""
    if not shutil.which("claude"):
        return [], "claude_cli_missing"

    prompt_excerpt = prompt[:MAX_PROMPT_CHARS]
    response_excerpt = response[:MAX_RESPONSE_CHARS]
    user_message = (
        f"PROJECT HINT (may be empty): {project}\n\n"
        f"USER PROMPT:\n{prompt_excerpt}\n\n"
        f"ASSISTANT RESPONSE:\n{response_excerpt}\n\n"
        "Return the JSON array now."
    )

    args = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", SYSTEM_PROMPT,
        "--disable-slash-commands",
        user_message,
    ]

    try:
        completed = subprocess.run(
            args, capture_output=True, text=True,
            timeout=timeout_s, cwd=get_sandbox_cwd(),
            encoding="utf-8", errors="replace",
        )
    except subprocess.TimeoutExpired:
        return [], "timeout"
    except Exception as exc:
        return [], f"subprocess_error: {exc}"

    if completed.returncode != 0:
        return [], f"exit_{completed.returncode}"

    raw = (completed.stdout or "").strip()
    return parse_candidates(raw, project), ""


def parse_candidates(raw, interaction_project):
    """Parse model JSON output into candidate dicts."""
    text = raw.strip()
    if text.startswith("```"):
        text = text.strip("`")
        nl = text.find("\n")
        if nl >= 0:
            text = text[nl + 1:]
        if text.endswith("```"):
            text = text[:-3]
    text = text.strip()

    if not text or text == "[]":
        return []

    if not text.lstrip().startswith("["):
        start = text.find("[")
        end = text.rfind("]")
        if start >= 0 and end > start:
            text = text[start:end + 1]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return []

    if not isinstance(parsed, list):
        return []

    results = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        mem_type = str(item.get("type") or "").strip().lower()
        content = str(item.get("content") or "").strip()
        model_project = str(item.get("project") or "").strip()
        domain = str(item.get("domain") or "").strip().lower()
        # R9 trust hierarchy: interaction scope always wins when set.
        # For unscoped interactions, keep model's project tag even if
        # unregistered — the system will detect new projects/leads.
        if interaction_project:
            project = interaction_project
        elif model_project:
            project = model_project
        else:
            project = ""
        # Domain knowledge: embed tag in content for cross-project retrieval
        if domain and not project:
            content = f"[{domain}] {content}"
        conf = item.get("confidence", 0.5)
        if mem_type not in MEMORY_TYPES or not content:
            continue
        try:
            conf = max(0.0, min(1.0, float(conf)))
        except (TypeError, ValueError):
            conf = 0.5
        results.append({
            "memory_type": mem_type,
            "content": content[:1000],
            "project": project,
            "confidence": conf,
        })
    return results


def main():
    parser = argparse.ArgumentParser(description="Host-side LLM batch extraction")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--limit", type=int, default=50)
    parser.add_argument("--since", default=None)
    parser.add_argument("--model", default=DEFAULT_MODEL)
    args = parser.parse_args()

    _load_known_projects(args.base_url)
    since = args.since or get_last_run(args.base_url)
    print(f"since={since or '(first run)'} limit={args.limit} model={args.model} known_projects={len(_known_projects)}")

    params = [f"limit={args.limit}"]
    if since:
        params.append(f"since={urllib.parse.quote(since)}")
    listing = api_get(args.base_url, f"/interactions?{'&'.join(params)}")
    interaction_summaries = listing.get("interactions", [])
    print(f"listed {len(interaction_summaries)} interactions")

    processed = 0
    total_candidates = 0
    total_persisted = 0
    errors = 0

    for summary in interaction_summaries:
        resp_chars = summary.get("response_chars", 0) or 0
        if resp_chars < 50:
            continue
        iid = summary["id"]
        try:
            raw = api_get(
                args.base_url,
                f"/interactions/{urllib.parse.quote(iid, safe='')}",
            )
        except Exception as exc:
            print(f" ! {iid[:8]}: fetch failed: {exc}", file=sys.stderr)
            errors += 1
            continue
        response_text = raw.get("response", "") or ""
        if not response_text.strip() or len(response_text) < 50:
            continue

        candidates, error = extract_one(
            prompt=raw.get("prompt", "") or "",
            response=response_text,
            project=raw.get("project", "") or "",
            model=args.model,
            timeout_s=DEFAULT_TIMEOUT_S,
        )

        if error:
            print(f" ! {raw['id'][:8]}: {error}", file=sys.stderr)
            errors += 1
            continue

        processed += 1
        total_candidates += len(candidates)

        for c in candidates:
            try:
                api_post(args.base_url, "/memory", {
                    "memory_type": c["memory_type"],
                    "content": c["content"],
                    "project": c["project"],
                    "confidence": c["confidence"],
                    "status": "candidate",
                })
                total_persisted += 1
            except urllib.error.HTTPError as exc:
                # 400 = API rejected the candidate (duplicate/invalid);
                # expected under aggressive extraction, not counted as error.
                if exc.code != 400:
                    errors += 1
            except Exception:
                errors += 1

    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    set_last_run(args.base_url, now)

    print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}")


if __name__ == "__main__":
    main()
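
For reference, a manual run looks roughly like this (a sketch: the filename
comes from the script's own `source` field, and the flags and env var are
the ones defined above; adjust paths for your host):

```shell
# One-off run against a local AtoCore instance. The script reports
# per-interaction errors itself; || true keeps a cron wrapper quiet too.
export ATOCORE_BASE_URL="http://localhost:8100"
python3 batch_llm_extract_live.py --limit 20 --model sonnet || true
echo "batch extract attempted"
```

With no --since flag it resumes from the last_extract_batch_run watermark
stored in project state, so repeated runs only process new interactions.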