Files
ATOCore/src/atocore/api/routes.py
Anto01 ea3fed3d44 feat(phase9-A): interaction capture loop foundation
Phase 9 Commit A from the agreed plan: turn AtoCore from a stateless
context enhancer into a system that records what it actually fed to an
LLM and what came back. This is the audit trail Reflection (Commit B)
and Extraction (Commit C) will be layered on top of.

The interactions table existed in the schema since the original PoC
but nothing wrote to it. This change makes it real:

Schema migration (additive only):
- response                full LLM response (caller decides how much)
- memories_used           JSON list of memory ids in the context pack
- chunks_used             JSON list of chunk ids in the context pack
- client                  identifier of the calling system
                          (openclaw, claude-code, manual, ...)
- session_id              groups multi-turn conversations
- project                 project name (mirrors the memory module pattern,
                          no FK so capture stays cheap)
- indexes on session_id, project, created_at

The created_at column is now written explicitly with a SQLite-compatible
'YYYY-MM-DD HH:MM:SS' format so the same string lives in the DB and the
returned dataclass. Without this the `since` filter on list_interactions
would silently fail because CURRENT_TIMESTAMP and isoformat use different
shapes that do not compare cleanly as strings.

New module src/atocore/interactions/:
- Interaction dataclass
- record_interaction()    persists one round-trip (prompt required;
                          everything else optional). Refuses empty prompts.
- list_interactions()     filters by project / session_id / client / since,
                          newest-first, hard-capped at 500
- get_interaction()       fetch by id, full response + context pack

API endpoints:
- POST   /interactions                capture one interaction
- GET    /interactions                list with summaries (no full response)
- GET    /interactions/{id}           full record incl. response + pack

Trust model:
- Capture is read-only with respect to memories, project state, and
  source chunks. Nothing here promotes anything into trusted state.
- The audit trail becomes the dataset Commit B (reinforcement) and
  Commit C (extraction + review queue) will operate on.

Tests (13 new, all green):
- service: persist + roundtrip every field
- service: minimum-fields path (prompt only)
- service: empty / whitespace prompt rejected
- service: get by id returns None for missing
- service: filter by project, session, client
- service: ordering newest-first with limit
- service: since filter inclusive on cutoff (covers the bug fixed by the
  explicit timestamp format described above)
- service: limit=0 returns empty
- API: POST records and round-trips through GET /interactions/{id}
- API: empty prompt returns 400
- API: missing id returns 404
- API: list filter returns summaries (not full response bodies)

Full suite: 118 passing (was 105).

master-plan-status.md updated to move Phase 9 from "not started" to
"started" with the explicit note that Commit A is in and Commits B/C
remain.
2026-04-06 19:31:43 -04:00

645 lines
19 KiB
Python

"""FastAPI route definitions."""
from pathlib import Path
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
import atocore.config as _config
from atocore.context.builder import (
build_context,
get_last_context_pack,
_pack_to_dict,
)
from atocore.context.project_state import (
CATEGORIES,
get_state,
invalidate_state,
set_state,
)
from atocore.ingestion.pipeline import (
exclusive_ingestion,
get_ingestion_stats,
get_source_status,
ingest_configured_sources,
ingest_file,
ingest_folder,
)
from atocore.interactions.service import (
get_interaction,
list_interactions,
record_interaction,
)
from atocore.memory.service import (
MEMORY_TYPES,
create_memory,
get_memories,
invalidate_memory,
supersede_memory,
update_memory,
)
from atocore.observability.logger import get_logger
from atocore.ops.backup import (
create_runtime_backup,
list_runtime_backups,
validate_backup,
)
from atocore.projects.registry import (
build_project_registration_proposal,
get_project_registry_template,
list_registered_projects,
register_project,
refresh_registered_project,
update_project,
)
from atocore.retrieval.retriever import retrieve
from atocore.retrieval.vector_store import get_vector_store
# Router on which every endpoint in this module is registered.
router = APIRouter()
# Module-level logger, obtained under the name "api".
log = get_logger("api")
# --- Request/Response models ---
class IngestRequest(BaseModel):
    """Request body for ``POST /ingest``."""

    # Filesystem path of the file or folder to ingest; the endpoint wraps
    # it in ``pathlib.Path`` and checks ``is_file()`` / ``is_dir()``.
    path: str
