Files
ATOCore/src/atocore/api/routes.py
Anto01 b9da5b6d84 phase9 first-real-use validation + small hygiene wins
Session 1 of the four-session plan. Empirically exercises the Phase 9
loop (capture -> reinforce -> extract) for the first time and lands
three small hygiene fixes.

Validation script + report
--------------------------
scripts/phase9_first_real_use.py — reproducible script that:
  - sets up an isolated SQLite + Chroma store under
    data/validation/phase9-first-use (gitignored)
  - seeds 3 active memories
  - runs 8 sample interactions through capture + reinforce + extract
  - prints what each step produced and reinforcement state at the end
  - supports --json output for downstream tooling

docs/phase9-first-real-use.md — narrative report of the run with:
  - extraction results table (8/8 expectations met exactly)
  - the empirical finding that REINFORCEMENT MATCHED ZERO seeds
    despite sample 5 clearly echoing the rebase preference memory
  - root cause analysis: the substring matcher is too brittle for
    natural paraphrases (e.g. "prefers" vs "I prefer", "history"
    vs "the history")
  - recommended fix: replace substring matcher with a token-overlap
    matcher (>=70% of memory tokens present in response, with
    light stemming and a small stop list)
  - explicit note that the fix is queued as a follow-up commit, not
    bundled into the report — keeps the audit trail clean

Key extraction results from the run:
  - all 7 heading/sentence rules fired correctly
  - 0 false positives on the prose-only sample (the most important
    sanity check)
  - long content preserved without truncation
  - dedup correctly kept three distinct cues from one interaction
  - project scoping flowed cleanly through the pipeline

Hygiene 1: FastAPI lifespan migration (src/atocore/main.py)
- Replaced @app.on_event("startup") with the modern @asynccontextmanager
  lifespan handler
- Same setup work (setup_logging, ensure_runtime_dirs, init_db,
  init_project_state_schema, startup_ready log)
- Removes the two on_event deprecation warnings from every test run
- Test suite now shows 1 warning instead of 3

Hygiene 2: EXTRACTOR_VERSION constant (src/atocore/memory/extractor.py)
- Added EXTRACTOR_VERSION = "0.1.0" with a versioned change log comment
- MemoryCandidate dataclass carries extractor_version on every candidate
- POST /interactions/{id}/extract response now includes extractor_version
  on both the top level (current run) and on each candidate
- Implements the versioning requirement called out in
  docs/architecture/promotion-rules.md so old candidates can be
  identified and re-evaluated when the rule set evolves

Hygiene 3: ~/.git-credentials cleanup (out-of-tree, not committed)
- Removed the dead OAUTH_USER:<jwt> line for dalidou:3000 that was
  being silently rewritten by the system credential manager on every
  push attempt
- Configured credential.http://dalidou:3000.helper with the empty-string
  sentinel pattern so the URL-specific helper chain is exactly
  ["", store] instead of inheriting the system-level "manager" helper
  that ships with Git for Windows
- Same fix for the 100.80.199.40 (Tailscale) entry
- Verified end to end: a fresh push using only the cleaned credentials
  file (no embedded URL) authenticates as Antoine and lands cleanly

Full suite: 160 passing (no change from previous), 1 warning
(was 3) thanks to the lifespan migration.
2026-04-07 06:16:35 -04:00

791 lines
24 KiB
Python

"""FastAPI route definitions."""
from pathlib import Path
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
import atocore.config as _config
from atocore.context.builder import (
build_context,
get_last_context_pack,
_pack_to_dict,
)
from atocore.context.project_state import (
CATEGORIES,
get_state,
invalidate_state,
set_state,
)
from atocore.ingestion.pipeline import (
exclusive_ingestion,
get_ingestion_stats,
get_source_status,
ingest_configured_sources,
ingest_file,
ingest_folder,
)
from atocore.interactions.service import (
get_interaction,
list_interactions,
record_interaction,
)
from atocore.memory.extractor import (
EXTRACTOR_VERSION,
MemoryCandidate,
extract_candidates_from_interaction,
)
from atocore.memory.reinforcement import reinforce_from_interaction
from atocore.memory.service import (
MEMORY_STATUSES,
MEMORY_TYPES,
create_memory,
get_memories,
invalidate_memory,
promote_memory,
reject_candidate_memory,
supersede_memory,
update_memory,
)
from atocore.observability.logger import get_logger
from atocore.ops.backup import (
create_runtime_backup,
list_runtime_backups,
validate_backup,
)
from atocore.projects.registry import (
build_project_registration_proposal,
get_project_registry_template,
list_registered_projects,
register_project,
refresh_registered_project,
update_project,
)
from atocore.retrieval.retriever import retrieve
from atocore.retrieval.vector_store import get_vector_store
# Router that every endpoint in this module registers itself on.
router = APIRouter()
# Module-wide logger, obtained under the "api" name.
log = get_logger("api")
# --- Request/Response models ---
class IngestRequest(BaseModel):
    """Request body for ``POST /ingest``."""

    # Filesystem path of the file or folder to ingest.
    path: str
