The extraction system now produces two kinds of candidates from
the same conversation:

  A. PROJECT-SPECIFIC: applied facts scoped to a named project
     (unchanged behavior)
  B. DOMAIN KNOWLEDGE: generalizable engineering insight earned
     through project work, tagged with a domain (physics, materials,
     optics, mechanics, manufacturing, metrology, controls, software,
     math, finance) and stored with project="" so it surfaces across
     all projects.

Critical quality bar enforced in the system prompt: "Would a
competent engineer need experience to know this, or could they
find it in 30 seconds on Google?" Textbook values, definitions,
and obvious facts are explicitly excluded. Only hard-won insight
qualifies: the kind that takes weeks of FEA or real machining
experience to discover.

Domain tags are embedded in the content as a prefix ("[physics]",
"[materials]") so they survive without a schema migration. A future
column can parse them out.
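
As a rough illustration of how a later migration might recover the tag from the content prefix, here is a minimal sketch. The helper name `split_domain_tag` and the constant `KNOWN_DOMAINS` are hypothetical and not part of this commit; the domain list is copied from the message above.

```python
import re

# Assumed tag set, copied from the domain list in this commit message.
KNOWN_DOMAINS = {
    "physics", "materials", "optics", "mechanics", "manufacturing",
    "metrology", "controls", "software", "math", "finance",
}

# A leading "[tag] " prefix followed by the rest of the content.
_TAG_RE = re.compile(r"^\[([a-z]+)\]\s+(.*)$", re.DOTALL)


def split_domain_tag(content):
    """Return (domain, body); domain is "" when no known tag prefix is present."""
    match = _TAG_RE.match(content)
    if match and match.group(1) in KNOWN_DOMAINS:
        return match.group(1), match.group(2)
    return "", content
```

Content whose bracketed prefix is not a known domain is returned unchanged, so a project memory that happens to start with "[" would not be misread as domain knowledge.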

Context builder gains a new tier between project memories and
retrieved chunks:

  Tier 1: Trusted Project State    (project-specific)
  Tier 2: Identity / Preferences   (global)
  Tier 3: Project Memories         (project-specific)
  Tier 4: Domain Knowledge (NEW)   (cross-project, 10% budget)
  Tier 5: Retrieved Chunks         (project-boosted)

Trim order: chunks -> domain knowledge -> project memories ->
identity/preference -> project state.
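
The trim order can be sketched as follows. This is not the context builder's actual code: the function `trim_to_budget`, the tier keys, and the choice to measure the budget in characters are all assumptions made for illustration. Only the ordering and the 10% share for domain knowledge come from the message above.

```python
# Lowest-priority tier first, per the trim order in the commit message.
TRIM_ORDER = [
    "chunks",
    "domain_knowledge",
    "project_memories",
    "identity_preferences",
    "project_state",
]
DOMAIN_BUDGET_FRACTION = 0.10  # the "10% budget" for Tier 4


def trim_to_budget(tiers, budget_chars):
    """Return a copy of ``tiers`` whose total length fits in ``budget_chars``."""
    trimmed = dict(tiers)

    # Cap the domain tier at its own share before any global trimming.
    domain_cap = int(budget_chars * DOMAIN_BUDGET_FRACTION)
    trimmed["domain_knowledge"] = trimmed.get("domain_knowledge", "")[:domain_cap]

    # Cut from the lowest-priority tier upward until the overflow is gone.
    overflow = sum(len(text) for text in trimmed.values()) - budget_chars
    for name in TRIM_ORDER:
        if overflow <= 0:
            break
        text = trimmed.get(name, "")
        cut = min(len(text), overflow)
        trimmed[name] = text[: len(text) - cut]
        overflow -= cut
    return trimmed
```

With five 30-character tiers and a 100-character budget, the domain tier is first capped at 10 characters, then the chunks tier absorbs the remaining 30 characters of overflow, leaving the three higher tiers untouched.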

Host-side extraction script updated with the same prompt and
domain-tag handling.

LLM_EXTRACTOR_VERSION bumped to llm-0.3.0.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
323 lines
11 KiB
Python
"""Host-side LLM batch extraction: pure HTTP client, no atocore imports.

Fetches interactions from the AtoCore API, runs ``claude -p`` locally
for each, and POSTs candidates back. Zero dependency on atocore source
or Python packages; only uses stdlib + the ``claude`` CLI on PATH.

This is necessary because the ``claude`` CLI is on the Dalidou HOST
but not inside the Docker container, and the host's Python doesn't
have the container's dependencies (pydantic_settings, etc.).
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone

DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet")
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_LLM_EXTRACTOR_TIMEOUT_S", "90"))
MAX_RESPONSE_CHARS = 8000
MAX_PROMPT_CHARS = 2000

MEMORY_TYPES = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"}

SYSTEM_PROMPT = """You extract durable memory candidates from LLM conversation turns for a personal context engine called AtoCore.

AtoCore stores two kinds of knowledge:

A. PROJECT-SPECIFIC: applied decisions, constraints, and architecture for a named project (p04-gigabit, p05-interferometer, p06-polisher, atomizer-v2, atocore). These stay scoped to one project.

B. DOMAIN KNOWLEDGE: generalizable engineering insight that was EARNED through project work and is reusable across projects. Tag these with a domain instead of a project.

THE CRITICAL BAR FOR DOMAIN KNOWLEDGE:
Only extract insight that took real effort to discover. The test: "Would a competent engineer need experience to know this, or could they find it in 30 seconds on Google?" If they can look it up, do NOT extract it.

EXTRACT (earned insight):
- "At F/1.2, Zerodur CTE gradient across the blank is the second-largest WFE contributor after gravity sag"
- "Preston removal rate model breaks down below 5N applied force because the contact assumption fails"
- "For swing-arm polishing, m=1 (coma) is NOT correctable by force modulation (score 0.09)"

DO NOT EXTRACT (common knowledge):
- "Zerodur CTE is 0.05 ppm/K" (textbook value)
- "FEA uses finite elements to discretize continuous domains" (definition)
- "Python is a programming language" (obvious)

Rules:

1. Only surface durable claims. Skip transient status, instructional guidance, troubleshooting, ephemeral recommendations, session recaps.
2. A candidate is durable when a reader coming back in two weeks would still need to know it.
3. Each candidate must stand alone in one sentence under 200 characters.
4. Type must be one of: project, knowledge, preference, adaptation.
5. For project-specific claims, set ``project`` to the project id.
6. For generalizable domain insight, set ``project`` to empty and set ``domain`` to one of: physics, materials, optics, mechanics, manufacturing, metrology, controls, software, math, finance.
7. When one conversation produces BOTH a project-specific fact AND a generalizable principle, emit BOTH as separate candidates.
8. Return [] on most turns. The bar is high. Empty is correct and expected.
9. Confidence 0.5 default. Raise to 0.6 only for unambiguous committed claims.
10. Output a raw JSON array only. No prose, no markdown fences.

Each array element:

{"type": "project|knowledge|preference|adaptation", "content": "...", "project": "...", "domain": "", "confidence": 0.5}

Use ``project`` for project-scoped candidates. Use ``domain`` for cross-project knowledge. Never set both."""

_sandbox_cwd = None


def get_sandbox_cwd():
    """Create one temp directory on first use and reuse it as cwd for ``claude``."""
    global _sandbox_cwd
    if _sandbox_cwd is None:
        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-llm-extract-")
    return _sandbox_cwd


def api_get(base_url, path, timeout=10):
    """GET ``base_url + path`` and return the decoded JSON body."""
    req = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def api_post(base_url, path, body, timeout=10):
    """POST ``body`` as JSON to ``base_url + path`` and return the decoded JSON reply."""
    data = json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}{path}", method="POST",
        headers={"Content-Type": "application/json"}, data=data,
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def get_last_run(base_url):
    """Return the stored timestamp of the last batch run, or None if unavailable."""
    try:
        state = api_get(base_url, "/project/state/atocore?category=status")
        for entry in state.get("entries", []):
            if entry.get("key") == "last_extract_batch_run":
                return entry["value"]
    except Exception:
        # Best effort: any failure is treated as "no previous run".
        pass
    return None


def set_last_run(base_url, timestamp):
    """Record the batch-run timestamp in project state; failures are ignored."""
    try:
        api_post(base_url, "/project/state", {
            "project": "atocore", "category": "status",
            "key": "last_extract_batch_run", "value": timestamp,
            "source": "batch_llm_extract_live.py",
        })
    except Exception:
        pass


_known_projects: set[str] = set()


def _load_known_projects(base_url):
    """Fetch registered project IDs from the API for R9 validation."""
    global _known_projects
    try:
        data = api_get(base_url, "/projects")
        _known_projects = {p["id"] for p in data.get("projects", [])}
        for p in data.get("projects", []):
            for alias in p.get("aliases", []):
                _known_projects.add(alias)
    except Exception:
        pass


def extract_one(prompt, response, project, model, timeout_s):
    """Run claude -p on one interaction, return parsed candidates.

    Returns a ``(candidates, error)`` pair; ``error`` is "" on success.
    """
    if not shutil.which("claude"):
        return [], "claude_cli_missing"

    # Truncate both sides of the turn before building the user message.
    prompt_excerpt = prompt[:MAX_PROMPT_CHARS]
    response_excerpt = response[:MAX_RESPONSE_CHARS]
    user_message = (
        f"PROJECT HINT (may be empty): {project}\n\n"
        f"USER PROMPT:\n{prompt_excerpt}\n\n"
        f"ASSISTANT RESPONSE:\n{response_excerpt}\n\n"
        "Return the JSON array now."
    )

    args = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", SYSTEM_PROMPT,
        "--disable-slash-commands",
        user_message,
    ]

    try:
        completed = subprocess.run(
            args, capture_output=True, text=True,
            timeout=timeout_s, cwd=get_sandbox_cwd(),
            encoding="utf-8", errors="replace",
        )
    except subprocess.TimeoutExpired:
        return [], "timeout"
    except Exception as exc:
        return [], f"subprocess_error: {exc}"

    if completed.returncode != 0:
        return [], f"exit_{completed.returncode}"

    raw = (completed.stdout or "").strip()
    return parse_candidates(raw, project), ""


def parse_candidates(raw, interaction_project):
    """Parse model JSON output into candidate dicts."""
    text = raw.strip()
    # Tolerate a markdown fence even though the prompt forbids one:
    # drop the backticks, then the first line (the language tag, if any).
    if text.startswith("```"):
        text = text.strip("`")
        nl = text.find("\n")
        if nl >= 0:
            text = text[nl + 1:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

    if not text or text == "[]":
        return []

    # If the model wrapped the array in prose, keep only the outermost [...].
    if not text.lstrip().startswith("["):
        start = text.find("[")
        end = text.rfind("]")
        if start >= 0 and end > start:
            text = text[start:end + 1]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return []

    if not isinstance(parsed, list):
        return []

    results = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        mem_type = str(item.get("type") or "").strip().lower()
        content = str(item.get("content") or "").strip()
        model_project = str(item.get("project") or "").strip()
        domain = str(item.get("domain") or "").strip().lower()
        # R9 trust hierarchy: interaction scope always wins when set.
        if interaction_project:
            project = interaction_project
        elif model_project and model_project in _known_projects:
            project = model_project
        else:
            project = ""
        # Domain knowledge: embed tag in content for cross-project retrieval.
        # Note: the tag is only added when the resolved project is empty, so a
        # candidate from an interaction that carries a project stays
        # project-scoped and untagged.
        if domain and not project:
            content = f"[{domain}] {content}"
        conf = item.get("confidence", 0.5)
        if mem_type not in MEMORY_TYPES or not content:
            continue
        try:
            conf = max(0.0, min(1.0, float(conf)))
        except (TypeError, ValueError):
            conf = 0.5
        results.append({
            "memory_type": mem_type,
            "content": content[:1000],
            "project": project,
            "confidence": conf,
        })
    return results


def main():
    parser = argparse.ArgumentParser(description="Host-side LLM batch extraction")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--limit", type=int, default=50)
    parser.add_argument("--since", default=None)
    parser.add_argument("--model", default=DEFAULT_MODEL)
    args = parser.parse_args()

    _load_known_projects(args.base_url)
    since = args.since or get_last_run(args.base_url)
    print(f"since={since or '(first run)'} limit={args.limit} model={args.model} known_projects={len(_known_projects)}")

    params = [f"limit={args.limit}"]
    if since:
        params.append(f"since={urllib.parse.quote(since)}")
    listing = api_get(args.base_url, f"/interactions?{'&'.join(params)}")
    interaction_summaries = listing.get("interactions", [])
    print(f"listed {len(interaction_summaries)} interactions")

    processed = 0
    total_candidates = 0
    total_persisted = 0
    errors = 0

    for summary in interaction_summaries:
        # Skip very short responses without fetching the full interaction.
        resp_chars = summary.get("response_chars", 0) or 0
        if resp_chars < 50:
            continue
        iid = summary["id"]
        try:
            raw = api_get(
                args.base_url,
                f"/interactions/{urllib.parse.quote(iid, safe='')}",
            )
        except Exception as exc:
            print(f"  ! {iid[:8]}: fetch failed: {exc}", file=sys.stderr)
            errors += 1
            continue
        response_text = raw.get("response", "") or ""
        if not response_text.strip() or len(response_text) < 50:
            continue

        candidates, error = extract_one(
            prompt=raw.get("prompt", "") or "",
            response=response_text,
            project=raw.get("project", "") or "",
            model=args.model,
            timeout_s=DEFAULT_TIMEOUT_S,
        )

        if error:
            print(f"  ! {raw['id'][:8]}: {error}", file=sys.stderr)
            errors += 1
            continue

        processed += 1
        total_candidates += len(candidates)

        for c in candidates:
            try:
                api_post(args.base_url, "/memory", {
                    "memory_type": c["memory_type"],
                    "content": c["content"],
                    "project": c["project"],
                    "confidence": c["confidence"],
                    "status": "candidate",
                })
                total_persisted += 1
            except urllib.error.HTTPError as exc:
                # A 400 response is not counted as an error; other statuses are.
                if exc.code != 400:
                    errors += 1
            except Exception:
                errors += 1

    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    set_last_run(args.base_url, now)

    print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}")


if __name__ == "__main__":
    main()