# ATOCore/scripts/batch_llm_extract_live.py

"""Host-side LLM batch extraction against a live AtoCore instance.

Fetches recent interactions from the AtoCore API, runs the LLM
extractor locally (requires ``claude`` CLI on PATH), and POSTs
candidates back to the API as ``status=candidate``.

This script runs on the HOST (not inside the Docker container)
because the ``claude`` CLI is installed host-side. The container's
``/admin/extract-batch`` endpoint can't use LLM mode because
``shutil.which("claude")`` returns None inside the container.

Tracks last-run timestamp via project state so re-runs auto-resume.

Usage (manual):

    python3 scripts/batch_llm_extract_live.py --base-url http://localhost:8100

Usage (cron, via wrapper):

    bash deploy/dalidou/batch-extract.sh
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
# Make src/ importable
_REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_REPO_ROOT / "src"))
from atocore.interactions.service import Interaction # noqa: E402
from atocore.memory.extractor_llm import extract_candidates_llm # noqa: E402
# Defaults are resolved once, at import time, and can be overridden through
# the environment. DEFAULT_BASE_URL seeds the ``--base-url`` option;
# DEFAULT_TIMEOUT (seconds) is the default timeout of the HTTP helpers.
DEFAULT_BASE_URL = os.getenv("ATOCORE_BASE_URL", "http://localhost:8100")
DEFAULT_TIMEOUT = int(os.getenv("ATOCORE_TIMEOUT_SECONDS", "10"))
def api_get(base_url: str, path: str, timeout: int = DEFAULT_TIMEOUT) -> dict:
    """Issue a GET request and return the decoded JSON response body.

    Args:
        base_url: Root URL of the API, with or without a trailing slash.
        path: Request path (optionally with a query string) appended to
            ``base_url``.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        The response body, decoded as UTF-8 and parsed as JSON.

    Raises:
        urllib.error.URLError: if the request fails (``HTTPError`` for HTTP
            error statuses).
        ValueError: if the body is not valid UTF-8 encoded JSON.
    """
    # Avoid ``http://host//path`` when the base URL was given with a trailing
    # slash; paths without a leading slash are concatenated unchanged.
    if path.startswith("/"):
        base_url = base_url.rstrip("/")
    req = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        payload = resp.read()
    return json.loads(payload.decode("utf-8"))
def api_post(base_url: str, path: str, body: dict, timeout: int = DEFAULT_TIMEOUT) -> dict:
    """POST ``body`` as JSON and return the decoded JSON response body.

    Args:
        base_url: Root URL of the API, with or without a trailing slash.
        path: Request path appended to ``base_url``.
        body: JSON-serialisable payload; sent UTF-8 encoded with a
            ``Content-Type: application/json`` header.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        The response body, decoded as UTF-8 and parsed as JSON.

    Raises:
        urllib.error.URLError: if the request fails (``HTTPError`` for HTTP
            error statuses).
        ValueError: if the response is not valid UTF-8 encoded JSON.
        TypeError: if ``body`` cannot be serialised by ``json.dumps``.
    """
    # Avoid ``http://host//path`` when the base URL was given with a trailing
    # slash; paths without a leading slash are concatenated unchanged.
    if path.startswith("/"):
        base_url = base_url.rstrip("/")
    req = urllib.request.Request(
        f"{base_url}{path}",
        method="POST",
        headers={"Content-Type": "application/json"},
        data=json.dumps(body).encode("utf-8"),
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        payload = resp.read()
    return json.loads(payload.decode("utf-8"))
def get_last_run(base_url: str) -> str | None:
    """Return the stored ``last_extract_batch_run`` value, if there is one.

    Queries ``/project/state/atocore?category=status`` and scans the returned
    ``entries`` for the one whose ``key`` is ``last_extract_batch_run``.

    This lookup is best-effort: any failure (request error, unexpected
    response shape) is reported on stderr and treated as "no previous run"
    rather than aborting the batch.

    Args:
        base_url: Root URL of the AtoCore API.

    Returns:
        The entry's ``value``, or ``None`` when the entry is absent or the
        lookup failed.
    """
    try:
        state = api_get(base_url, "/project/state/atocore?category=status")
        for entry in state.get("entries", []):
            if entry.get("key") == "last_extract_batch_run":
                return entry["value"]
    except Exception as exc:
        # Deliberately broad: a broken state lookup must not stop the run,
        # but it should be visible because it changes which interactions
        # get fetched.
        print(f" ! could not read last-run marker: {exc}", file=sys.stderr)
    return None
def set_last_run(base_url: str, timestamp: str) -> None:
    """Store ``timestamp`` as the ``last_extract_batch_run`` project-state entry.

    POSTs to ``/project/state`` under project ``atocore``, category
    ``status``. Best-effort: a failure is reported on stderr instead of being
    raised, so a finished batch is not turned into a crash; the consequence
    is only that the next run starts from the previous marker again.

    Args:
        base_url: Root URL of the AtoCore API.
        timestamp: Marker value to persist.
    """
    entry = {
        "project": "atocore",
        "category": "status",
        "key": "last_extract_batch_run",
        "value": timestamp,
        "source": "batch_llm_extract_live.py",
    }
    try:
        api_post(base_url, "/project/state", entry)
    except Exception as exc:
        # Deliberately broad (best-effort), but no longer silent.
        print(f" ! could not store last-run marker: {exc}", file=sys.stderr)
def fetch_interactions(base_url: str, since: str | None, limit: int) -> list[dict]:
    """Fetch interactions from the ``/interactions`` endpoint.

    Args:
        base_url: Root URL of the AtoCore API.
        since: Optional lower-bound marker; when truthy it is percent-encoded
            and sent as the ``since`` query parameter.
        limit: Value sent as the ``limit`` query parameter.

    Returns:
        The ``interactions`` list from the response, or an empty list when
        the key is missing.
    """
    query = f"?limit={limit}"
    if since:
        query += f"&since={urllib.parse.quote(since)}"
    payload = api_get(base_url, f"/interactions{query}")
    return payload.get("interactions", [])
def main() -> int:
    """Run one batch-extraction pass against a live AtoCore instance.

    Steps:
      1. Resolve the starting point: ``--since`` if given, otherwise the
         stored last-run marker.
      2. Fetch up to ``--limit`` interactions from the API.
      3. Run the LLM extractor on every interaction with a non-blank
         response and POST each candidate to ``/memory`` with
         ``status=candidate``.
      4. Store the new last-run marker and print a summary line.

    Returns:
        Always ``0``; extraction and persistence failures are counted and
        reported in the summary line instead of changing the exit status.
    """
    # Function-scope import: datetime is only needed for the run marker.
    from datetime import datetime, timezone

    parser = argparse.ArgumentParser(description="Host-side LLM batch extraction")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--limit", type=int, default=50)
    parser.add_argument("--since", default=None, help="override last-run timestamp")
    args = parser.parse_args()

    since = args.since or get_last_run(args.base_url)

    # Take the marker BEFORE fetching. Stamping it after the (slow) LLM loop
    # would skip every interaction created while this run was in progress:
    # those are newer than the fetch but older than the marker. At worst an
    # interaction is seen twice, and repeated candidates are rejected by the
    # API as duplicates (see the 400 handling below).
    run_started = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    print(f"fetching interactions since={since or '(first run)'} limit={args.limit}")
    raw_interactions = fetch_interactions(args.base_url, since, args.limit)
    print(f"fetched {len(raw_interactions)} interactions")
    # NOTE(review): if more than --limit interactions are pending, advancing
    # the marker below may skip the remainder -- confirm the ordering and
    # pagination semantics of /interactions.

    processed = 0
    total_candidates = 0
    total_persisted = 0
    errors = 0

    for raw in raw_interactions:
        response_text = raw.get("response", "") or ""
        if not response_text.strip():
            # Nothing to extract from an empty or whitespace-only response.
            continue

        interaction = Interaction(
            id=raw["id"],
            prompt=raw.get("prompt", "") or "",
            response=response_text,
            response_summary=raw.get("response_summary", "") or "",
            project=raw.get("project", "") or "",
            client=raw.get("client", "") or "",
            session_id=raw.get("session_id", "") or "",
            created_at=raw.get("created_at", "") or "",
        )

        try:
            candidates = extract_candidates_llm(interaction)
        except Exception as exc:
            # One failing interaction must not abort the whole batch.
            print(f" ! extraction error on {interaction.id[:8]}: {exc}")
            errors += 1
            continue

        processed += 1
        total_candidates += len(candidates)

        for c in candidates:
            try:
                api_post(args.base_url, "/memory", {
                    "memory_type": c.memory_type,
                    "content": c.content,
                    "project": c.project,
                    "confidence": c.confidence,
                    "status": "candidate",
                })
            except urllib.error.HTTPError as exc:
                if exc.code != 400:  # 400 = duplicate, skip silently
                    errors += 1
            except Exception:
                errors += 1
            else:
                total_persisted += 1

    set_last_run(args.base_url, run_started)
    print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())