class IngestResponse(BaseModel):
    """Response body for ``POST /ingest``."""

    # One-element list wrapping the ``ingest_file`` result, or the value
    # returned by ``ingest_folder`` when the path is a directory.
    results: list[dict]
class IngestSourcesResponse(BaseModel):
    """Response body for ``POST /ingest/sources``."""

    # Value returned by ``ingest_configured_sources``.
    results: list[dict]
class ProjectRefreshResponse(BaseModel):
    """Response body for ``POST /projects/{project_name}/refresh``.

    The endpoint builds it by unpacking the dict returned by
    ``refresh_registered_project``, so that dict must supply every field.
    """

    project: str
    aliases: list[str]
    description: str
    # NOTE(review): presumably echoes the ``purge_deleted`` flag of the
    # request -- confirm against refresh_registered_project.
    purge_deleted: bool
    status: str
    roots_ingested: int
    roots_skipped: int
    roots: list[dict]
class ProjectRegistrationProposalRequest(BaseModel):
    """Request body shared by ``POST /projects/proposal`` and ``POST /projects/register``."""

    project_id: str
    # Optional; defaults to no aliases.
    aliases: list[str] = []
    # Optional; defaults to an empty description.
    description: str = ""
    # Required; forwarded untouched to the registry helpers.
    ingest_roots: list[dict]
class ProjectUpdateRequest(BaseModel):
    """Request body for ``PUT /projects/{project_name}``.

    Every field is optional and defaults to ``None``.
    NOTE(review): presumably ``None`` means "leave unchanged" -- confirm
    against ``update_project``.
    """

    aliases: list[str] | None = None
    description: str | None = None
    ingest_roots: list[dict] | None = None
class QueryRequest(BaseModel):
    """Request body for ``POST /query``."""

    # Text handed to ``retrieve`` as the query.
    prompt: str
    # Forwarded to ``retrieve`` as ``top_k``; defaults to 10.
    top_k: int = 10
    # Forwarded to ``retrieve`` as ``filter_tags``.
    filter_tags: list[str] | None = None
    # Forwarded to ``retrieve`` as ``project_hint``.
    project: str | None = None
class QueryResponse(BaseModel):
    """Response body for ``POST /query``."""

    # One dict per retrieved chunk, as assembled by ``api_query``.
    results: list[dict]
class ContextBuildRequest(BaseModel):
    """Request body for ``POST /context/build``."""

    # Forwarded to ``build_context`` as ``user_prompt``.
    prompt: str
    # Forwarded to ``build_context`` as ``project_hint``.
    project: str | None = None
    # Forwarded to ``build_context`` as ``budget``; ``None`` when omitted.
    budget: int | None = None
class ContextBuildResponse(BaseModel):
    """Response body for ``POST /context/build``.

    Every field is copied from the context pack by ``api_build_context``.
    """

    formatted_context: str
    full_prompt: str
    # Number of chunks in the pack (``len(pack.chunks_used)``), not the chunks themselves.
    chunks_used: int
    total_chars: int
    budget: int
    budget_remaining: int
    duration_ms: int
    # The ``"chunks"`` entry of ``_pack_to_dict(pack)``.
    chunks: list[dict]
class MemoryCreateRequest(BaseModel):
    """Request body for ``POST /memory``; fields are forwarded to ``create_memory``."""

    memory_type: str
    content: str
    # Defaults to the empty string when no project is given.
    project: str = ""
    # Defaults to 1.0.
    confidence: float = 1.0
class MemoryUpdateRequest(BaseModel):
    """Request body for ``PUT /memory/{memory_id}``.

    All fields default to ``None`` and are forwarded as-is to
    ``update_memory``.
    NOTE(review): presumably ``None`` means "leave unchanged" -- confirm
    against ``update_memory``.
    """

    content: str | None = None
    confidence: float | None = None
    status: str | None = None