class IngestResponse(BaseModel):
    """Response body for ``POST /ingest``."""

    # Either the single ``ingest_file`` result wrapped in a list, or
    # whatever ``ingest_folder`` returned for a directory.
    results: list[dict]
class IngestSourcesResponse(BaseModel):
    """Response body for ``POST /ingest/sources``."""

    # Value returned by ``ingest_configured_sources()``.
    results: list[dict]
class ProjectRefreshResponse(BaseModel):
    """Response body for ``POST /projects/{project_name}/refresh``.

    The endpoint builds this model by unpacking the dict returned by
    ``refresh_registered_project`` as keyword arguments, so the fields
    below must match that dict's keys.
    """

    project: str
    aliases: list[str]
    description: str
    purge_deleted: bool
    status: str
    roots_ingested: int
    roots_skipped: int
    # Per-root details; the dict layout is defined by the registry module.
    roots: list[dict]
class ProjectRegistrationProposalRequest(BaseModel):
    """Request body shared by ``POST /projects/proposal`` and
    ``POST /projects/register``.

    Only ``project_id`` and ``ingest_roots`` are required.
    """

    project_id: str
    aliases: list[str] = []
    description: str = ""
    # Forwarded untouched to the registry functions, which validate it.
    ingest_roots: list[dict]
class ProjectUpdateRequest(BaseModel):
    """Request body for ``PUT /projects/{project_name}``.

    Every field is optional and is forwarded as-is to ``update_project``.
    """

    # NOTE(review): ``None`` presumably means "leave unchanged" -- confirm
    # against ``update_project``.
    aliases: list[str] | None = None
    description: str | None = None
    ingest_roots: list[dict] | None = None
class QueryRequest(BaseModel):
    """Request body for ``POST /query``."""

    # Text handed to ``retrieve`` as its positional query argument.
    prompt: str
    # Passed through as ``top_k``.
    top_k: int = 10
    # Passed through as ``filter_tags``.
    filter_tags: list[str] | None = None
    # Passed through as ``project_hint``.
    project: str | None = None
class QueryResponse(BaseModel):
    """Response body for ``POST /query``."""

    # One dict per retrieved chunk with the keys ``chunk_id``, ``content``,
    # ``score``, ``heading_path``, ``source_file`` and ``title``.
    results: list[dict]
class ContextBuildRequest(BaseModel):
    """Request body for ``POST /context/build``."""

    # Forwarded to ``build_context`` as ``user_prompt``.
    prompt: str
    # Forwarded as ``project_hint``.
    project: str | None = None
    # Forwarded as ``budget``; ``None`` leaves the choice to the builder.
    budget: int | None = None
class ContextBuildResponse(BaseModel):
    """Response body for ``POST /context/build``.

    All values are copied from the context pack that ``build_context``
    returned.
    """

    formatted_context: str
    full_prompt: str
    # Number of chunks in the pack (``len(pack.chunks_used)``), not the
    # chunks themselves.
    chunks_used: int
    total_chars: int
    budget: int
    budget_remaining: int
    duration_ms: int
    # The ``"chunks"`` entry of the pack's dict form.
    chunks: list[dict]
class MemoryCreateRequest(BaseModel):
    """Request body for ``POST /memory``.

    Every field is forwarded to ``create_memory`` under the same name.
    """

    memory_type: str
    content: str
    # Empty string when the memory is not tied to a project.
    project: str = ""
    confidence: float = 1.0
