diff --git a/deploy/dalidou/batch-extract.sh b/deploy/dalidou/batch-extract.sh
new file mode 100644
index 0000000..6a26109
--- /dev/null
+++ b/deploy/dalidou/batch-extract.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+#
+# deploy/dalidou/batch-extract.sh
+# --------------------------------
+# Host-side LLM batch extraction for Dalidou.
+#
+# The claude CLI is available on the Dalidou HOST but NOT inside the
+# Docker container. This script runs on the host, fetches recent
+# interactions from the AtoCore API, runs the LLM extractor locally
+# (claude -p sonnet), and posts candidates back to the API.
+#
+# Intended to be called from cron-backup.sh after backup/cleanup/rsync,
+# or manually via:
+#
+#   bash /srv/storage/atocore/app/deploy/dalidou/batch-extract.sh
+#
+# Environment variables:
+#   ATOCORE_URL            default http://127.0.0.1:8100
+#   ATOCORE_EXTRACT_LIMIT  default 50
+
+set -euo pipefail
+
+ATOCORE_URL="${ATOCORE_URL:-http://127.0.0.1:8100}"
+LIMIT="${ATOCORE_EXTRACT_LIMIT:-50}"
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+APP_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)"
+
+# Fresh timestamp per call: the LLM run can take minutes, so a
+# start-of-script timestamp would make every log line look stale.
+log() { printf '[%s] %s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$*"; }
+
+# The Python script needs the atocore source on PYTHONPATH.
+# ${PYTHONPATH:+:$PYTHONPATH} avoids a trailing empty entry when
+# PYTHONPATH is unset (an empty entry puts CWD on sys.path).
+export PYTHONPATH="$APP_DIR/src${PYTHONPATH:+:$PYTHONPATH}"
+
+log "=== AtoCore batch LLM extraction starting ==="
+log "URL=$ATOCORE_URL LIMIT=$LIMIT"
+
+# Run the host-side extraction script
+python3 "$APP_DIR/scripts/batch_llm_extract_live.py" \
+  --base-url "$ATOCORE_URL" \
+  --limit "$LIMIT" \
+  2>&1 || {
+    log "WARN: batch extraction failed (non-blocking)"
+}
+
+log "=== AtoCore batch LLM extraction complete ==="
diff --git a/deploy/dalidou/cron-backup.sh b/deploy/dalidou/cron-backup.sh
index 30c7f53..64570f5 100755
--- a/deploy/dalidou/cron-backup.sh
+++ b/deploy/dalidou/cron-backup.sh
@@ -83,22 +83,18 @@ else
 fi
 
 # Step 4: Batch LLM extraction on recent interactions (optional).
-# Runs the LLM extractor (claude -p sonnet) against interactions
-# captured since the last batch run. Candidates land as
-# status=candidate for human or auto-triage review.
+# Runs HOST-SIDE because claude CLI is on the host, not inside the
+# Docker container. The script fetches interactions from the API,
+# runs claude -p locally, and POSTs candidates back.
 # Fail-open: extraction failure never blocks backup.
-# The endpoint tracks its own last-run timestamp in project state.
 EXTRACT="${ATOCORE_EXTRACT_BATCH:-true}"
 if [[ "$EXTRACT" == "true" ]]; then
-  log "Step 4: running batch LLM extraction"
-  EXTRACT_RESULT=$(curl -sf -X POST \
-    -H "Content-Type: application/json" \
-    -d '{"mode": "llm", "persist": true, "limit": 50}' \
-    --max-time 600 \
-    "$ATOCORE_URL/admin/extract-batch" 2>&1) && {
-    log "Extraction result: $EXTRACT_RESULT"
+  SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+  log "Step 4: running host-side batch LLM extraction"
+  bash "$SCRIPT_DIR/batch-extract.sh" 2>&1 && {
+    log "Extraction complete"
   } || {
-    log "WARN: batch extraction failed (this is non-blocking): $EXTRACT_RESULT"
+    log "WARN: batch extraction failed (this is non-blocking)"
   }
 else
   log "Step 4: ATOCORE_EXTRACT_BATCH not set to true, skipping extraction"
diff --git a/scripts/batch_llm_extract_live.py b/scripts/batch_llm_extract_live.py
new file mode 100644
index 0000000..67d3328
--- /dev/null
+++ b/scripts/batch_llm_extract_live.py
@@ -0,0 +1,166 @@
+"""Host-side LLM batch extraction against a live AtoCore instance.
+
+Fetches recent interactions from the AtoCore API, runs the LLM
+extractor locally (requires ``claude`` CLI on PATH), and POSTs
+candidates back to the API as ``status=candidate``.
+
+This script runs on the HOST (not inside the Docker container)
+because the ``claude`` CLI is installed host-side. The container's
+``/admin/extract-batch`` endpoint can't use LLM mode because
+``shutil.which("claude")`` returns None inside the container.
+
+Tracks last-run timestamp via project state so re-runs auto-resume.
+
+Usage (manual):
+
+    python3 scripts/batch_llm_extract_live.py --base-url http://localhost:8100
+
+Usage (cron, via wrapper):
+
+    bash deploy/dalidou/batch-extract.sh
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import sys
+import urllib.error
+import urllib.parse
+import urllib.request
+from pathlib import Path
+
+# Make src/ importable
+_REPO_ROOT = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(_REPO_ROOT / "src"))
+
+from atocore.interactions.service import Interaction  # noqa: E402
+from atocore.memory.extractor_llm import extract_candidates_llm  # noqa: E402
+
+DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
+DEFAULT_TIMEOUT = int(os.environ.get("ATOCORE_TIMEOUT_SECONDS", "10"))
+
+
+def api_get(base_url: str, path: str, timeout: int = DEFAULT_TIMEOUT) -> dict:
+    req = urllib.request.Request(f"{base_url}{path}")
+    with urllib.request.urlopen(req, timeout=timeout) as resp:
+        return json.loads(resp.read().decode("utf-8"))
+
+
+def api_post(base_url: str, path: str, body: dict, timeout: int = DEFAULT_TIMEOUT) -> dict:
+    data = json.dumps(body).encode("utf-8")
+    req = urllib.request.Request(
+        f"{base_url}{path}",
+        method="POST",
+        headers={"Content-Type": "application/json"},
+        data=data,
+    )
+    with urllib.request.urlopen(req, timeout=timeout) as resp:
+        return json.loads(resp.read().decode("utf-8"))
+
+
+def get_last_run(base_url: str) -> str | None:
+    try:
+        state = api_get(base_url, "/project/state/atocore?category=status")
+        for entry in state.get("entries", []):
+            if entry.get("key") == "last_extract_batch_run":
+                return entry["value"]
+    except Exception:
+        pass
+    return None
+
+
+def set_last_run(base_url: str, timestamp: str) -> None:
+    try:
+        api_post(base_url, "/project/state", {
+            "project": "atocore",
+            "category": "status",
+            "key": "last_extract_batch_run",
+            "value": timestamp,
+            "source": "batch_llm_extract_live.py",
+        })
+    except Exception:
+        pass
+
+
+def fetch_interactions(base_url: str, since: str | None, limit: int) -> list[dict]:
+    params = [f"limit={limit}"]
+    if since:
+        params.append(f"since={urllib.parse.quote(since)}")
+    query = "?" + "&".join(params)
+    result = api_get(base_url, f"/interactions{query}")
+    return result.get("interactions", [])
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser(description="Host-side LLM batch extraction")
+    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
+    parser.add_argument("--limit", type=int, default=50)
+    parser.add_argument("--since", default=None, help="override last-run timestamp")
+    args = parser.parse_args()
+
+    since = args.since or get_last_run(args.base_url)
+    print(f"fetching interactions since={since or '(first run)'} limit={args.limit}")
+
+    raw_interactions = fetch_interactions(args.base_url, since, args.limit)
+    print(f"fetched {len(raw_interactions)} interactions")
+
+    processed = 0
+    total_candidates = 0
+    total_persisted = 0
+    errors = 0
+
+    for raw in raw_interactions:
+        response_text = raw.get("response", "") or ""
+        if not response_text.strip():
+            continue
+
+        interaction = Interaction(
+            id=raw["id"],
+            prompt=raw.get("prompt", "") or "",
+            response=response_text,
+            response_summary=raw.get("response_summary", "") or "",
+            project=raw.get("project", "") or "",
+            client=raw.get("client", "") or "",
+            session_id=raw.get("session_id", "") or "",
+            created_at=raw.get("created_at", "") or "",
+        )
+
+        try:
+            candidates = extract_candidates_llm(interaction)
+        except Exception as exc:
+            print(f" ! extraction error on {interaction.id[:8]}: {exc}")
+            errors += 1
+            continue
+
+        processed += 1
+        total_candidates += len(candidates)
+
+        for c in candidates:
+            try:
+                api_post(args.base_url, "/memory", {
+                    "memory_type": c.memory_type,
+                    "content": c.content,
+                    "project": c.project,
+                    "confidence": c.confidence,
+                    "status": "candidate",
+                })
+                total_persisted += 1
+            except urllib.error.HTTPError as exc:
+                if exc.code != 400:  # 400 = duplicate, skip silently
+                    errors += 1
+            except Exception:
+                errors += 1
+
+    from datetime import datetime, timezone
+
+    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
+    set_last_run(args.base_url, now)
+
+    print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}")
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())