class ProjectStateSetRequest(BaseModel):
    """Request body for ``POST /project/state``; fields are forwarded to ``set_state``."""

    # Forwarded to ``set_state`` as ``project_name``.
    project: str
    category: str
    key: str
    value: str
    # Optional; defaults to the empty string.
    source: str = ""
    # Optional; defaults to 1.0.
    confidence: float = 1.0
class ProjectStateGetRequest(BaseModel):
    """Project-state lookup parameters.

    No endpoint visible in this file takes this model:
    ``GET /project/state/{project_name}`` reads its inputs from the path
    and the query string instead.
    """

    project: str
    category: str | None = None
class ProjectStateInvalidateRequest(BaseModel):
    """Request body for ``DELETE /project/state``.

    The three fields are passed positionally to ``invalidate_state``.
    """

    project: str
    category: str
    key: str
# --- Endpoints ---
@router.post("/ingest", response_model=IngestResponse)
def api_ingest(req: IngestRequest) -> IngestResponse:
    """Ingest a markdown file or folder.

    Args:
        req: Request body carrying the filesystem ``path`` to ingest.

    Returns:
        ``IngestResponse`` wrapping a one-element list for a file, or the
        value returned by ``ingest_folder`` for a directory.

    Raises:
        HTTPException: 400 when ``path`` is blank, 404 when it is neither an
            existing file nor a directory, 500 when ingestion itself raises.
    """
    # ``Path("")`` normalizes to ``Path(".")``; without this guard a blank
    # path would pass ``is_dir()`` and ingest the server's working directory.
    if not req.path.strip():
        raise HTTPException(status_code=400, detail="Path must not be empty")
    target = Path(req.path)
    try:
        with exclusive_ingestion():
            if target.is_file():
                results = [ingest_file(target)]
            elif target.is_dir():
                results = ingest_folder(target)
            else:
                raise HTTPException(status_code=404, detail=f"Path not found: {req.path}")
    except HTTPException:
        # Let the deliberate 404 above through instead of wrapping it as a 500.
        raise
    except Exception as e:
        log.error("ingest_failed", path=req.path, error=str(e))
        raise HTTPException(status_code=500, detail=f"Ingestion failed: {e}") from e
    return IngestResponse(results=results)
@router.post("/ingest/sources", response_model=IngestSourcesResponse)
def api_ingest_sources() -> IngestSourcesResponse:
    """Ingest enabled configured source directories.

    Runs ``ingest_configured_sources`` inside the ``exclusive_ingestion``
    context.

    Returns:
        ``IngestSourcesResponse`` wrapping whatever the pipeline returned.

    Raises:
        HTTPException: 500 when the ingestion run raises; the original
            exception is attached as the cause.
    """
    try:
        with exclusive_ingestion():
            results = ingest_configured_sources()
    except Exception as e:
        log.error("ingest_sources_failed", error=str(e))
        raise HTTPException(
            status_code=500,
            detail=f"Configured source ingestion failed: {e}",
        ) from e
    return IngestSourcesResponse(results=results)
@router.get("/projects")
def api_projects() -> dict:
    """List the registered projects together with the registry file location.

    Returns:
        A dict with ``projects`` (the value of ``list_registered_projects``)
        and ``registry_path`` (the configured registry path as a string).
    """
    payload: dict = {"projects": list_registered_projects()}
    payload["registry_path"] = str(_config.settings.resolved_project_registry_path)
    return payload
@router.get("/projects/template")
def api_projects_template() -> dict:
    """Return a starter registry entry plus the registry file location.

    The payload also lists the accepted source names, which this handler
    hard-codes as ``vault`` and ``drive``.
    """
    payload: dict = {"template": get_project_registry_template()}
    payload["registry_path"] = str(_config.settings.resolved_project_registry_path)
    payload["allowed_sources"] = ["vault", "drive"]
    return payload
