feat(R1/R5): POST /admin/extract-batch + LLM mode on single extract

Day 1 of the operational-reflection batch. Two changes:

1. POST /admin/extract-batch: batch extraction endpoint that fetches
   recent interactions (since last run or explicit 'since' param),
   runs the extractor (rule or LLM mode), and persists candidates
   with status=candidate. Tracks last-run timestamp in project state
   (atocore/status/last_extract_batch_run) so subsequent calls
   auto-resume. This is the operational home for R1/R5 — makes the
   LLM extractor an API operation, not just a script.

2. POST /interactions/{id}/extract now accepts mode: "rule" | "llm"
   (default "rule" for backward compatibility). When "llm", it uses
   extract_candidates_llm (claude -p sonnet, OAuth).

Both changes preserve the standing decision: extraction stays off
the capture hot path. The batch endpoint is invoked explicitly by
cron, manual curl, or CLI — never inline with POST /interactions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-12 10:45:42 -04:00
parent 54d84b52cb
commit bcb7675a0d

View File

@@ -35,6 +35,10 @@ from atocore.memory.extractor import (
MemoryCandidate,
extract_candidates_from_interaction,
)
from atocore.memory.extractor_llm import (
LLM_EXTRACTOR_VERSION,
extract_candidates_llm,
)
from atocore.memory.reinforcement import reinforce_from_interaction
from atocore.memory.service import (
MEMORY_STATUSES,
@@ -580,6 +584,7 @@ def api_reinforce_interaction(interaction_id: str) -> dict:
class InteractionExtractRequest(BaseModel):
persist: bool = False
mode: str = "rule" # "rule" or "llm"
@router.post("/interactions/{interaction_id}/extract")
@@ -601,7 +606,10 @@ def api_extract_from_interaction(
if interaction is None:
raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
payload = req or InteractionExtractRequest()
candidates: list[MemoryCandidate] = extract_candidates_from_interaction(interaction)
if payload.mode == "llm":
candidates: list[MemoryCandidate] = extract_candidates_llm(interaction)
else:
candidates: list[MemoryCandidate] = extract_candidates_from_interaction(interaction)
persisted_ids: list[str] = []
if payload.persist:
@@ -755,6 +763,109 @@ def api_cleanup_backups(req: BackupCleanupRequest | None = None) -> dict:
raise HTTPException(status_code=500, detail=f"Cleanup failed: {e}")
class ExtractBatchRequest(BaseModel):
since: str | None = None
mode: str = "llm"
limit: int = 50
persist: bool = True
@router.post("/admin/extract-batch")
def api_extract_batch(req: ExtractBatchRequest | None = None) -> dict:
"""Run batch extraction across recent interactions.
Fetches interactions since ``since`` (or since the last recorded
batch run), runs the extractor (rule or LLM) on each, and persists
any candidates as ``status=candidate``. The last-run timestamp is
stored in project state under ``atocore / status /
last_extract_batch_run`` so subsequent calls without ``since``
automatically pick up where the last run left off.
This endpoint is the operational home for R1 / R5 — it makes the
LLM extractor accessible as an API operation rather than a
script-only eval tool. Still NOT on the capture hot path: callers
invoke this endpoint explicitly (cron, manual curl, CLI).
"""
payload = req or ExtractBatchRequest()
since = payload.since
if not since:
state_entries = get_state("atocore")
for entry in state_entries:
if entry.category == "status" and entry.key == "last_extract_batch_run":
since = entry.value
break
interactions = list_interactions(since=since, limit=min(payload.limit, 200))
processed = 0
total_candidates = 0
total_persisted = 0
errors: list[dict] = []
for interaction in interactions:
if not (interaction.response or interaction.response_summary):
continue
try:
if payload.mode == "llm":
candidates = extract_candidates_llm(interaction)
else:
candidates = extract_candidates_from_interaction(interaction)
except Exception as exc:
errors.append({"interaction_id": interaction.id, "error": str(exc)})
continue
processed += 1
total_candidates += len(candidates)
if payload.persist and candidates:
for candidate in candidates:
try:
create_memory(
memory_type=candidate.memory_type,
content=candidate.content,
project=candidate.project,
confidence=candidate.confidence,
status="candidate",
)
total_persisted += 1
except ValueError:
pass # duplicate — skip silently
from datetime import datetime, timezone
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
try:
set_state(
project="atocore",
category="status",
key="last_extract_batch_run",
value=now,
source="admin/extract-batch endpoint",
)
except Exception:
pass # best-effort timestamp tracking
log.info(
"extract_batch_complete",
mode=payload.mode,
processed=processed,
total_candidates=total_candidates,
total_persisted=total_persisted,
errors=len(errors),
)
return {
"processed": processed,
"total_candidates": total_candidates,
"total_persisted": total_persisted,
"mode": payload.mode,
"persist": payload.persist,
"since": since or "(first run)",
"errors": errors,
}
@router.get("/admin/backup/{stamp}/validate")
def api_validate_backup(stamp: str) -> dict:
"""Validate that a previously created backup is structurally usable."""