Compare commits
7 Commits
codex/audi
...
ac7f77d86d
| Author | SHA1 | Date | |
|---|---|---|---|
| ac7f77d86d | |||
| 719ff649a8 | |||
| 8af8af90d0 | |||
| cd0fd390a8 | |||
| c67bec095c | |||
| bcb7675a0d | |||
| 54d84b52cb |
45
deploy/dalidou/batch-extract.sh
Normal file
45
deploy/dalidou/batch-extract.sh
Normal file
@@ -0,0 +1,45 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# deploy/dalidou/batch-extract.sh
|
||||
# --------------------------------
|
||||
# Host-side LLM batch extraction for Dalidou.
|
||||
#
|
||||
# The claude CLI is available on the Dalidou HOST but NOT inside the
|
||||
# Docker container. This script runs on the host, fetches recent
|
||||
# interactions from the AtoCore API, runs the LLM extractor locally
|
||||
# (claude -p sonnet), and posts candidates back to the API.
|
||||
#
|
||||
# Intended to be called from cron-backup.sh after backup/cleanup/rsync,
|
||||
# or manually via:
|
||||
#
|
||||
# bash /srv/storage/atocore/app/deploy/dalidou/batch-extract.sh
|
||||
#
|
||||
# Environment variables:
|
||||
# ATOCORE_URL default http://127.0.0.1:8100
|
||||
# ATOCORE_EXTRACT_LIMIT default 50
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
ATOCORE_URL="${ATOCORE_URL:-http://127.0.0.1:8100}"
|
||||
LIMIT="${ATOCORE_EXTRACT_LIMIT:-50}"
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
APP_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)"
|
||||
TIMESTAMP="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
|
||||
|
||||
log() { printf '[%s] %s\n' "$TIMESTAMP" "$*"; }
|
||||
|
||||
# The Python script needs the atocore source on PYTHONPATH
|
||||
export PYTHONPATH="$APP_DIR/src:${PYTHONPATH:-}"
|
||||
|
||||
log "=== AtoCore batch LLM extraction starting ==="
|
||||
log "URL=$ATOCORE_URL LIMIT=$LIMIT"
|
||||
|
||||
# Run the host-side extraction script
|
||||
python3 "$APP_DIR/scripts/batch_llm_extract_live.py" \
|
||||
--base-url "$ATOCORE_URL" \
|
||||
--limit "$LIMIT" \
|
||||
2>&1 || {
|
||||
log "WARN: batch extraction failed (non-blocking)"
|
||||
}
|
||||
|
||||
log "=== AtoCore batch LLM extraction complete ==="
|
||||
@@ -82,4 +82,22 @@ else
|
||||
log "Step 3: ATOCORE_BACKUP_RSYNC not set, skipping off-host copy"
|
||||
fi
|
||||
|
||||
# Step 4: Batch LLM extraction on recent interactions (optional).
|
||||
# Runs HOST-SIDE because claude CLI is on the host, not inside the
|
||||
# Docker container. The script fetches interactions from the API,
|
||||
# runs claude -p locally, and POSTs candidates back.
|
||||
# Fail-open: extraction failure never blocks backup.
|
||||
EXTRACT="${ATOCORE_EXTRACT_BATCH:-true}"
|
||||
if [[ "$EXTRACT" == "true" ]]; then
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
log "Step 4: running host-side batch LLM extraction"
|
||||
bash "$SCRIPT_DIR/batch-extract.sh" 2>&1 && {
|
||||
log "Extraction complete"
|
||||
} || {
|
||||
log "WARN: batch extraction failed (this is non-blocking)"
|
||||
}
|
||||
else
|
||||
log "Step 4: ATOCORE_EXTRACT_BATCH not set to true, skipping extraction"
|
||||
fi
|
||||
|
||||
log "=== AtoCore daily backup complete ==="
|
||||
|
||||
277
scripts/batch_llm_extract_live.py
Normal file
277
scripts/batch_llm_extract_live.py
Normal file
@@ -0,0 +1,277 @@
|
||||
"""Host-side LLM batch extraction — pure HTTP client, no atocore imports.
|
||||
|
||||
Fetches interactions from the AtoCore API, runs ``claude -p`` locally
|
||||
for each, and POSTs candidates back. Zero dependency on atocore source
|
||||
or Python packages — only uses stdlib + the ``claude`` CLI on PATH.
|
||||
|
||||
This is necessary because the ``claude`` CLI is on the Dalidou HOST
|
||||
but not inside the Docker container, and the host's Python doesn't
|
||||
have the container's dependencies (pydantic_settings, etc.).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from datetime import datetime, timezone
|
||||
|
||||
DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
|
||||
DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet")
|
||||
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_LLM_EXTRACTOR_TIMEOUT_S", "90"))
|
||||
MAX_RESPONSE_CHARS = 8000
|
||||
MAX_PROMPT_CHARS = 2000
|
||||
|
||||
MEMORY_TYPES = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"}
|
||||
|
||||
SYSTEM_PROMPT = """You extract durable memory candidates from LLM conversation turns for a personal context engine called AtoCore.
|
||||
|
||||
Your job is to read one user prompt plus the assistant's response and decide which durable facts, decisions, preferences, architectural rules, or project invariants should be remembered across future sessions.
|
||||
|
||||
Rules:
|
||||
|
||||
1. Only surface durable claims. Skip transient status ("deploy is still running"), instructional guidance ("here is how to run the command"), troubleshooting tactics, ephemeral recommendations ("merge this PR now"), and session recaps.
|
||||
2. A candidate is durable when a reader coming back in two weeks would still need to know it. Architectural choices, named rules, ratified decisions, invariants, procurement commitments, and project-level constraints qualify. Conversational fillers and step-by-step instructions do not.
|
||||
3. Each candidate must stand alone. Rewrite the claim in one sentence under 200 characters with enough context that a reader without the conversation understands it.
|
||||
4. Each candidate must have a type from this closed set: project, knowledge, preference, adaptation.
|
||||
5. If the conversation is clearly scoped to a project (p04-gigabit, p05-interferometer, p06-polisher, atocore), set ``project`` to that id. Otherwise leave ``project`` empty.
|
||||
6. If the response makes no durable claim, return an empty list. It is correct and expected to return [] on most conversational turns.
|
||||
7. Confidence should be 0.5 by default so human review workload is honest. Raise to 0.6 only when the response states the claim in an unambiguous, committed form (e.g. "the decision is X", "the selected approach is Y", "X is non-negotiable").
|
||||
8. Output must be a raw JSON array and nothing else. No prose before or after. No markdown fences. No explanations.
|
||||
|
||||
Each array element has exactly this shape:
|
||||
|
||||
{"type": "project|knowledge|preference|adaptation", "content": "...", "project": "...", "confidence": 0.5}
|
||||
|
||||
Return [] when there is nothing to extract."""
|
||||
|
||||
_sandbox_cwd = None
|
||||
|
||||
|
||||
def get_sandbox_cwd():
|
||||
global _sandbox_cwd
|
||||
if _sandbox_cwd is None:
|
||||
_sandbox_cwd = tempfile.mkdtemp(prefix="ato-llm-extract-")
|
||||
return _sandbox_cwd
|
||||
|
||||
|
||||
def api_get(base_url, path, timeout=10):
|
||||
req = urllib.request.Request(f"{base_url}{path}")
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
|
||||
|
||||
def api_post(base_url, path, body, timeout=10):
|
||||
data = json.dumps(body).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
f"{base_url}{path}", method="POST",
|
||||
headers={"Content-Type": "application/json"}, data=data,
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
|
||||
|
||||
def get_last_run(base_url):
|
||||
try:
|
||||
state = api_get(base_url, "/project/state/atocore?category=status")
|
||||
for entry in state.get("entries", []):
|
||||
if entry.get("key") == "last_extract_batch_run":
|
||||
return entry["value"]
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def set_last_run(base_url, timestamp):
|
||||
try:
|
||||
api_post(base_url, "/project/state", {
|
||||
"project": "atocore", "category": "status",
|
||||
"key": "last_extract_batch_run", "value": timestamp,
|
||||
"source": "batch_llm_extract_live.py",
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def extract_one(prompt, response, project, model, timeout_s):
|
||||
"""Run claude -p on one interaction, return parsed candidates."""
|
||||
if not shutil.which("claude"):
|
||||
return [], "claude_cli_missing"
|
||||
|
||||
prompt_excerpt = prompt[:MAX_PROMPT_CHARS]
|
||||
response_excerpt = response[:MAX_RESPONSE_CHARS]
|
||||
user_message = (
|
||||
f"PROJECT HINT (may be empty): {project}\n\n"
|
||||
f"USER PROMPT:\n{prompt_excerpt}\n\n"
|
||||
f"ASSISTANT RESPONSE:\n{response_excerpt}\n\n"
|
||||
"Return the JSON array now."
|
||||
)
|
||||
|
||||
args = [
|
||||
"claude", "-p",
|
||||
"--model", model,
|
||||
"--append-system-prompt", SYSTEM_PROMPT,
|
||||
"--disable-slash-commands",
|
||||
user_message,
|
||||
]
|
||||
|
||||
try:
|
||||
completed = subprocess.run(
|
||||
args, capture_output=True, text=True,
|
||||
timeout=timeout_s, cwd=get_sandbox_cwd(),
|
||||
encoding="utf-8", errors="replace",
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
return [], "timeout"
|
||||
except Exception as exc:
|
||||
return [], f"subprocess_error: {exc}"
|
||||
|
||||
if completed.returncode != 0:
|
||||
return [], f"exit_{completed.returncode}"
|
||||
|
||||
raw = (completed.stdout or "").strip()
|
||||
return parse_candidates(raw, project), ""
|
||||
|
||||
|
||||
def parse_candidates(raw, interaction_project):
|
||||
"""Parse model JSON output into candidate dicts."""
|
||||
text = raw.strip()
|
||||
if text.startswith("```"):
|
||||
text = text.strip("`")
|
||||
nl = text.find("\n")
|
||||
if nl >= 0:
|
||||
text = text[nl + 1:]
|
||||
if text.endswith("```"):
|
||||
text = text[:-3]
|
||||
text = text.strip()
|
||||
|
||||
if not text or text == "[]":
|
||||
return []
|
||||
|
||||
if not text.lstrip().startswith("["):
|
||||
start = text.find("[")
|
||||
end = text.rfind("]")
|
||||
if start >= 0 and end > start:
|
||||
text = text[start:end + 1]
|
||||
|
||||
try:
|
||||
parsed = json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
return []
|
||||
|
||||
if not isinstance(parsed, list):
|
||||
return []
|
||||
|
||||
results = []
|
||||
for item in parsed:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
mem_type = str(item.get("type") or "").strip().lower()
|
||||
content = str(item.get("content") or "").strip()
|
||||
project = str(item.get("project") or "").strip()
|
||||
if not project and interaction_project:
|
||||
project = interaction_project
|
||||
conf = item.get("confidence", 0.5)
|
||||
if mem_type not in MEMORY_TYPES or not content:
|
||||
continue
|
||||
try:
|
||||
conf = max(0.0, min(1.0, float(conf)))
|
||||
except (TypeError, ValueError):
|
||||
conf = 0.5
|
||||
results.append({
|
||||
"memory_type": mem_type,
|
||||
"content": content[:1000],
|
||||
"project": project,
|
||||
"confidence": conf,
|
||||
})
|
||||
return results
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Host-side LLM batch extraction")
|
||||
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
|
||||
parser.add_argument("--limit", type=int, default=50)
|
||||
parser.add_argument("--since", default=None)
|
||||
parser.add_argument("--model", default=DEFAULT_MODEL)
|
||||
args = parser.parse_args()
|
||||
|
||||
since = args.since or get_last_run(args.base_url)
|
||||
print(f"since={since or '(first run)'} limit={args.limit} model={args.model}")
|
||||
|
||||
params = [f"limit={args.limit}"]
|
||||
if since:
|
||||
params.append(f"since={urllib.parse.quote(since)}")
|
||||
listing = api_get(args.base_url, f"/interactions?{'&'.join(params)}")
|
||||
interaction_summaries = listing.get("interactions", [])
|
||||
print(f"listed {len(interaction_summaries)} interactions")
|
||||
|
||||
processed = 0
|
||||
total_candidates = 0
|
||||
total_persisted = 0
|
||||
errors = 0
|
||||
|
||||
for summary in interaction_summaries:
|
||||
resp_chars = summary.get("response_chars", 0) or 0
|
||||
if resp_chars < 50:
|
||||
continue
|
||||
iid = summary["id"]
|
||||
try:
|
||||
raw = api_get(
|
||||
args.base_url,
|
||||
f"/interactions/{urllib.parse.quote(iid, safe='')}",
|
||||
)
|
||||
except Exception as exc:
|
||||
print(f" ! {iid[:8]}: fetch failed: {exc}", file=sys.stderr)
|
||||
errors += 1
|
||||
continue
|
||||
response_text = raw.get("response", "") or ""
|
||||
if not response_text.strip() or len(response_text) < 50:
|
||||
continue
|
||||
|
||||
candidates, error = extract_one(
|
||||
prompt=raw.get("prompt", "") or "",
|
||||
response=response_text,
|
||||
project=raw.get("project", "") or "",
|
||||
model=args.model,
|
||||
timeout_s=DEFAULT_TIMEOUT_S,
|
||||
)
|
||||
|
||||
if error:
|
||||
print(f" ! {raw['id'][:8]}: {error}", file=sys.stderr)
|
||||
errors += 1
|
||||
continue
|
||||
|
||||
processed += 1
|
||||
total_candidates += len(candidates)
|
||||
|
||||
for c in candidates:
|
||||
try:
|
||||
api_post(args.base_url, "/memory", {
|
||||
"memory_type": c["memory_type"],
|
||||
"content": c["content"],
|
||||
"project": c["project"],
|
||||
"confidence": c["confidence"],
|
||||
"status": "candidate",
|
||||
})
|
||||
total_persisted += 1
|
||||
except urllib.error.HTTPError as exc:
|
||||
if exc.code != 400:
|
||||
errors += 1
|
||||
except Exception:
|
||||
errors += 1
|
||||
|
||||
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||||
set_last_run(args.base_url, now)
|
||||
|
||||
print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -35,6 +35,10 @@ from atocore.memory.extractor import (
|
||||
MemoryCandidate,
|
||||
extract_candidates_from_interaction,
|
||||
)
|
||||
from atocore.memory.extractor_llm import (
|
||||
LLM_EXTRACTOR_VERSION,
|
||||
extract_candidates_llm,
|
||||
)
|
||||
from atocore.memory.reinforcement import reinforce_from_interaction
|
||||
from atocore.memory.service import (
|
||||
MEMORY_STATUSES,
|
||||
@@ -580,6 +584,7 @@ def api_reinforce_interaction(interaction_id: str) -> dict:
|
||||
|
||||
class InteractionExtractRequest(BaseModel):
|
||||
persist: bool = False
|
||||
mode: str = "rule" # "rule" or "llm"
|
||||
|
||||
|
||||
@router.post("/interactions/{interaction_id}/extract")
|
||||
@@ -601,7 +606,10 @@ def api_extract_from_interaction(
|
||||
if interaction is None:
|
||||
raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
|
||||
payload = req or InteractionExtractRequest()
|
||||
candidates: list[MemoryCandidate] = extract_candidates_from_interaction(interaction)
|
||||
if payload.mode == "llm":
|
||||
candidates: list[MemoryCandidate] = extract_candidates_llm(interaction)
|
||||
else:
|
||||
candidates: list[MemoryCandidate] = extract_candidates_from_interaction(interaction)
|
||||
|
||||
persisted_ids: list[str] = []
|
||||
if payload.persist:
|
||||
@@ -755,6 +763,109 @@ def api_cleanup_backups(req: BackupCleanupRequest | None = None) -> dict:
|
||||
raise HTTPException(status_code=500, detail=f"Cleanup failed: {e}")
|
||||
|
||||
|
||||
class ExtractBatchRequest(BaseModel):
|
||||
since: str | None = None
|
||||
mode: str = "llm"
|
||||
limit: int = 50
|
||||
persist: bool = True
|
||||
|
||||
|
||||
@router.post("/admin/extract-batch")
|
||||
def api_extract_batch(req: ExtractBatchRequest | None = None) -> dict:
|
||||
"""Run batch extraction across recent interactions.
|
||||
|
||||
Fetches interactions since ``since`` (or since the last recorded
|
||||
batch run), runs the extractor (rule or LLM) on each, and persists
|
||||
any candidates as ``status=candidate``. The last-run timestamp is
|
||||
stored in project state under ``atocore / status /
|
||||
last_extract_batch_run`` so subsequent calls without ``since``
|
||||
automatically pick up where the last run left off.
|
||||
|
||||
This endpoint is the operational home for R1 / R5 — it makes the
|
||||
LLM extractor accessible as an API operation rather than a
|
||||
script-only eval tool. Still NOT on the capture hot path: callers
|
||||
invoke this endpoint explicitly (cron, manual curl, CLI).
|
||||
"""
|
||||
payload = req or ExtractBatchRequest()
|
||||
since = payload.since
|
||||
|
||||
if not since:
|
||||
state_entries = get_state("atocore")
|
||||
for entry in state_entries:
|
||||
if entry.category == "status" and entry.key == "last_extract_batch_run":
|
||||
since = entry.value
|
||||
break
|
||||
|
||||
interactions = list_interactions(since=since, limit=min(payload.limit, 200))
|
||||
|
||||
processed = 0
|
||||
total_candidates = 0
|
||||
total_persisted = 0
|
||||
errors: list[dict] = []
|
||||
|
||||
for interaction in interactions:
|
||||
if not (interaction.response or interaction.response_summary):
|
||||
continue
|
||||
try:
|
||||
if payload.mode == "llm":
|
||||
candidates = extract_candidates_llm(interaction)
|
||||
else:
|
||||
candidates = extract_candidates_from_interaction(interaction)
|
||||
except Exception as exc:
|
||||
errors.append({"interaction_id": interaction.id, "error": str(exc)})
|
||||
continue
|
||||
|
||||
processed += 1
|
||||
total_candidates += len(candidates)
|
||||
|
||||
if payload.persist and candidates:
|
||||
for candidate in candidates:
|
||||
try:
|
||||
create_memory(
|
||||
memory_type=candidate.memory_type,
|
||||
content=candidate.content,
|
||||
project=candidate.project,
|
||||
confidence=candidate.confidence,
|
||||
status="candidate",
|
||||
)
|
||||
total_persisted += 1
|
||||
except ValueError:
|
||||
pass # duplicate — skip silently
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||||
try:
|
||||
set_state(
|
||||
project="atocore",
|
||||
category="status",
|
||||
key="last_extract_batch_run",
|
||||
value=now,
|
||||
source="admin/extract-batch endpoint",
|
||||
)
|
||||
except Exception:
|
||||
pass # best-effort timestamp tracking
|
||||
|
||||
log.info(
|
||||
"extract_batch_complete",
|
||||
mode=payload.mode,
|
||||
processed=processed,
|
||||
total_candidates=total_candidates,
|
||||
total_persisted=total_persisted,
|
||||
errors=len(errors),
|
||||
)
|
||||
|
||||
return {
|
||||
"processed": processed,
|
||||
"total_candidates": total_candidates,
|
||||
"total_persisted": total_persisted,
|
||||
"mode": payload.mode,
|
||||
"persist": payload.persist,
|
||||
"since": since or "(first run)",
|
||||
"errors": errors,
|
||||
}
|
||||
|
||||
|
||||
@router.get("/admin/backup/{stamp}/validate")
|
||||
def api_validate_backup(stamp: str) -> dict:
|
||||
"""Validate that a previously created backup is structurally usable."""
|
||||
|
||||
@@ -168,7 +168,6 @@ def extract_candidates_llm_verbose(
|
||||
model or DEFAULT_MODEL,
|
||||
"--append-system-prompt",
|
||||
_SYSTEM_PROMPT,
|
||||
"--no-session-persistence",
|
||||
"--disable-slash-commands",
|
||||
user_message,
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user