diff --git a/src/atocore/api/routes.py b/src/atocore/api/routes.py
index 847621c..dccc0b8 100644
--- a/src/atocore/api/routes.py
+++ b/src/atocore/api/routes.py
@@ -35,6 +35,10 @@ from atocore.memory.extractor import (
     MemoryCandidate,
     extract_candidates_from_interaction,
 )
+from atocore.memory.extractor_llm import (
+    LLM_EXTRACTOR_VERSION,
+    extract_candidates_llm,
+)
 from atocore.memory.reinforcement import reinforce_from_interaction
 from atocore.memory.service import (
     MEMORY_STATUSES,
@@ -580,6 +584,7 @@ def api_reinforce_interaction(interaction_id: str) -> dict:
 
 class InteractionExtractRequest(BaseModel):
     persist: bool = False
+    mode: str = "rule"  # "rule" or "llm"
 
 
 @router.post("/interactions/{interaction_id}/extract")
@@ -601,7 +606,10 @@ def api_extract_from_interaction(
     if interaction is None:
         raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
     payload = req or InteractionExtractRequest()
-    candidates: list[MemoryCandidate] = extract_candidates_from_interaction(interaction)
+    if payload.mode == "llm":
+        candidates: list[MemoryCandidate] = extract_candidates_llm(interaction)
+    else:
+        candidates: list[MemoryCandidate] = extract_candidates_from_interaction(interaction)
 
     persisted_ids: list[str] = []
     if payload.persist:
@@ -755,6 +763,155 @@ def api_cleanup_backups(req: BackupCleanupRequest | None = None) -> dict:
         raise HTTPException(status_code=500, detail=f"Cleanup failed: {e}")
 
 
+class ExtractBatchRequest(BaseModel):
+    """Request body for ``POST /admin/extract-batch``."""
+
+    # Lower bound handed to ``list_interactions``; when empty the endpoint
+    # falls back to the stored ``last_extract_batch_run`` state value.
+    since: str | None = None
+    # "llm" selects the LLM extractor; any other value runs the rule one.
+    mode: str = "llm"
+    # Requested batch size; the endpoint clamps it to 0..200.
+    limit: int = 50
+    # When False, candidates are extracted and counted but never stored.
+    persist: bool = True
+
+
+@router.post("/admin/extract-batch")
+def api_extract_batch(req: ExtractBatchRequest | None = None) -> dict:
+    """Run batch extraction across recent interactions.
+
+    Fetches interactions since ``since`` (or since the last recorded
+    batch run), runs the extractor (rule or LLM) on each, and persists
+    any candidates as ``status=candidate``. The last-run timestamp is
+    stored in project state under ``atocore / status /
+    last_extract_batch_run`` so subsequent calls without ``since``
+    automatically pick up where the last run left off.
+
+    The timestamp is taken before the interactions are fetched, so
+    interactions recorded while the batch runs fall after it. It is
+    only stored when ``persist`` is true: a dry run must not move the
+    watermark past interactions whose candidates were never saved.
+
+    This endpoint is the operational home for R1 / R5 — it makes the
+    LLM extractor accessible as an API operation rather than a
+    script-only eval tool. Still NOT on the capture hot path: callers
+    invoke this endpoint explicitly (cron, manual curl, CLI).
+
+    Returns a summary dict with the keys ``processed``,
+    ``total_candidates``, ``total_persisted``, ``mode``, ``persist``,
+    ``since`` and ``errors`` (one ``{"interaction_id", "error"}`` entry
+    per interaction whose extraction raised).
+    """
+    from datetime import datetime, timezone
+
+    payload = req or ExtractBatchRequest()
+    since = payload.since
+
+    if not since:
+        # Resume from the watermark left by the previous persisted run.
+        since = next(
+            (
+                entry.value
+                for entry in get_state("atocore")
+                if entry.category == "status"
+                and entry.key == "last_extract_batch_run"
+            ),
+            since,
+        )
+
+    # Take the watermark before fetching so that interactions recorded
+    # during this run are not skipped by the next one.
+    run_started = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
+
+    # NOTE(review): how list_interactions treats a negative limit is not
+    # visible here; clamp defensively so it can never widen the batch.
+    limit = max(0, min(payload.limit, 200))
+    # NOTE(review): when more than ``limit`` interactions are pending, the
+    # watermark below still moves to ``run_started`` — confirm the overflow
+    # is not lost.
+    interactions = list_interactions(since=since, limit=limit)
+
+    # Pick the extractor once; anything other than "llm" means rule-based.
+    extract = (
+        extract_candidates_llm
+        if payload.mode == "llm"
+        else extract_candidates_from_interaction
+    )
+
+    processed = 0
+    total_candidates = 0
+    total_persisted = 0
+    errors: list[dict] = []
+
+    for interaction in interactions:
+        if not (interaction.response or interaction.response_summary):
+            continue
+        try:
+            candidates = extract(interaction)
+        except Exception as exc:
+            # One failing interaction must not abort the whole batch.
+            errors.append({"interaction_id": interaction.id, "error": str(exc)})
+            continue
+
+        processed += 1
+        total_candidates += len(candidates)
+
+        if not payload.persist:
+            continue
+        for candidate in candidates:
+            try:
+                create_memory(
+                    memory_type=candidate.memory_type,
+                    content=candidate.content,
+                    project=candidate.project,
+                    confidence=candidate.confidence,
+                    status="candidate",
+                )
+            except ValueError as exc:
+                # Expected for duplicates; logged so that any other
+                # rejection is not hidden as well.
+                log.debug(
+                    "extract_batch_candidate_skipped",
+                    interaction_id=interaction.id,
+                    error=str(exc),
+                )
+            else:
+                total_persisted += 1
+
+    if payload.persist:
+        try:
+            set_state(
+                project="atocore",
+                category="status",
+                key="last_extract_batch_run",
+                value=run_started,
+                source="admin/extract-batch endpoint",
+            )
+        except Exception as exc:
+            # Best-effort timestamp tracking: report, never fail the run.
+            log.warning("extract_batch_state_update_failed", error=str(exc))
+
+    log.info(
+        "extract_batch_complete",
+        mode=payload.mode,
+        processed=processed,
+        total_candidates=total_candidates,
+        total_persisted=total_persisted,
+        errors=len(errors),
+    )
+
+    return {
+        "processed": processed,
+        "total_candidates": total_candidates,
+        "total_persisted": total_persisted,
+        "mode": payload.mode,
+        "persist": payload.persist,
+        "since": since or "(first run)",
+        "errors": errors,
+    }
+
+
 @router.get("/admin/backup/{stamp}/validate")
 def api_validate_backup(stamp: str) -> dict:
     """Validate that a previously created backup is structurally usable."""