@router.post("/projects/proposal")
def api_project_registration_proposal(req: ProjectRegistrationProposalRequest) -> dict:
    """Return a normalized project registration proposal without writing it.

    Args:
        req: Proposed project id, aliases, description and ingest roots.

    Returns:
        The dict produced by ``build_project_registration_proposal``.

    Raises:
        HTTPException: 400 when the registry helper rejects the input with a
            ``ValueError``; that error is attached as the cause.
    """
    try:
        return build_project_registration_proposal(
            project_id=req.project_id,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
@router.post("/projects/register")
def api_project_registration(req: ProjectRegistrationProposalRequest) -> dict:
    """Persist a validated project registration to the registry file.

    Args:
        req: Project id, aliases, description and ingest roots to register.

    Returns:
        The dict produced by ``register_project``.

    Raises:
        HTTPException: 400 when ``register_project`` raises ``ValueError``;
            that error is attached as the cause.
    """
    try:
        return register_project(
            project_id=req.project_id,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
@router.put("/projects/{project_name}")
def api_project_update(project_name: str, req: ProjectUpdateRequest) -> dict:
    """Update an existing project registration.

    Args:
        project_name: Name of the project to update, taken from the URL.
        req: New aliases, description and/or ingest roots.

    Returns:
        The dict produced by ``update_project``.

    Raises:
        HTTPException: 404 when the ``ValueError`` message starts with
            ``"Unknown project"``, 400 for any other ``ValueError``. The
            original error is attached as the cause.
    """
    try:
        return update_project(
            project_name=project_name,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        detail = str(e)
        # The only signal separating "missing project" from "bad input" is
        # the message prefix, so this branch is tied to that wording.
        status_code = 404 if detail.startswith("Unknown project") else 400
        raise HTTPException(status_code=status_code, detail=detail) from e
@router.post("/projects/{project_name}/refresh", response_model=ProjectRefreshResponse)
def api_refresh_project(project_name: str, purge_deleted: bool = False) -> ProjectRefreshResponse:
    """Refresh one registered project from its configured ingest roots.

    Args:
        project_name: Name of the project to refresh, taken from the URL.
        purge_deleted: Forwarded to ``refresh_registered_project``.

    Returns:
        ``ProjectRefreshResponse`` built from the dict returned by
        ``refresh_registered_project``.

    Raises:
        HTTPException: 404 when the refresh raises ``ValueError``, 500 for
            any other exception. The original error is attached as the cause.
    """
    try:
        with exclusive_ingestion():
            result = refresh_registered_project(project_name, purge_deleted=purge_deleted)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        log.error("project_refresh_failed", project=project_name, error=str(e))
        raise HTTPException(status_code=500, detail=f"Project refresh failed: {e}") from e
    return ProjectRefreshResponse(**result)
@router.post("/query", response_model=QueryResponse)
def api_query(req: QueryRequest) -> QueryResponse:
    """Retrieve relevant chunks for a prompt.

    Args:
        req: Prompt plus optional ``top_k``, tag filter and project hint.

    Returns:
        ``QueryResponse`` with one dict per retrieved chunk (id, content,
        score, heading path, source file, title).

    Raises:
        HTTPException: 500 when ``retrieve`` raises; the original exception
            is attached as the cause.
    """
    try:
        chunks = retrieve(
            req.prompt,
            top_k=req.top_k,
            filter_tags=req.filter_tags,
            project_hint=req.project,
        )
    except Exception as e:
        # Only the first 100 characters of the prompt go into the log entry.
        log.error("query_failed", prompt=req.prompt[:100], error=str(e))
        raise HTTPException(status_code=500, detail=f"Query failed: {e}") from e
    return QueryResponse(
        results=[
            {
                "chunk_id": c.chunk_id,
                "content": c.content,
                "score": c.score,
                "heading_path": c.heading_path,
                "source_file": c.source_file,
                "title": c.title,
            }
            for c in chunks
        ]
    )
@router.post("/context/build", response_model=ContextBuildResponse)
def api_build_context(req: ContextBuildRequest) -> ContextBuildResponse:
    """Build a full context pack for a prompt.

    Args:
        req: Prompt plus optional project hint and budget.

    Returns:
        ``ContextBuildResponse`` populated from the pack; ``chunks_used`` is
        the number of chunks and ``chunks`` is the ``"chunks"`` entry of
        ``_pack_to_dict(pack)``.

    Raises:
        HTTPException: 500 when ``build_context`` raises; the original
            exception is attached as the cause.
    """
    try:
        pack = build_context(
            user_prompt=req.prompt,
            project_hint=req.project,
            budget=req.budget,
        )
    except Exception as e:
        # Only the first 100 characters of the prompt go into the log entry.
        log.error("context_build_failed", prompt=req.prompt[:100], error=str(e))
        raise HTTPException(status_code=500, detail=f"Context build failed: {e}") from e
    pack_dict = _pack_to_dict(pack)
    return ContextBuildResponse(
        formatted_context=pack.formatted_context,
        full_prompt=pack.full_prompt,
        chunks_used=len(pack.chunks_used),
        total_chars=pack.total_chars,
        budget=pack.budget,
        budget_remaining=pack.budget_remaining,
        duration_ms=pack.duration_ms,
        chunks=pack_dict["chunks"],
    )
@router.post("/memory")
def api_create_memory(req: MemoryCreateRequest) -> dict:
    """Create a new memory entry.

    Args:
        req: Memory type, content, project and confidence.

    Returns:
        ``{"status": "ok", "id": ..., "memory_type": ...}`` for the created
        memory.

    Raises:
        HTTPException: 400 when ``create_memory`` raises ``ValueError``;
            that error is attached as the cause.
    """
    try:
        mem = create_memory(
            memory_type=req.memory_type,
            content=req.content,
            project=req.project,
            confidence=req.confidence,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {"status": "ok", "id": mem.id, "memory_type": mem.memory_type}
@router.get("/memory")
def api_get_memories(
    memory_type: str | None = None,
    project: str | None = None,
    active_only: bool = True,
    min_confidence: float = 0.0,
    limit: int = 50,
    status: str | None = None,
) -> dict:
    """List memories, optionally filtered.

    When ``status`` is given explicitly it overrides ``active_only`` so
    the Phase 9 Commit C review queue can be listed via
    ``GET /memory?status=candidate``.

    All query parameters are forwarded unchanged to ``get_memories``.

    Returns:
        A dict with ``memories`` (one dict per memory), ``types``
        (``MEMORY_TYPES``) and ``statuses`` (``MEMORY_STATUSES``).

    Raises:
        HTTPException: 400 when ``get_memories`` raises ``ValueError``;
            that error is attached as the cause.
    """
    try:
        memories = get_memories(
            memory_type=memory_type,
            project=project,
            active_only=active_only,
            min_confidence=min_confidence,
            limit=limit,
            status=status,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {
        "memories": [
            {
                "id": m.id,
                "memory_type": m.memory_type,
                "content": m.content,
                "project": m.project,
                "confidence": m.confidence,
                "status": m.status,
                "reference_count": m.reference_count,
                "last_referenced_at": m.last_referenced_at,
                "updated_at": m.updated_at,
            }
            for m in memories
        ],
        "types": MEMORY_TYPES,
        "statuses": MEMORY_STATUSES,
    }
@router.put("/memory/{memory_id}")
def api_update_memory(memory_id: str, req: MemoryUpdateRequest) -> dict:
    """Update an existing memory.

    Args:
        memory_id: Id of the memory to update, taken from the URL.
        req: New content, confidence and/or status.

    Returns:
        ``{"status": "updated", "id": memory_id}`` on success.

    Raises:
        HTTPException: 400 when ``update_memory`` raises ``ValueError`` (the
            error is attached as the cause), 404 when it returns a falsy
            result.
    """
    try:
        success = update_memory(
            memory_id=memory_id,
            content=req.content,
            confidence=req.confidence,
            status=req.status,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    if not success:
        raise HTTPException(status_code=404, detail="Memory not found")
    return {"status": "updated", "id": memory_id}
@router.delete("/memory/{memory_id}")
def api_invalidate_memory(memory_id: str) -> dict:
    """Invalidate a memory (error correction).

    Returns:
        ``{"status": "invalidated", "id": memory_id}`` when
        ``invalidate_memory`` reports success.

    Raises:
        HTTPException: 404 when ``invalidate_memory`` returns a falsy result.
    """
    if invalidate_memory(memory_id):
        return {"status": "invalidated", "id": memory_id}
    raise HTTPException(status_code=404, detail="Memory not found")
@router.post("/memory/{memory_id}/promote")
def api_promote_memory(memory_id: str) -> dict:
    """Promote a candidate memory to active (Phase 9 Commit C).

    Args:
        memory_id: Id of the candidate memory, taken from the URL.

    Returns:
        ``{"status": "promoted", "id": memory_id}`` on success.

    Raises:
        HTTPException: 400 when ``promote_memory`` raises ``ValueError`` (the
            error is attached as the cause), 404 when it returns a falsy
            result.
    """
    try:
        success = promote_memory(memory_id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    if not success:
        raise HTTPException(
            status_code=404,
            detail=f"Memory not found or not a candidate: {memory_id}",
        )
    return {"status": "promoted", "id": memory_id}
@router.post("/memory/{memory_id}/reject")
def api_reject_candidate_memory(memory_id: str) -> dict:
    """Reject a candidate memory (Phase 9 Commit C review queue).

    Returns:
        ``{"status": "rejected", "id": memory_id}`` when
        ``reject_candidate_memory`` reports success.

    Raises:
        HTTPException: 404 when ``reject_candidate_memory`` returns a falsy
            result.
    """
    if reject_candidate_memory(memory_id):
        return {"status": "rejected", "id": memory_id}
    raise HTTPException(
        status_code=404,
        detail=f"Memory not found or not a candidate: {memory_id}",
    )
@router.post("/project/state")
def api_set_project_state(req: ProjectStateSetRequest) -> dict:
    """Set or update a trusted project state entry.

    Args:
        req: Project, category, key and value, plus optional source and
            confidence.

    Returns:
        ``{"status": "ok", "id": ..., "category": ..., "key": ...}`` for the
        entry returned by ``set_state``.

    Raises:
        HTTPException: 400 when ``set_state`` raises ``ValueError``, 500 for
            any other exception. The original error is attached as the cause.
    """
    try:
        entry = set_state(
            project_name=req.project,
            category=req.category,
            key=req.key,
            value=req.value,
            source=req.source,
            confidence=req.confidence,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        log.error("set_state_failed", error=str(e))
        raise HTTPException(status_code=500, detail=f"Failed to set state: {e}") from e
    return {"status": "ok", "id": entry.id, "category": entry.category, "key": entry.key}
@router.get("/project/state/{project_name}")
def api_get_project_state(project_name: str, category: str | None = None) -> dict:
    """Get trusted project state entries.

    Args:
        project_name: Project to look up, taken from the URL.
        category: Optional category, forwarded to ``get_state``.

    Returns:
        A dict with the ``project`` name, the serialized ``entries`` and the
        module-level ``CATEGORIES``.
    """
    serialized = []
    for entry in get_state(project_name, category=category):
        serialized.append(
            {
                "id": entry.id,
                "category": entry.category,
                "key": entry.key,
                "value": entry.value,
                "source": entry.source,
                "confidence": entry.confidence,
                "status": entry.status,
                "updated_at": entry.updated_at,
            }
        )
    return {"project": project_name, "entries": serialized, "categories": CATEGORIES}
@router.delete("/project/state")
def api_invalidate_project_state(req: ProjectStateInvalidateRequest) -> dict:
    """Invalidate (supersede) a project state entry.

    Returns:
        A dict echoing ``project``, ``category`` and ``key`` with status
        ``"invalidated"`` when ``invalidate_state`` reports success.

    Raises:
        HTTPException: 404 when ``invalidate_state`` returns a falsy result.
    """
    if invalidate_state(req.project, req.category, req.key):
        return {
            "status": "invalidated",
            "project": req.project,
            "category": req.category,
            "key": req.key,
        }
    raise HTTPException(status_code=404, detail="State entry not found or already invalidated")
class InteractionRecordRequest(BaseModel):
    """Request body for ``POST /interactions``.

    Only ``prompt`` is required; every field is forwarded unchanged to
    ``record_interaction``.
    """

    prompt: str
    response: str = ""
    response_summary: str = ""
    project: str = ""
    client: str = ""
    session_id: str = ""
    memories_used: list[str] = []
    chunks_used: list[str] = []
    context_pack: dict | None = None
    # Defaults to True; forwarded to ``record_interaction`` as ``reinforce``.
    reinforce: bool = True
@router.post("/interactions")
def api_record_interaction(req: InteractionRecordRequest) -> dict:
    """Capture one interaction (prompt + response + what was used).

    This is the foundation of the AtoCore reflection loop. It records
    what the system fed to an LLM and what came back. If ``reinforce``
    is true (default) and there is response content, the Phase 9
    Commit B reinforcement pass runs automatically, bumping the
    confidence of any active memory echoed in the response. Nothing is
    ever promoted into trusted state automatically.

    Returns:
        ``{"status": "recorded", "id": ..., "created_at": ...}`` for the
        stored interaction.

    Raises:
        HTTPException: 400 when ``record_interaction`` raises ``ValueError``;
            that error is attached as the cause.
    """
    try:
        interaction = record_interaction(
            prompt=req.prompt,
            response=req.response,
            response_summary=req.response_summary,
            project=req.project,
            client=req.client,
            session_id=req.session_id,
            memories_used=req.memories_used,
            chunks_used=req.chunks_used,
            context_pack=req.context_pack,
            reinforce=req.reinforce,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {
        "status": "recorded",
        "id": interaction.id,
        "created_at": interaction.created_at,
    }
@router.post("/interactions/{interaction_id}/reinforce")
def api_reinforce_interaction(interaction_id: str) -> dict:
    """Run the reinforcement pass on an already-captured interaction.

    Handy for backfilling reinforcement over historical interactions, or
    for retrying after the automatic pass inside ``POST /interactions``
    hit a transient failure.

    Returns:
        A dict with the ``interaction_id``, the ``reinforced_count`` and one
        entry per reinforced memory; confidences are rounded to 4 decimals.

    Raises:
        HTTPException: 404 when no interaction has the given id.
    """
    interaction = get_interaction(interaction_id)
    if interaction is None:
        raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")

    outcomes = reinforce_from_interaction(interaction)
    reinforced = []
    for outcome in outcomes:
        reinforced.append(
            {
                "memory_id": outcome.memory_id,
                "memory_type": outcome.memory_type,
                "old_confidence": round(outcome.old_confidence, 4),
                "new_confidence": round(outcome.new_confidence, 4),
            }
        )
    return {
        "interaction_id": interaction_id,
        "reinforced_count": len(outcomes),
        "reinforced": reinforced,
    }
class InteractionExtractRequest(BaseModel):
    """Optional request body for ``POST /interactions/{interaction_id}/extract``."""

    # When true, every extracted candidate is stored with status "candidate";
    # defaults to False (preview only).
    persist: bool = False
@router.post("/interactions/{interaction_id}/extract")
def api_extract_from_interaction(
    interaction_id: str,
    req: InteractionExtractRequest | None = None,
) -> dict:
    """Extract candidate memories from a captured interaction.

    Phase 9 Commit C. Extraction is rule-based and intentionally
    conservative: only explicit structural cues (decision heading,
    preference sentence, etc.) produce a candidate. Unless the body sets
    ``persist: true`` the candidates are only previewed; with it, each
    one is stored through ``create_memory`` with status ``"candidate"``.

    A candidate whose storage raises ``ValueError`` is logged and skipped,
    so ``persisted_ids`` can be shorter than ``candidates``.

    Raises:
        HTTPException: 404 when no interaction has the given id.
    """
    interaction = get_interaction(interaction_id)
    if interaction is None:
        raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")

    # The body is optional; a missing one means the defaults (no persistence).
    options = req or InteractionExtractRequest()
    candidates: list[MemoryCandidate] = extract_candidates_from_interaction(interaction)

    persisted_ids: list[str] = []
    # An empty tuple makes the loop a no-op when persistence was not requested.
    for found in candidates if options.persist else ():
        try:
            stored = create_memory(
                memory_type=found.memory_type,
                content=found.content,
                project=found.project,
                confidence=found.confidence,
                status="candidate",
            )
            persisted_ids.append(stored.id)
        except ValueError as e:
            log.error(
                "extract_persist_failed",
                interaction_id=interaction_id,
                rule=found.rule,
                error=str(e),
            )

    serialized = []
    for found in candidates:
        serialized.append(
            {
                "memory_type": found.memory_type,
                "content": found.content,
                "project": found.project,
                "confidence": found.confidence,
                "rule": found.rule,
                "source_span": found.source_span,
                "extractor_version": found.extractor_version,
            }
        )
    return {
        "interaction_id": interaction_id,
        "candidate_count": len(candidates),
        "persisted": options.persist,
        "persisted_ids": persisted_ids,
        "extractor_version": EXTRACTOR_VERSION,
        "candidates": serialized,
    }
@router.get("/interactions")
def api_list_interactions(
    project: str | None = None,
    session_id: str | None = None,
    client: str | None = None,
    since: str | None = None,
    limit: int = 50,
) -> dict:
    """List captured interactions, optionally filtered by project, session,
    client, or creation time. Hard-capped at 500 entries per call.

    NOTE(review): this handler forwards ``limit`` unchanged, so the 500 cap
    is presumably enforced inside ``list_interactions`` -- confirm.

    Each listed entry carries ``response_chars`` (the response length)
    rather than the response text itself.
    """
    found = list_interactions(
        project=project,
        session_id=session_id,
        client=client,
        since=since,
        limit=limit,
    )
    summaries = []
    for item in found:
        summaries.append(
            {
                "id": item.id,
                "prompt": item.prompt,
                "response_summary": item.response_summary,
                "response_chars": len(item.response),
                "project": item.project,
                "client": item.client,
                "session_id": item.session_id,
                "memories_used": item.memories_used,
                "chunks_used": item.chunks_used,
                "created_at": item.created_at,
            }
        )
    return {"count": len(found), "interactions": summaries}
@router.get("/interactions/{interaction_id}")
def api_get_interaction(interaction_id: str) -> dict:
    """Fetch a single interaction with the full response and context pack.

    Raises:
        HTTPException: 404 when no interaction has the given id.
    """
    interaction = get_interaction(interaction_id)
    if interaction is None:
        raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")

    # Each response key has the same name as the attribute it is read from.
    exposed_fields = (
        "id",
        "prompt",
        "response",
        "response_summary",
        "project",
        "client",
        "session_id",
        "memories_used",
        "chunks_used",
        "context_pack",
        "created_at",
    )
    return {name: getattr(interaction, name) for name in exposed_fields}
class BackupCreateRequest(BaseModel):
    """Optional request body for ``POST /admin/backup``."""

    # Forwarded to ``create_runtime_backup``; defaults to False.
    include_chroma: bool = False
@router.post("/admin/backup")
def api_create_backup(req: BackupCreateRequest | None = None) -> dict:
    """Create a runtime backup snapshot.

    When ``include_chroma`` is true the call holds the ingestion lock so a
    safe cold copy of the vector store can be taken without racing against
    refresh or ingest endpoints.

    Args:
        req: Optional body; a missing body means ``include_chroma=False``.

    Returns:
        The metadata returned by ``create_runtime_backup``.

    Raises:
        HTTPException: 500 when the backup raises; the original exception is
            attached as the cause.
    """
    payload = req or BackupCreateRequest()
    try:
        if payload.include_chroma:
            with exclusive_ingestion():
                metadata = create_runtime_backup(include_chroma=True)
        else:
            metadata = create_runtime_backup(include_chroma=False)
    except Exception as e:
        log.error("admin_backup_failed", error=str(e))
        raise HTTPException(status_code=500, detail=f"Backup failed: {e}") from e
    return metadata
@router.get("/admin/backup")
def api_list_backups() -> dict:
    """List the runtime backups together with the configured backup directory.

    Returns:
        A dict with ``backup_dir`` (the configured directory as a string)
        and ``backups`` (the value of ``list_runtime_backups``).
    """
    listing: dict = {"backup_dir": str(_config.settings.resolved_backup_dir)}
    listing["backups"] = list_runtime_backups()
    return listing
@router.get("/admin/backup/{stamp}/validate")
def api_validate_backup(stamp: str) -> dict:
    """Validate that a previously created backup is structurally usable.

    Returns:
        The report produced by ``validate_backup``.

    Raises:
        HTTPException: 404 when the report's ``exists`` entry is missing or
            falsy.
    """
    report = validate_backup(stamp)
    if report.get("exists", False):
        return report
    raise HTTPException(status_code=404, detail=f"Backup not found: {stamp}")
@router.get("/health")
def api_health() -> dict:
    """Report service health, configured paths and source readiness.

    ``sources_ready`` is true when every enabled source both exists and is
    a directory; disabled sources never count against it.
    """
    store = get_vector_store()
    source_status = get_source_status()
    settings = _config.settings

    report: dict = {"status": "ok", "version": "0.1.0"}
    report["vectors_count"] = store.count
    report["env"] = settings.env
    report["machine_paths"] = {
        "db_path": str(settings.db_path),
        "chroma_path": str(settings.chroma_path),
        "log_dir": str(settings.resolved_log_dir),
        "backup_dir": str(settings.resolved_backup_dir),
        "run_dir": str(settings.resolved_run_dir),
    }
    # Same test as "all sources are disabled-or-usable", phrased as
    # "no enabled source is unusable".
    report["sources_ready"] = not any(
        source["enabled"] and not (source["exists"] and source["is_dir"])
        for source in source_status
    )
    report["source_status"] = source_status
    return report
@router.get("/sources")
def api_sources() -> dict:
    """Return the source status list plus the vault and drive enable flags."""
    summary: dict = {"sources": get_source_status()}
    summary["vault_enabled"] = _config.settings.source_vault_enabled
    summary["drive_enabled"] = _config.settings.source_drive_enabled
    return summary
@router.get("/stats")
def api_stats() -> dict:
    """Return the ingestion statistics exactly as ``get_ingestion_stats`` reports them."""
    stats = get_ingestion_stats()
    return stats
@router.get("/debug/context")
def api_debug_context() -> dict:
    """Inspect the last assembled context pack.

    Returns:
        ``_pack_to_dict`` of the last pack, or a placeholder message dict
        when ``get_last_context_pack`` returns ``None``.
    """
    pack = get_last_context_pack()
    if pack is not None:
        return _pack_to_dict(pack)
    return {"message": "No context pack built yet."}