"""FastAPI route definitions.""" from pathlib import Path from fastapi import APIRouter, HTTPException from pydantic import BaseModel import atocore.config as _config from atocore.context.builder import ( build_context, get_last_context_pack, _pack_to_dict, ) from atocore.context.project_state import ( CATEGORIES, get_state, invalidate_state, set_state, ) from atocore.ingestion.pipeline import ( exclusive_ingestion, get_ingestion_stats, get_source_status, ingest_configured_sources, ingest_file, ingest_folder, ) from atocore.interactions.service import ( get_interaction, list_interactions, record_interaction, ) from atocore.memory.extractor import ( EXTRACTOR_VERSION, MemoryCandidate, extract_candidates_from_interaction, ) from atocore.memory.extractor_llm import ( LLM_EXTRACTOR_VERSION, extract_candidates_llm, ) from atocore.memory.reinforcement import reinforce_from_interaction from atocore.memory.service import ( MEMORY_STATUSES, MEMORY_TYPES, create_memory, get_memories, invalidate_memory, promote_memory, reject_candidate_memory, supersede_memory, update_memory, ) from atocore.observability.logger import get_logger from atocore.ops.backup import ( cleanup_old_backups, create_runtime_backup, list_runtime_backups, validate_backup, ) from atocore.projects.registry import ( build_project_registration_proposal, get_project_registry_template, list_registered_projects, register_project, refresh_registered_project, update_project, ) from atocore.retrieval.retriever import retrieve from atocore.retrieval.vector_store import get_vector_store router = APIRouter() log = get_logger("api") # --- Request/Response models --- class IngestRequest(BaseModel): path: str class IngestResponse(BaseModel): results: list[dict] class IngestSourcesResponse(BaseModel): results: list[dict] class ProjectRefreshResponse(BaseModel): project: str aliases: list[str] description: str purge_deleted: bool status: str roots_ingested: int roots_skipped: int roots: list[dict] class ProjectRegistrationProposalRequest(BaseModel): project_id: str aliases: list[str] = [] description: str = "" ingest_roots: list[dict] class ProjectUpdateRequest(BaseModel): aliases: list[str] | None = None description: str | None = None ingest_roots: list[dict] | None = None class QueryRequest(BaseModel): prompt: str top_k: int = 10 filter_tags: list[str] | None = None project: str | None = None class QueryResponse(BaseModel): results: list[dict] class ContextBuildRequest(BaseModel): prompt: str project: str | None = None budget: int | None = None class ContextBuildResponse(BaseModel): formatted_context: str full_prompt: str chunks_used: int total_chars: int budget: int budget_remaining: int duration_ms: int chunks: list[dict] class MemoryCreateRequest(BaseModel): memory_type: str content: str project: str = "" confidence: float = 1.0 status: str = "active" class MemoryUpdateRequest(BaseModel): content: str | None = None confidence: float | None = None status: str | None = None class ProjectStateSetRequest(BaseModel): project: str category: str key: str value: str source: str = "" confidence: float = 1.0 class ProjectStateGetRequest(BaseModel): project: str category: str | None = None class ProjectStateInvalidateRequest(BaseModel): project: str category: str key: str # --- Endpoints --- @router.post("/ingest", response_model=IngestResponse) def api_ingest(req: IngestRequest) -> IngestResponse: """Ingest a markdown file or folder.""" target = Path(req.path) try: with exclusive_ingestion(): if target.is_file(): results = [ingest_file(target)] elif target.is_dir(): results = ingest_folder(target) else: raise HTTPException(status_code=404, detail=f"Path not found: {req.path}") except HTTPException: raise except Exception as e: log.error("ingest_failed", path=req.path, error=str(e)) raise HTTPException(status_code=500, detail=f"Ingestion failed: {e}") return IngestResponse(results=results) @router.post("/ingest/sources", response_model=IngestSourcesResponse) def api_ingest_sources() -> IngestSourcesResponse: """Ingest enabled configured source directories.""" try: with exclusive_ingestion(): results = ingest_configured_sources() except Exception as e: log.error("ingest_sources_failed", error=str(e)) raise HTTPException(status_code=500, detail=f"Configured source ingestion failed: {e}") return IngestSourcesResponse(results=results) @router.get("/projects") def api_projects() -> dict: """Return registered projects and their resolved ingest roots.""" return { "projects": list_registered_projects(), "registry_path": str(_config.settings.resolved_project_registry_path), } @router.get("/projects/template") def api_projects_template() -> dict: """Return a starter template for project registry entries.""" return { "template": get_project_registry_template(), "registry_path": str(_config.settings.resolved_project_registry_path), "allowed_sources": ["vault", "drive"], } @router.post("/projects/proposal") def api_project_registration_proposal(req: ProjectRegistrationProposalRequest) -> dict: """Return a normalized project registration proposal without writing it.""" try: return build_project_registration_proposal( project_id=req.project_id, aliases=req.aliases, description=req.description, ingest_roots=req.ingest_roots, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @router.post("/projects/register") def api_project_registration(req: ProjectRegistrationProposalRequest) -> dict: """Persist a validated project registration to the registry file.""" try: return register_project( project_id=req.project_id, aliases=req.aliases, description=req.description, ingest_roots=req.ingest_roots, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @router.put("/projects/{project_name}") def api_project_update(project_name: str, req: ProjectUpdateRequest) -> dict: """Update an existing project registration.""" try: return update_project( project_name=project_name, aliases=req.aliases, description=req.description, ingest_roots=req.ingest_roots, ) except ValueError as e: detail = str(e) if detail.startswith("Unknown project"): raise HTTPException(status_code=404, detail=detail) raise HTTPException(status_code=400, detail=detail) @router.post("/projects/{project_name}/refresh", response_model=ProjectRefreshResponse) def api_refresh_project(project_name: str, purge_deleted: bool = False) -> ProjectRefreshResponse: """Refresh one registered project from its configured ingest roots.""" try: with exclusive_ingestion(): result = refresh_registered_project(project_name, purge_deleted=purge_deleted) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: log.error("project_refresh_failed", project=project_name, error=str(e)) raise HTTPException(status_code=500, detail=f"Project refresh failed: {e}") return ProjectRefreshResponse(**result) @router.post("/query", response_model=QueryResponse) def api_query(req: QueryRequest) -> QueryResponse: """Retrieve relevant chunks for a prompt.""" try: chunks = retrieve( req.prompt, top_k=req.top_k, filter_tags=req.filter_tags, project_hint=req.project, ) except Exception as e: log.error("query_failed", prompt=req.prompt[:100], error=str(e)) raise HTTPException(status_code=500, detail=f"Query failed: {e}") return QueryResponse( results=[ { "chunk_id": c.chunk_id, "content": c.content, "score": c.score, "heading_path": c.heading_path, "source_file": c.source_file, "title": c.title, } for c in chunks ] ) @router.post("/context/build", response_model=ContextBuildResponse) def api_build_context(req: ContextBuildRequest) -> ContextBuildResponse: """Build a full context pack for a prompt.""" try: pack = build_context( user_prompt=req.prompt, project_hint=req.project, budget=req.budget, ) except Exception as e: log.error("context_build_failed", prompt=req.prompt[:100], error=str(e)) raise HTTPException(status_code=500, detail=f"Context build failed: {e}") pack_dict = _pack_to_dict(pack) return ContextBuildResponse( formatted_context=pack.formatted_context, full_prompt=pack.full_prompt, chunks_used=len(pack.chunks_used), total_chars=pack.total_chars, budget=pack.budget, budget_remaining=pack.budget_remaining, duration_ms=pack.duration_ms, chunks=pack_dict["chunks"], ) @router.post("/memory") def api_create_memory(req: MemoryCreateRequest) -> dict: """Create a new memory entry.""" try: mem = create_memory( memory_type=req.memory_type, content=req.content, project=req.project, confidence=req.confidence, status=req.status, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) return {"status": "ok", "id": mem.id, "memory_type": mem.memory_type} @router.get("/memory") def api_get_memories( memory_type: str | None = None, project: str | None = None, active_only: bool = True, min_confidence: float = 0.0, limit: int = 50, status: str | None = None, ) -> dict: """List memories, optionally filtered. When ``status`` is given explicitly it overrides ``active_only`` so the Phase 9 Commit C review queue can be listed via ``GET /memory?status=candidate``. """ try: memories = get_memories( memory_type=memory_type, project=project, active_only=active_only, min_confidence=min_confidence, limit=limit, status=status, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) return { "memories": [ { "id": m.id, "memory_type": m.memory_type, "content": m.content, "project": m.project, "confidence": m.confidence, "status": m.status, "reference_count": m.reference_count, "last_referenced_at": m.last_referenced_at, "updated_at": m.updated_at, } for m in memories ], "types": MEMORY_TYPES, "statuses": MEMORY_STATUSES, } @router.put("/memory/{memory_id}") def api_update_memory(memory_id: str, req: MemoryUpdateRequest) -> dict: """Update an existing memory.""" try: success = update_memory( memory_id=memory_id, content=req.content, confidence=req.confidence, status=req.status, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) if not success: raise HTTPException(status_code=404, detail="Memory not found") return {"status": "updated", "id": memory_id} @router.delete("/memory/{memory_id}") def api_invalidate_memory(memory_id: str) -> dict: """Invalidate a memory (error correction).""" success = invalidate_memory(memory_id) if not success: raise HTTPException(status_code=404, detail="Memory not found") return {"status": "invalidated", "id": memory_id} @router.post("/memory/{memory_id}/promote") def api_promote_memory(memory_id: str) -> dict: """Promote a candidate memory to active (Phase 9 Commit C).""" try: success = promote_memory(memory_id) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) if not success: raise HTTPException( status_code=404, detail=f"Memory not found or not a candidate: {memory_id}", ) return {"status": "promoted", "id": memory_id} @router.post("/memory/{memory_id}/reject") def api_reject_candidate_memory(memory_id: str) -> dict: """Reject a candidate memory (Phase 9 Commit C review queue).""" success = reject_candidate_memory(memory_id) if not success: raise HTTPException( status_code=404, detail=f"Memory not found or not a candidate: {memory_id}", ) return {"status": "rejected", "id": memory_id} @router.post("/project/state") def api_set_project_state(req: ProjectStateSetRequest) -> dict: """Set or update a trusted project state entry.""" try: entry = set_state( project_name=req.project, category=req.category, key=req.key, value=req.value, source=req.source, confidence=req.confidence, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: log.error("set_state_failed", error=str(e)) raise HTTPException(status_code=500, detail=f"Failed to set state: {e}") return {"status": "ok", "id": entry.id, "category": entry.category, "key": entry.key} @router.get("/project/state/{project_name}") def api_get_project_state(project_name: str, category: str | None = None) -> dict: """Get trusted project state entries.""" entries = get_state(project_name, category=category) return { "project": project_name, "entries": [ { "id": e.id, "category": e.category, "key": e.key, "value": e.value, "source": e.source, "confidence": e.confidence, "status": e.status, "updated_at": e.updated_at, } for e in entries ], "categories": CATEGORIES, } @router.delete("/project/state") def api_invalidate_project_state(req: ProjectStateInvalidateRequest) -> dict: """Invalidate (supersede) a project state entry.""" success = invalidate_state(req.project, req.category, req.key) if not success: raise HTTPException(status_code=404, detail="State entry not found or already invalidated") return {"status": "invalidated", "project": req.project, "category": req.category, "key": req.key} class InteractionRecordRequest(BaseModel): prompt: str response: str = "" response_summary: str = "" project: str = "" client: str = "" session_id: str = "" memories_used: list[str] = [] chunks_used: list[str] = [] context_pack: dict | None = None reinforce: bool = True extract: bool = False @router.post("/interactions") def api_record_interaction(req: InteractionRecordRequest) -> dict: """Capture one interaction (prompt + response + what was used). This is the foundation of the AtoCore reflection loop. It records what the system fed to an LLM and what came back. If ``reinforce`` is true (default) and there is response content, the Phase 9 Commit B reinforcement pass runs automatically, bumping the confidence of any active memory echoed in the response. Nothing is ever promoted into trusted state automatically. """ try: interaction = record_interaction( prompt=req.prompt, response=req.response, response_summary=req.response_summary, project=req.project, client=req.client, session_id=req.session_id, memories_used=req.memories_used, chunks_used=req.chunks_used, context_pack=req.context_pack, reinforce=req.reinforce, extract=req.extract, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) return { "status": "recorded", "id": interaction.id, "created_at": interaction.created_at, } @router.post("/interactions/{interaction_id}/reinforce") def api_reinforce_interaction(interaction_id: str) -> dict: """Run the reinforcement pass on an already-captured interaction. Useful for backfilling reinforcement over historical interactions, or for retrying after a transient failure in the automatic pass that runs inside ``POST /interactions``. """ interaction = get_interaction(interaction_id) if interaction is None: raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}") results = reinforce_from_interaction(interaction) return { "interaction_id": interaction_id, "reinforced_count": len(results), "reinforced": [ { "memory_id": r.memory_id, "memory_type": r.memory_type, "old_confidence": round(r.old_confidence, 4), "new_confidence": round(r.new_confidence, 4), } for r in results ], } class InteractionExtractRequest(BaseModel): persist: bool = False mode: str = "rule" # "rule" or "llm" @router.post("/interactions/{interaction_id}/extract") def api_extract_from_interaction( interaction_id: str, req: InteractionExtractRequest | None = None, ) -> dict: """Extract candidate memories from a captured interaction. Phase 9 Commit C. The extractor is rule-based and deliberately conservative — it only surfaces candidates that matched an explicit structural cue (decision heading, preference sentence, etc.). By default the candidates are returned *without* being persisted so a caller can preview them before committing to a review queue. Pass ``persist: true`` to immediately create candidate memories for each extraction result. """ interaction = get_interaction(interaction_id) if interaction is None: raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}") payload = req or InteractionExtractRequest() if payload.mode == "llm": candidates: list[MemoryCandidate] = extract_candidates_llm(interaction) else: candidates: list[MemoryCandidate] = extract_candidates_from_interaction(interaction) persisted_ids: list[str] = [] if payload.persist: for candidate in candidates: try: mem = create_memory( memory_type=candidate.memory_type, content=candidate.content, project=candidate.project, confidence=candidate.confidence, status="candidate", ) persisted_ids.append(mem.id) except ValueError as e: log.error( "extract_persist_failed", interaction_id=interaction_id, rule=candidate.rule, error=str(e), ) return { "interaction_id": interaction_id, "candidate_count": len(candidates), "persisted": payload.persist, "persisted_ids": persisted_ids, "extractor_version": EXTRACTOR_VERSION, "candidates": [ { "memory_type": c.memory_type, "content": c.content, "project": c.project, "confidence": c.confidence, "rule": c.rule, "source_span": c.source_span, "extractor_version": c.extractor_version, } for c in candidates ], } @router.get("/interactions") def api_list_interactions( project: str | None = None, session_id: str | None = None, client: str | None = None, since: str | None = None, limit: int = 50, ) -> dict: """List captured interactions, optionally filtered by project, session, client, or creation time. Hard-capped at 500 entries per call.""" interactions = list_interactions( project=project, session_id=session_id, client=client, since=since, limit=limit, ) return { "count": len(interactions), "interactions": [ { "id": i.id, "prompt": i.prompt, "response_summary": i.response_summary, "response_chars": len(i.response), "project": i.project, "client": i.client, "session_id": i.session_id, "memories_used": i.memories_used, "chunks_used": i.chunks_used, "created_at": i.created_at, } for i in interactions ], } @router.get("/interactions/{interaction_id}") def api_get_interaction(interaction_id: str) -> dict: """Fetch a single interaction with the full response and context pack.""" interaction = get_interaction(interaction_id) if interaction is None: raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}") return { "id": interaction.id, "prompt": interaction.prompt, "response": interaction.response, "response_summary": interaction.response_summary, "project": interaction.project, "client": interaction.client, "session_id": interaction.session_id, "memories_used": interaction.memories_used, "chunks_used": interaction.chunks_used, "context_pack": interaction.context_pack, "created_at": interaction.created_at, } class BackupCreateRequest(BaseModel): include_chroma: bool = False @router.post("/admin/backup") def api_create_backup(req: BackupCreateRequest | None = None) -> dict: """Create a runtime backup snapshot. When ``include_chroma`` is true the call holds the ingestion lock so a safe cold copy of the vector store can be taken without racing against refresh or ingest endpoints. """ payload = req or BackupCreateRequest() try: if payload.include_chroma: with exclusive_ingestion(): metadata = create_runtime_backup(include_chroma=True) else: metadata = create_runtime_backup(include_chroma=False) except Exception as e: log.error("admin_backup_failed", error=str(e)) raise HTTPException(status_code=500, detail=f"Backup failed: {e}") return metadata @router.get("/admin/backup") def api_list_backups() -> dict: """List all runtime backups under the configured backup directory.""" return { "backup_dir": str(_config.settings.resolved_backup_dir), "backups": list_runtime_backups(), } class BackupCleanupRequest(BaseModel): confirm: bool = False @router.post("/admin/backup/cleanup") def api_cleanup_backups(req: BackupCleanupRequest | None = None) -> dict: """Apply retention policy to old backup snapshots. Dry-run by default. Pass ``confirm: true`` to actually delete. Retention: last 7 daily, last 4 weekly (Sundays), last 6 monthly (1st). """ payload = req or BackupCleanupRequest() try: return cleanup_old_backups(confirm=payload.confirm) except Exception as e: log.error("admin_cleanup_failed", error=str(e)) raise HTTPException(status_code=500, detail=f"Cleanup failed: {e}") class ExtractBatchRequest(BaseModel): since: str | None = None mode: str = "llm" limit: int = 50 persist: bool = True @router.post("/admin/extract-batch") def api_extract_batch(req: ExtractBatchRequest | None = None) -> dict: """Run batch extraction across recent interactions. Fetches interactions since ``since`` (or since the last recorded batch run), runs the extractor (rule or LLM) on each, and persists any candidates as ``status=candidate``. The last-run timestamp is stored in project state under ``atocore / status / last_extract_batch_run`` so subsequent calls without ``since`` automatically pick up where the last run left off. This endpoint is the operational home for R1 / R5 — it makes the LLM extractor accessible as an API operation rather than a script-only eval tool. Still NOT on the capture hot path: callers invoke this endpoint explicitly (cron, manual curl, CLI). """ payload = req or ExtractBatchRequest() since = payload.since if not since: state_entries = get_state("atocore") for entry in state_entries: if entry.category == "status" and entry.key == "last_extract_batch_run": since = entry.value break interactions = list_interactions(since=since, limit=min(payload.limit, 200)) processed = 0 total_candidates = 0 total_persisted = 0 errors: list[dict] = [] for interaction in interactions: if not (interaction.response or interaction.response_summary): continue try: if payload.mode == "llm": candidates = extract_candidates_llm(interaction) else: candidates = extract_candidates_from_interaction(interaction) except Exception as exc: errors.append({"interaction_id": interaction.id, "error": str(exc)}) continue processed += 1 total_candidates += len(candidates) if payload.persist and candidates: for candidate in candidates: try: create_memory( memory_type=candidate.memory_type, content=candidate.content, project=candidate.project, confidence=candidate.confidence, status="candidate", ) total_persisted += 1 except ValueError: pass # duplicate — skip silently from datetime import datetime, timezone now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") try: set_state( project="atocore", category="status", key="last_extract_batch_run", value=now, source="admin/extract-batch endpoint", ) except Exception: pass # best-effort timestamp tracking log.info( "extract_batch_complete", mode=payload.mode, processed=processed, total_candidates=total_candidates, total_persisted=total_persisted, errors=len(errors), ) return { "processed": processed, "total_candidates": total_candidates, "total_persisted": total_persisted, "mode": payload.mode, "persist": payload.persist, "since": since or "(first run)", "errors": errors, } @router.get("/admin/backup/{stamp}/validate") def api_validate_backup(stamp: str) -> dict: """Validate that a previously created backup is structurally usable.""" result = validate_backup(stamp) if not result.get("exists", False): raise HTTPException(status_code=404, detail=f"Backup not found: {stamp}") return result @router.get("/health") def api_health() -> dict: """Health check. Three layers of version reporting, in increasing precision: - ``version`` / ``code_version``: ``atocore.__version__`` (e.g. "0.2.0"). Bumped manually on commits that change the API surface, schema, or user-visible behavior. Coarse — any number of commits can land between bumps without changing this value. - ``build_sha``: full git SHA of the commit the running container was built from. Set by ``deploy/dalidou/deploy.sh`` via the ``ATOCORE_BUILD_SHA`` env var on every rebuild. Reports ``"unknown"`` for builds that bypass deploy.sh (direct ``docker compose up`` etc.). This is the precise drift signal: if the live ``build_sha`` doesn't match the tip of the deployed branch on Gitea, the service is stale regardless of what ``code_version`` says. - ``build_time`` / ``build_branch``: when and from which branch the live container was built. Useful for forensics when multiple branches are in flight or when build_sha is ambiguous (e.g. a force-push to the same SHA). The deploy.sh post-deploy verification step compares the live ``build_sha`` to the SHA it just set, and exits non-zero on mismatch. """ import os from atocore import __version__ store = get_vector_store() source_status = get_source_status() return { "status": "ok", "version": __version__, "code_version": __version__, "build_sha": os.environ.get("ATOCORE_BUILD_SHA", "unknown"), "build_time": os.environ.get("ATOCORE_BUILD_TIME", "unknown"), "build_branch": os.environ.get("ATOCORE_BUILD_BRANCH", "unknown"), "vectors_count": store.count, "env": _config.settings.env, "machine_paths": { "db_path": str(_config.settings.db_path), "chroma_path": str(_config.settings.chroma_path), "log_dir": str(_config.settings.resolved_log_dir), "backup_dir": str(_config.settings.resolved_backup_dir), "run_dir": str(_config.settings.resolved_run_dir), }, "sources_ready": all( (not source["enabled"]) or (source["exists"] and source["is_dir"]) for source in source_status ), "source_status": source_status, } @router.get("/sources") def api_sources() -> dict: """Return configured ingestion source directories and readiness.""" return { "sources": get_source_status(), "vault_enabled": _config.settings.source_vault_enabled, "drive_enabled": _config.settings.source_drive_enabled, } @router.get("/stats") def api_stats() -> dict: """Ingestion statistics.""" return get_ingestion_stats() @router.get("/debug/context") def api_debug_context() -> dict: """Inspect the last assembled context pack.""" pack = get_last_context_pack() if pack is None: return {"message": "No context pack built yet."} return _pack_to_dict(pack)