class MemoryUpdateRequest(BaseModel):
    """Request body for ``PUT /memory/{memory_id}``.

    All fields are optional and are forwarded as-is to ``update_memory``.
    """

    content: str | None = None
    confidence: float | None = None
    status: str | None = None
class ProjectStateSetRequest(BaseModel):
    """Request body for ``POST /project/state``."""

    # Forwarded to ``set_state`` as ``project_name``.
    project: str
    category: str
    key: str
    value: str
    source: str = ""
    confidence: float = 1.0
class ProjectStateGetRequest(BaseModel):
    """Project-state lookup parameters: a project and an optional category."""

    # NOTE(review): no endpoint in this module appears to reference this
    # model; ``GET /project/state/{project_name}`` takes its inputs from the
    # path and query string instead. Confirm before removing.
    project: str
    category: str | None = None
class ProjectStateInvalidateRequest(BaseModel):
    """Request body for ``DELETE /project/state``.

    The three fields are passed positionally to ``invalidate_state``.
    """

    project: str
    category: str
    key: str
# --- Endpoints ---
@router.post("/ingest", response_model=IngestResponse)
def api_ingest(req: IngestRequest) -> IngestResponse:
    """Ingest a markdown file or folder.

    The path is classified and ingested while holding the exclusive
    ingestion lock.

    Raises:
        HTTPException: 404 when the path is neither a file nor a directory;
            500 when ingestion raises any other exception.
    """
    target = Path(req.path)
    try:
        with exclusive_ingestion():
            if target.is_file():
                results = [ingest_file(target)]
            elif target.is_dir():
                results = ingest_folder(target)
            else:
                raise HTTPException(status_code=404, detail=f"Path not found: {req.path}")
    except HTTPException:
        # Let the 404 above through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        log.error("ingest_failed", path=req.path, error=str(e))
        # Chain the original error so its traceback stays attached.
        raise HTTPException(status_code=500, detail=f"Ingestion failed: {e}") from e
    return IngestResponse(results=results)
@router.post("/ingest/sources", response_model=IngestSourcesResponse)
def api_ingest_sources() -> IngestSourcesResponse:
    """Ingest enabled configured source directories.

    Runs ``ingest_configured_sources`` while holding the exclusive
    ingestion lock.

    Raises:
        HTTPException: 500 when ingestion raises.
    """
    try:
        with exclusive_ingestion():
            results = ingest_configured_sources()
    except Exception as e:
        log.error("ingest_sources_failed", error=str(e))
        # Chain the original error so its traceback stays attached.
        raise HTTPException(
            status_code=500,
            detail=f"Configured source ingestion failed: {e}",
        ) from e
    return IngestSourcesResponse(results=results)
@router.get("/projects")
def api_projects() -> dict:
    """List the registered projects together with the registry file path."""
    registered = list_registered_projects()
    registry_file = str(_config.settings.resolved_project_registry_path)
    return {"projects": registered, "registry_path": registry_file}
@router.get("/projects/template")
def api_projects_template() -> dict:
    """Provide a starter template for a project registry entry.

    The response also carries the registry file path and the list of
    source names accepted in a registration.
    """
    starter = get_project_registry_template()
    registry_file = str(_config.settings.resolved_project_registry_path)
    return {
        "template": starter,
        "registry_path": registry_file,
        "allowed_sources": ["vault", "drive"],
    }
@router.post("/projects/proposal")
def api_project_registration_proposal(req: ProjectRegistrationProposalRequest) -> dict:
    """Return a normalized project registration proposal without writing it.

    Raises:
        HTTPException: 400 when the registry rejects the input with a
            ``ValueError``; the error text becomes the response detail.
    """
    try:
        return build_project_registration_proposal(
            project_id=req.project_id,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        # Chain the validation error so the cause is explicit.
        raise HTTPException(status_code=400, detail=str(e)) from e
@router.post("/projects/register")
def api_project_registration(req: ProjectRegistrationProposalRequest) -> dict:
    """Persist a validated project registration to the registry file.

    Raises:
        HTTPException: 400 when ``register_project`` rejects the input with
            a ``ValueError``; the error text becomes the response detail.
    """
    try:
        return register_project(
            project_id=req.project_id,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        # Chain the validation error so the cause is explicit.
        raise HTTPException(status_code=400, detail=str(e)) from e
@router.put("/projects/{project_name}")
def api_project_update(project_name: str, req: ProjectUpdateRequest) -> dict:
    """Update an existing project registration.

    Raises:
        HTTPException: 404 when ``update_project`` raises a ``ValueError``
            whose message starts with ``"Unknown project"``; 400 for any
            other ``ValueError``.
    """
    try:
        return update_project(
            project_name=project_name,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        detail = str(e)
        # The registry signals "no such project" only through the message
        # text, so the status code is chosen from its prefix.
        status_code = 404 if detail.startswith("Unknown project") else 400
        raise HTTPException(status_code=status_code, detail=detail) from e
@router.post("/projects/{project_name}/refresh", response_model=ProjectRefreshResponse)
def api_refresh_project(project_name: str, purge_deleted: bool = False) -> ProjectRefreshResponse:
    """Refresh one registered project from its configured ingest roots.

    The refresh runs while holding the exclusive ingestion lock.

    Raises:
        HTTPException: 404 when the refresh raises ``ValueError``; 500 for
            any other exception.
    """
    try:
        with exclusive_ingestion():
            result = refresh_registered_project(project_name, purge_deleted=purge_deleted)
    except ValueError as e:
        # NOTE(review): every ValueError is reported as 404 -- presumably
        # it only ever means "unknown project"; confirm against the registry.
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        log.error("project_refresh_failed", project=project_name, error=str(e))
        raise HTTPException(status_code=500, detail=f"Project refresh failed: {e}") from e
    return ProjectRefreshResponse(**result)
@router.post("/query", response_model=QueryResponse)
def api_query(req: QueryRequest) -> QueryResponse:
    """Retrieve relevant chunks for a prompt.

    Raises:
        HTTPException: 500 when retrieval raises.
    """
    try:
        chunks = retrieve(
            req.prompt,
            top_k=req.top_k,
            filter_tags=req.filter_tags,
            project_hint=req.project,
        )
    except Exception as e:
        # Only the first 100 characters of the prompt are logged.
        log.error("query_failed", prompt=req.prompt[:100], error=str(e))
        raise HTTPException(status_code=500, detail=f"Query failed: {e}") from e
    return QueryResponse(
        results=[
            {
                "chunk_id": c.chunk_id,
                "content": c.content,
                "score": c.score,
                "heading_path": c.heading_path,
                "source_file": c.source_file,
                "title": c.title,
            }
            for c in chunks
        ]
    )
@router.post("/context/build", response_model=ContextBuildResponse)
def api_build_context(req: ContextBuildRequest) -> ContextBuildResponse:
    """Build a full context pack for a prompt.

    Raises:
        HTTPException: 500 when ``build_context`` raises.
    """
    try:
        pack = build_context(
            user_prompt=req.prompt,
            project_hint=req.project,
            budget=req.budget,
        )
    except Exception as e:
        # Only the first 100 characters of the prompt are logged.
        log.error("context_build_failed", prompt=req.prompt[:100], error=str(e))
        raise HTTPException(status_code=500, detail=f"Context build failed: {e}") from e
    pack_dict = _pack_to_dict(pack)
    return ContextBuildResponse(
        formatted_context=pack.formatted_context,
        full_prompt=pack.full_prompt,
        # The response reports how many chunks were used; the chunk
        # details come from the pack's dict form.
        chunks_used=len(pack.chunks_used),
        total_chars=pack.total_chars,
        budget=pack.budget,
        budget_remaining=pack.budget_remaining,
        duration_ms=pack.duration_ms,
        chunks=pack_dict["chunks"],
    )
@router.post("/memory")
def api_create_memory(req: MemoryCreateRequest) -> dict:
    """Create a new memory entry.

    Returns the new memory's id and type with ``"status": "ok"``.

    Raises:
        HTTPException: 400 when ``create_memory`` rejects the input with a
            ``ValueError``.
    """
    try:
        mem = create_memory(
            memory_type=req.memory_type,
            content=req.content,
            project=req.project,
            confidence=req.confidence,
        )
    except ValueError as e:
        # Chain the validation error so the cause is explicit.
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {"status": "ok", "id": mem.id, "memory_type": mem.memory_type}
@router.get("/memory")
def api_get_memories(
    memory_type: str | None = None,
    project: str | None = None,
    active_only: bool = True,
    min_confidence: float = 0.0,
    limit: int = 50,
) -> dict:
    """List memories matching the optional filters.

    All query parameters are forwarded unchanged to ``get_memories``. The
    response also includes the known memory types under ``"types"``.
    """
    matches = get_memories(
        memory_type=memory_type,
        project=project,
        active_only=active_only,
        min_confidence=min_confidence,
        limit=limit,
    )
    serialized = [
        {
            "id": mem.id,
            "memory_type": mem.memory_type,
            "content": mem.content,
            "project": mem.project,
            "confidence": mem.confidence,
            "status": mem.status,
            "updated_at": mem.updated_at,
        }
        for mem in matches
    ]
    return {"memories": serialized, "types": MEMORY_TYPES}
@router.put("/memory/{memory_id}")
def api_update_memory(memory_id: str, req: MemoryUpdateRequest) -> dict:
    """Update an existing memory.

    Raises:
        HTTPException: 400 when ``update_memory`` raises ``ValueError``;
            404 when it reports that nothing was updated.
    """
    try:
        success = update_memory(
            memory_id=memory_id,
            content=req.content,
            confidence=req.confidence,
            status=req.status,
        )
    except ValueError as e:
        # Chain the validation error so the cause is explicit.
        raise HTTPException(status_code=400, detail=str(e)) from e
    if not success:
        raise HTTPException(status_code=404, detail="Memory not found")
    return {"status": "updated", "id": memory_id}
@router.delete("/memory/{memory_id}")
def api_invalidate_memory(memory_id: str) -> dict:
    """Mark a memory as invalidated (used for error correction).

    Raises:
        HTTPException: 404 when ``invalidate_memory`` reports failure.
    """
    if invalidate_memory(memory_id):
        return {"status": "invalidated", "id": memory_id}
    raise HTTPException(status_code=404, detail="Memory not found")
@router.post("/project/state")
def api_set_project_state(req: ProjectStateSetRequest) -> dict:
    """Set or update a trusted project state entry.

    Returns the entry's id, category and key with ``"status": "ok"``.

    Raises:
        HTTPException: 400 when ``set_state`` raises ``ValueError``; 500
            for any other exception.
    """
    try:
        entry = set_state(
            project_name=req.project,
            category=req.category,
            key=req.key,
            value=req.value,
            source=req.source,
            confidence=req.confidence,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        log.error("set_state_failed", error=str(e))
        # Chain the original error so its traceback stays attached.
        raise HTTPException(status_code=500, detail=f"Failed to set state: {e}") from e
    return {"status": "ok", "id": entry.id, "category": entry.category, "key": entry.key}
@router.get("/project/state/{project_name}")
def api_get_project_state(project_name: str, category: str | None = None) -> dict:
    """Return the trusted state entries of a project.

    ``category`` is forwarded to ``get_state`` as an optional filter. The
    response also lists the known categories under ``"categories"``.
    """
    serialized = [
        {
            "id": entry.id,
            "category": entry.category,
            "key": entry.key,
            "value": entry.value,
            "source": entry.source,
            "confidence": entry.confidence,
            "status": entry.status,
            "updated_at": entry.updated_at,
        }
        for entry in get_state(project_name, category=category)
    ]
    return {
        "project": project_name,
        "entries": serialized,
        "categories": CATEGORIES,
    }
@router.delete("/project/state")
def api_invalidate_project_state(req: ProjectStateInvalidateRequest) -> dict:
    """Invalidate (supersede) one project state entry.

    Raises:
        HTTPException: 404 when ``invalidate_state`` reports failure.
    """
    if invalidate_state(req.project, req.category, req.key):
        return {
            "status": "invalidated",
            "project": req.project,
            "category": req.category,
            "key": req.key,
        }
    raise HTTPException(status_code=404, detail="State entry not found or already invalidated")
class InteractionRecordRequest(BaseModel):
    """Request body for ``POST /interactions``.

    Only ``prompt`` is required. Every field is forwarded under the same
    name to ``record_interaction``.
    """

    prompt: str
    response: str = ""
    response_summary: str = ""
    project: str = ""
    client: str = ""
    session_id: str = ""
    memories_used: list[str] = []
    chunks_used: list[str] = []
    context_pack: dict | None = None
@router.post("/interactions")
def api_record_interaction(req: InteractionRecordRequest) -> dict:
    """Capture one interaction (prompt + response + what was used).

    This is the foundation of the AtoCore reflection loop. It records
    what the system fed to an LLM and what came back, but does not
    promote anything into trusted state. Phase 9 Commit B/C will layer
    reinforcement and extraction on top of this audit trail.

    Returns the new record's id and creation timestamp with
    ``"status": "recorded"``.

    Raises:
        HTTPException: 400 when ``record_interaction`` rejects the input
            with a ``ValueError``.
    """
    try:
        interaction = record_interaction(
            prompt=req.prompt,
            response=req.response,
            response_summary=req.response_summary,
            project=req.project,
            client=req.client,
            session_id=req.session_id,
            memories_used=req.memories_used,
            chunks_used=req.chunks_used,
            context_pack=req.context_pack,
        )
    except ValueError as e:
        # Chain the validation error so the cause is explicit.
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {
        "status": "recorded",
        "id": interaction.id,
        "created_at": interaction.created_at,
    }
@router.get("/interactions")
def api_list_interactions(
    project: str | None = None,
    session_id: str | None = None,
    client: str | None = None,
    since: str | None = None,
    limit: int = 50,
) -> dict:
    """List captured interactions, optionally filtered by project, session,
    client, or creation time. Hard-capped at 500 entries per call.

    Each entry is a summary: the full response text is replaced by its
    length under ``"response_chars"``. All filters are forwarded unchanged
    to ``list_interactions``.
    """
    interactions = list_interactions(
        project=project,
        session_id=session_id,
        client=client,
        since=since,
        limit=limit,
    )
    return {
        "count": len(interactions),
        "interactions": [
            {
                "id": i.id,
                "prompt": i.prompt,
                "response_summary": i.response_summary,
                # Treat a missing response as empty so one row without a
                # stored response cannot fail the whole listing.
                "response_chars": len(i.response or ""),
                "project": i.project,
                "client": i.client,
                "session_id": i.session_id,
                "memories_used": i.memories_used,
                "chunks_used": i.chunks_used,
                "created_at": i.created_at,
            }
            for i in interactions
        ],
    }
@router.get("/interactions/{interaction_id}")
def api_get_interaction(interaction_id: str) -> dict:
    """Return one interaction in full, including response and context pack.

    Raises:
        HTTPException: 404 when no interaction has the given id.
    """
    record = get_interaction(interaction_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
    payload = {
        "id": record.id,
        "prompt": record.prompt,
        "response": record.response,
        "response_summary": record.response_summary,
        "project": record.project,
        "client": record.client,
        "session_id": record.session_id,
        "memories_used": record.memories_used,
        "chunks_used": record.chunks_used,
        "context_pack": record.context_pack,
        "created_at": record.created_at,
    }
    return payload
class BackupCreateRequest(BaseModel):
    """Optional request body for ``POST /admin/backup``."""

    # When true the vector store is copied too, under the ingestion lock.
    include_chroma: bool = False
@router.post("/admin/backup")
def api_create_backup(req: BackupCreateRequest | None = None) -> dict:
    """Create a runtime backup snapshot.

    When ``include_chroma`` is true the call holds the ingestion lock so a
    safe cold copy of the vector store can be taken without racing against
    refresh or ingest endpoints.

    Returns the metadata produced by ``create_runtime_backup``.

    Raises:
        HTTPException: 500 when the backup raises.
    """
    # A missing body is treated as a request with default options.
    payload = req or BackupCreateRequest()
    try:
        if payload.include_chroma:
            with exclusive_ingestion():
                metadata = create_runtime_backup(include_chroma=True)
        else:
            metadata = create_runtime_backup(include_chroma=False)
    except Exception as e:
        log.error("admin_backup_failed", error=str(e))
        # Chain the original error so its traceback stays attached.
        raise HTTPException(status_code=500, detail=f"Backup failed: {e}") from e
    return metadata
@router.get("/admin/backup")
def api_list_backups() -> dict:
    """Report the configured backup directory and the backups found in it."""
    backup_root = str(_config.settings.resolved_backup_dir)
    available = list_runtime_backups()
    return {"backup_dir": backup_root, "backups": available}
@router.get("/admin/backup/{stamp}/validate")
def api_validate_backup(stamp: str) -> dict:
    """Check that a previously created backup is structurally usable.

    Returns the validation report from ``validate_backup`` unchanged.

    Raises:
        HTTPException: 404 when the report has no truthy ``"exists"`` entry.
    """
    report = validate_backup(stamp)
    if report.get("exists", False):
        return report
    raise HTTPException(status_code=404, detail=f"Backup not found: {stamp}")
@router.get("/health")
def api_health() -> dict:
    """Report service health.

    The payload includes the vector count, the configured environment,
    the resolved machine paths and the status of each ingestion source.
    ``sources_ready`` is true when every enabled source exists and is a
    directory; disabled sources never block readiness.
    """
    store = get_vector_store()
    source_status = get_source_status()
    settings = _config.settings

    vectors_count = store.count
    env = settings.env
    machine_paths = {
        "db_path": str(settings.db_path),
        "chroma_path": str(settings.chroma_path),
        "log_dir": str(settings.resolved_log_dir),
        "backup_dir": str(settings.resolved_backup_dir),
        "run_dir": str(settings.resolved_run_dir),
    }
    sources_ready = all(
        (src["exists"] and src["is_dir"]) if src["enabled"] else True
        for src in source_status
    )
    return {
        "status": "ok",
        "version": "0.1.0",
        "vectors_count": vectors_count,
        "env": env,
        "machine_paths": machine_paths,
        "sources_ready": sources_ready,
        "source_status": source_status,
    }
@router.get("/sources")
def api_sources() -> dict:
    """Report the configured ingestion sources and their enabled flags."""
    statuses = get_source_status()
    settings = _config.settings
    return {
        "sources": statuses,
        "vault_enabled": settings.source_vault_enabled,
        "drive_enabled": settings.source_drive_enabled,
    }
@router.get("/stats")
def api_stats() -> dict:
    """Return the ingestion statistics reported by ``get_ingestion_stats``."""
    stats = get_ingestion_stats()
    return stats
@router.get("/debug/context")
def api_debug_context() -> dict:
    """Show the most recently assembled context pack.

    Returns a placeholder message when no pack has been built yet.
    """
    latest = get_last_context_pack()
    if latest is not None:
        return _pack_to_dict(latest)
    return {"message": "No context pack built yet."}