Files
ATOCore/src/atocore/api/routes.py

2758 lines
95 KiB
Python

"""FastAPI route definitions."""
from pathlib import Path
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from fastapi.responses import HTMLResponse, Response
from pydantic import BaseModel
import atocore.config as _config
from atocore.context.builder import (
build_context,
get_last_context_pack,
_pack_to_dict,
)
from atocore.context.project_state import (
CATEGORIES,
get_state,
invalidate_state,
set_state,
)
from atocore.ingestion.pipeline import (
exclusive_ingestion,
get_ingestion_stats,
get_source_status,
ingest_configured_sources,
ingest_file,
ingest_folder,
)
from atocore.interactions.service import (
get_interaction,
list_interactions,
record_interaction,
)
from atocore.engineering.mirror import generate_project_overview
from atocore.engineering.wiki import (
render_activity,
render_capture,
render_domain,
render_entity,
render_homepage,
render_memory_detail,
render_project,
render_search,
)
from atocore.engineering.service import (
ENTITY_TYPES,
RELATIONSHIP_TYPES,
create_entity,
create_relationship,
get_entities,
get_entity,
get_entity_with_context,
get_relationships,
)
from atocore.memory.extractor import (
EXTRACTOR_VERSION,
MemoryCandidate,
extract_candidates_from_interaction,
)
from atocore.memory.extractor_llm import (
LLM_EXTRACTOR_VERSION,
_cli_available as _llm_cli_available,
extract_candidates_llm,
)
from atocore.memory.reinforcement import reinforce_from_interaction
from atocore.memory.service import (
MEMORY_STATUSES,
MEMORY_TYPES,
create_memory,
get_memories,
invalidate_memory,
promote_memory,
reject_candidate_memory,
supersede_memory,
update_memory,
)
from atocore.observability.logger import get_logger
from atocore.ops.backup import (
cleanup_old_backups,
create_runtime_backup,
list_runtime_backups,
validate_backup,
)
from atocore.projects.registry import (
build_project_registration_proposal,
get_project_registry_template,
list_registered_projects,
register_project,
refresh_registered_project,
update_project,
)
from atocore.retrieval.retriever import retrieve
from atocore.retrieval.vector_store import get_vector_store
# Shared router; the FastAPI app mounts this, so every endpoint below attaches here.
router = APIRouter()
# Structured logger for this module (key=value event logging).
log = get_logger("api")
# --- Wiki routes (HTML, served first for clean URLs) ---
@router.get("/wiki", response_class=HTMLResponse)
def wiki_home() -> HTMLResponse:
    """Wiki landing page."""
    page = render_homepage()
    return HTMLResponse(content=page)


@router.get("/wiki/projects/{project_name}", response_class=HTMLResponse)
def wiki_project(project_name: str) -> HTMLResponse:
    """Project page; the name is resolved (aliases included) before rendering."""
    from atocore.projects.registry import resolve_project_name as _resolve
    canonical = _resolve(project_name)
    return HTMLResponse(content=render_project(canonical))


@router.get("/wiki/entities/{entity_id}", response_class=HTMLResponse)
def wiki_entity(entity_id: str) -> HTMLResponse:
    """Entity detail page; 404 when the id is unknown."""
    page = render_entity(entity_id)
    if page is None:
        raise HTTPException(status_code=404, detail="Entity not found")
    return HTMLResponse(content=page)


@router.get("/wiki/search", response_class=HTMLResponse)
def wiki_search(q: str = "") -> HTMLResponse:
    """Wiki search results for query ``q`` (empty query allowed)."""
    return HTMLResponse(content=render_search(q))


@router.get("/wiki/capture", response_class=HTMLResponse)
def wiki_capture() -> HTMLResponse:
    """Phase 7I follow-up: paste mobile/desktop chats into AtoCore."""
    return HTMLResponse(content=render_capture())


@router.get("/wiki/new", response_class=HTMLResponse)
def wiki_new_entity(name: str = "", project: str = "") -> HTMLResponse:
    """Issue B: entity-creation form, pre-filled from a redlink."""
    from atocore.engineering.wiki import render_new_entity_form
    form = render_new_entity_form(name=name, project=project)
    return HTMLResponse(content=form)


@router.get("/wiki/memories/{memory_id}", response_class=HTMLResponse)
def wiki_memory(memory_id: str) -> HTMLResponse:
    """Phase 7E: memory detail with audit trail + neighbors; 404 if unknown."""
    page = render_memory_detail(memory_id)
    if page is None:
        raise HTTPException(status_code=404, detail="Memory not found")
    return HTMLResponse(content=page)


@router.get("/wiki/domains/{tag}", response_class=HTMLResponse)
def wiki_domain(tag: str) -> HTMLResponse:
    """Phase 7F: cross-project view for a domain tag."""
    return HTMLResponse(content=render_domain(tag))


@router.get("/wiki/activity", response_class=HTMLResponse)
def wiki_activity(hours: int = 48, limit: int = 100) -> HTMLResponse:
    """Autonomous-activity timeline feed."""
    return HTMLResponse(content=render_activity(hours=hours, limit=limit))


@router.get("/admin/triage", response_class=HTMLResponse)
def admin_triage(limit: int = 100) -> HTMLResponse:
    """Human triage UI for candidate memories.

    Lists pending candidates with inline promote/reject/edit buttons.
    Keyboard shortcuts: Y=promote, N=reject, E=edit content.
    """
    from atocore.engineering.triage_ui import render_triage_page
    return HTMLResponse(content=render_triage_page(limit=limit))
@router.post("/admin/triage/request-drain")
def admin_triage_request_drain() -> dict:
    """Request a host-side auto-triage run.

    Writes a flag in project state. A host cron watcher picks it up
    within ~2min and runs auto_triage.py, then clears the flag. This is
    the bridge between "user clicked button in web UI" and "claude CLI
    (on host, not in container) runs".
    """
    from datetime import datetime as _dt, timezone as _tz
    from atocore.context.project_state import set_state
    stamp = _dt.now(_tz.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    set_state(
        project_name="atocore",
        category="config",
        key="auto_triage_requested_at",
        value=stamp,
        source="admin ui",
    )
    return {"requested_at": stamp, "note": "Host watcher will pick this up within 2 minutes."}


@router.get("/admin/triage/drain-status")
def admin_triage_drain_status() -> dict:
    """Current state of the auto-triage pipeline (for UI polling)."""
    from atocore.context.project_state import get_state
    status = {
        "requested_at": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_result": None,
        "is_running": False,
    }
    # (category, key) -> response field for the plain string values.
    field_map = {
        ("config", "auto_triage_requested_at"): "requested_at",
        ("status", "auto_triage_last_started_at"): "last_started_at",
        ("status", "auto_triage_last_finished_at"): "last_finished_at",
        ("status", "auto_triage_last_result"): "last_result",
    }
    try:
        for entry in get_state("atocore"):
            field = field_map.get((entry.category, entry.key))
            if field is not None:
                status[field] = entry.value
            elif entry.category == "status" and entry.key == "auto_triage_running":
                status["is_running"] = entry.value == "1"
    except Exception:
        # Best-effort: missing/unreadable state simply leaves the defaults.
        pass
    return status
# --- Request/Response models ---
class IngestRequest(BaseModel):
    """Body for POST /ingest: path to a markdown file or folder."""
    path: str


class IngestResponse(BaseModel):
    """Per-file ingestion results for POST /ingest."""
    results: list[dict]


class IngestSourcesResponse(BaseModel):
    """Per-source ingestion results for POST /ingest/sources."""
    results: list[dict]


class ProjectRefreshResponse(BaseModel):
    """Outcome of refreshing one registered project's ingest roots."""
    project: str
    aliases: list[str]
    description: str
    purge_deleted: bool
    status: str
    roots_ingested: int
    roots_skipped: int
    roots: list[dict]


class ProjectRegistrationProposalRequest(BaseModel):
    """Body for POST /projects/proposal and /projects/register."""
    project_id: str
    aliases: list[str] = []
    description: str = ""
    ingest_roots: list[dict]


class ProjectUpdateRequest(BaseModel):
    """Body for PUT /projects/{name}; None fields are left unchanged."""
    aliases: list[str] | None = None
    description: str | None = None
    ingest_roots: list[dict] | None = None


class QueryRequest(BaseModel):
    """Body for POST /query (chunk retrieval)."""
    prompt: str
    top_k: int = 10
    filter_tags: list[str] | None = None
    project: str | None = None


class QueryResponse(BaseModel):
    """Scored chunk results for POST /query."""
    results: list[dict]


class ContextBuildRequest(BaseModel):
    """Body for POST /context/build; budget None uses the configured default."""
    prompt: str
    project: str | None = None
    budget: int | None = None


class ContextBuildResponse(BaseModel):
    """Full context pack produced by POST /context/build."""
    formatted_context: str
    full_prompt: str
    chunks_used: int
    total_chars: int
    budget: int
    budget_remaining: int
    duration_ms: int
    chunks: list[dict]


class MemoryCreateRequest(BaseModel):
    """Body for POST /memory."""
    memory_type: str
    content: str
    project: str = ""
    confidence: float = 1.0
    status: str = "active"
    domain_tags: list[str] | None = None
    valid_until: str = ""


class MemoryUpdateRequest(BaseModel):
    """Body for PUT /memory/{id}; None fields are left unchanged."""
    content: str | None = None
    confidence: float | None = None
    status: str | None = None
    memory_type: str | None = None
    domain_tags: list[str] | None = None
    valid_until: str | None = None


class ProjectStateSetRequest(BaseModel):
    """Body for POST /project/state."""
    project: str
    category: str
    key: str
    value: str
    source: str = ""
    confidence: float = 1.0


class ProjectStateGetRequest(BaseModel):
    """Filter for project-state reads (category None means all)."""
    project: str
    category: str | None = None


class ProjectStateInvalidateRequest(BaseModel):
    """Body for DELETE /project/state."""
    project: str
    category: str
    key: str
# --- Endpoints ---
@router.post("/ingest", response_model=IngestResponse)
def api_ingest(req: IngestRequest) -> IngestResponse:
    """Ingest a markdown file or folder."""
    target = Path(req.path)
    try:
        # Serialize against other ingest/refresh endpoints.
        with exclusive_ingestion():
            if target.is_dir():
                outcome = ingest_folder(target)
            elif target.is_file():
                outcome = [ingest_file(target)]
            else:
                raise HTTPException(status_code=404, detail=f"Path not found: {req.path}")
    except HTTPException:
        raise
    except Exception as e:
        log.error("ingest_failed", path=req.path, error=str(e))
        raise HTTPException(status_code=500, detail=f"Ingestion failed: {e}")
    return IngestResponse(results=outcome)


@router.post("/ingest/sources", response_model=IngestSourcesResponse)
def api_ingest_sources() -> IngestSourcesResponse:
    """Ingest enabled configured source directories."""
    try:
        with exclusive_ingestion():
            outcome = ingest_configured_sources()
    except Exception as e:
        log.error("ingest_sources_failed", error=str(e))
        raise HTTPException(status_code=500, detail=f"Configured source ingestion failed: {e}")
    return IngestSourcesResponse(results=outcome)
@router.get("/projects")
def api_projects() -> dict:
    """Return registered projects and their resolved ingest roots."""
    registry_path = _config.settings.resolved_project_registry_path
    return {
        "projects": list_registered_projects(),
        "registry_path": str(registry_path),
    }


@router.get("/projects/template")
def api_projects_template() -> dict:
    """Return a starter template for project registry entries."""
    registry_path = _config.settings.resolved_project_registry_path
    return {
        "template": get_project_registry_template(),
        "registry_path": str(registry_path),
        "allowed_sources": ["vault", "drive"],
    }
@router.post("/projects/proposal")
def api_project_registration_proposal(req: ProjectRegistrationProposalRequest) -> dict:
    """Return a normalized project registration proposal without writing it.

    Raises 400 when validation of the proposal fails.
    """
    try:
        return build_project_registration_proposal(
            project_id=req.project_id,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        # Chain the cause so tracebacks preserve the original validation error.
        raise HTTPException(status_code=400, detail=str(e)) from e


@router.post("/projects/register")
def api_project_registration(req: ProjectRegistrationProposalRequest) -> dict:
    """Persist a validated project registration to the registry file.

    Raises 400 when validation of the registration fails.
    """
    try:
        return register_project(
            project_id=req.project_id,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        # Chain the cause so tracebacks preserve the original validation error.
        raise HTTPException(status_code=400, detail=str(e)) from e
class RegisterEmergingRequest(BaseModel):
    """Body for one-click registration of a detected emerging project."""
    project_id: str
    description: str = ""
    aliases: list[str] | None = None


@router.post("/admin/projects/register-emerging")
def api_register_emerging_project(req: RegisterEmergingRequest) -> dict:
    """Phase 6 C.2 — one-click register a detected emerging project.

    Fills in sensible defaults so the user doesn't have to think about
    paths: ingest_roots defaults to vault:incoming/projects/<project_id>/
    (will be empty until the user creates content there, which is fine).
    Delegates to the existing register_project() for validation + file
    write. Clears the project from the unregistered_projects proposal
    list so it stops appearing in the dashboard.

    Raises 400 when project_id is missing or register_project rejects it.
    """
    import json as _json
    # Normalize: project ids are lowercased and stripped before use.
    pid = (req.project_id or "").strip().lower()
    if not pid:
        raise HTTPException(status_code=400, detail="project_id is required")
    aliases = req.aliases or []
    description = req.description or f"Emerging project registered from dashboard: {pid}"
    # Default ingest root; the directory may not exist yet, which is fine.
    ingest_roots = [{
        "source": "vault",
        "subpath": f"incoming/projects/{pid}/",
        "label": pid,
    }]
    try:
        result = register_project(
            project_id=pid,
            aliases=aliases,
            description=description,
            ingest_roots=ingest_roots,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    # Clear from proposals so dashboard doesn't keep showing it
    try:
        from atocore.context.project_state import get_state, set_state
        for e in get_state("atocore"):
            if e.category == "proposals" and e.key == "unregistered_projects":
                try:
                    current = _json.loads(e.value)
                except Exception:
                    # Unparseable stored value: treat as an empty proposal list.
                    current = []
                # Keep every proposal except the one just registered.
                filtered = [p for p in current if p.get("project") != pid]
                set_state(
                    project_name="atocore",
                    category="proposals",
                    key="unregistered_projects",
                    value=_json.dumps(filtered),
                    source="register-emerging",
                )
                break
    except Exception:
        pass  # non-fatal
    result["message"] = f"Project {pid!r} registered. Now has a wiki page, system map, and killer queries."
    return result
@router.put("/projects/{project_name}")
def api_project_update(project_name: str, req: ProjectUpdateRequest) -> dict:
    """Update an existing project registration."""
    try:
        return update_project(
            project_name=project_name,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        detail = str(e)
        # Unknown projects map to 404; every other validation error is a 400.
        status = 404 if detail.startswith("Unknown project") else 400
        raise HTTPException(status_code=status, detail=detail)


@router.post("/projects/{project_name}/refresh", response_model=ProjectRefreshResponse)
def api_refresh_project(project_name: str, purge_deleted: bool = False) -> ProjectRefreshResponse:
    """Refresh one registered project from its configured ingest roots."""
    try:
        with exclusive_ingestion():
            outcome = refresh_registered_project(project_name, purge_deleted=purge_deleted)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        log.error("project_refresh_failed", project=project_name, error=str(e))
        raise HTTPException(status_code=500, detail=f"Project refresh failed: {e}")
    return ProjectRefreshResponse(**outcome)
@router.post("/query", response_model=QueryResponse)
def api_query(req: QueryRequest) -> QueryResponse:
    """Retrieve relevant chunks for a prompt."""
    try:
        chunks = retrieve(
            req.prompt,
            top_k=req.top_k,
            filter_tags=req.filter_tags,
            project_hint=req.project,
        )
    except Exception as e:
        log.error("query_failed", prompt=req.prompt[:100], error=str(e))
        raise HTTPException(status_code=500, detail=f"Query failed: {e}")

    def _serialize(chunk) -> dict:
        # Flatten one retrieved chunk for the JSON response.
        return {
            "chunk_id": chunk.chunk_id,
            "content": chunk.content,
            "score": chunk.score,
            "heading_path": chunk.heading_path,
            "source_file": chunk.source_file,
            "title": chunk.title,
        }

    return QueryResponse(results=[_serialize(c) for c in chunks])
@router.post("/context/build", response_model=ContextBuildResponse)
def api_build_context(req: ContextBuildRequest) -> ContextBuildResponse:
    """Build a full context pack for a prompt."""
    try:
        context_pack = build_context(
            user_prompt=req.prompt,
            project_hint=req.project,
            budget=req.budget,
        )
    except Exception as e:
        log.error("context_build_failed", prompt=req.prompt[:100], error=str(e))
        raise HTTPException(status_code=500, detail=f"Context build failed: {e}")
    # Reuse the serializer for the chunk details only.
    serialized = _pack_to_dict(context_pack)
    return ContextBuildResponse(
        formatted_context=context_pack.formatted_context,
        full_prompt=context_pack.full_prompt,
        chunks_used=len(context_pack.chunks_used),
        total_chars=context_pack.total_chars,
        budget=context_pack.budget,
        budget_remaining=context_pack.budget_remaining,
        duration_ms=context_pack.duration_ms,
        chunks=serialized["chunks"],
    )
@router.post("/memory")
def api_create_memory(req: MemoryCreateRequest) -> dict:
    """Create a new memory entry; 400 on validation failure."""
    try:
        created = create_memory(
            memory_type=req.memory_type,
            content=req.content,
            project=req.project,
            confidence=req.confidence,
            status=req.status,
            domain_tags=req.domain_tags or [],
            valid_until=req.valid_until or "",
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    return {"status": "ok", "id": created.id, "memory_type": created.memory_type}
@router.get("/memory")
def api_get_memories(
    memory_type: str | None = None,
    project: str | None = None,
    active_only: bool = True,
    min_confidence: float = 0.0,
    limit: int = 50,
    status: str | None = None,
) -> dict:
    """List memories, optionally filtered.

    When ``status`` is given explicitly it overrides ``active_only`` so
    the Phase 9 Commit C review queue can be listed via
    ``GET /memory?status=candidate``.
    """
    try:
        rows = get_memories(
            memory_type=memory_type,
            project=project,
            active_only=active_only,
            min_confidence=min_confidence,
            limit=limit,
            status=status,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    def _row(m) -> dict:
        # One memory flattened for the JSON response.
        return {
            "id": m.id,
            "memory_type": m.memory_type,
            "content": m.content,
            "project": m.project,
            "confidence": m.confidence,
            "status": m.status,
            "reference_count": m.reference_count,
            "last_referenced_at": m.last_referenced_at,
            "updated_at": m.updated_at,
            "created_at": m.created_at,
            "domain_tags": m.domain_tags or [],
            "valid_until": m.valid_until or "",
        }

    return {
        "memories": [_row(m) for m in rows],
        "types": MEMORY_TYPES,
        "statuses": MEMORY_STATUSES,
    }
@router.put("/memory/{memory_id}")
def api_update_memory(memory_id: str, req: MemoryUpdateRequest) -> dict:
    """Update an existing memory; 400 on bad fields, 404 when missing."""
    try:
        changed = update_memory(
            memory_id=memory_id,
            content=req.content,
            confidence=req.confidence,
            status=req.status,
            memory_type=req.memory_type,
            domain_tags=req.domain_tags,
            valid_until=req.valid_until,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    if not changed:
        raise HTTPException(status_code=404, detail="Memory not found")
    return {"status": "updated", "id": memory_id}
@router.delete("/memory/{memory_id}")
def api_invalidate_memory(memory_id: str) -> dict:
    """Invalidate a memory (error correction).

    NOTE(review): a later handler in this file (POST
    /memory/{memory_id}/invalidate) reuses this function name, shadowing
    this one at module scope. Both routes stay registered because the
    decorator runs at definition time, but the duplicate name should be
    cleaned up.
    """
    success = invalidate_memory(memory_id, actor="api-http")
    if not success:
        raise HTTPException(status_code=404, detail="Memory not found")
    return {"status": "invalidated", "id": memory_id}
@router.get("/memory/{memory_id}/audit")
def api_memory_audit(memory_id: str, limit: int = 100) -> dict:
    """Return the audit history for a specific memory (newest first)."""
    from atocore.memory.service import get_memory_audit
    history = get_memory_audit(memory_id, limit=limit)
    return {"memory_id": memory_id, "entries": history, "count": len(history)}


@router.get("/admin/audit/recent")
def api_recent_audit(limit: int = 50) -> dict:
    """Return recent memory_audit entries across all memories (newest first)."""
    from atocore.memory.service import get_recent_audit
    history = get_recent_audit(limit=limit)
    return {"entries": history, "count": len(history)}
@router.post("/admin/integrity-check")
def api_integrity_check(persist: bool = True) -> dict:
    """Run the integrity scan inside the container (where DB + deps live).

    Returns findings and persists them to project state when persist=True.
    When drift is found, a warning alert is emitted (best-effort).
    """
    from atocore.models.database import get_connection
    from atocore.context.project_state import set_state
    import json as _json
    # Counters for each class of drift, plus overall memory totals.
    findings = {
        "orphan_chunk_refs": 0, "duplicate_active": 0,
        "orphan_project_state": 0, "orphan_chunks": 0,
        "memory_count": 0, "active_memory_count": 0,
    }
    details: list[str] = []
    with get_connection() as conn:
        # Memories pointing at a source_chunk_id that no longer exists.
        r = conn.execute(
            "SELECT COUNT(*) FROM memories m "
            "WHERE m.source_chunk_id IS NOT NULL AND m.source_chunk_id != '' "
            "AND NOT EXISTS (SELECT 1 FROM source_chunks c WHERE c.id = m.source_chunk_id)"
        ).fetchone()
        findings["orphan_chunk_refs"] = int(r[0] or 0)
        if findings["orphan_chunk_refs"]:
            details.append(f"{findings['orphan_chunk_refs']} memory(ies) reference missing source_chunk_id")
        # Active memories duplicated on (type, project, content); count the extras.
        dup_rows = conn.execute(
            "SELECT memory_type, project, content, COUNT(*) AS n FROM memories "
            "WHERE status = 'active' GROUP BY memory_type, project, content HAVING n > 1"
        ).fetchall()
        # r[3] is the per-group count; subtract 1 to count only the surplus rows.
        findings["duplicate_active"] = sum(int(r[3]) - 1 for r in dup_rows)
        if findings["duplicate_active"]:
            details.append(f"{findings['duplicate_active']} duplicate active row(s) across {len(dup_rows)} group(s)")
        # project_state rows whose project no longer exists.
        r = conn.execute(
            "SELECT COUNT(*) FROM project_state ps "
            "WHERE NOT EXISTS (SELECT 1 FROM projects p WHERE p.id = ps.project_id)"
        ).fetchone()
        findings["orphan_project_state"] = int(r[0] or 0)
        if findings["orphan_project_state"]:
            details.append(f"{findings['orphan_project_state']} project_state row(s) reference missing project")
        # Chunks whose parent document was deleted.
        r = conn.execute(
            "SELECT COUNT(*) FROM source_chunks c "
            "WHERE NOT EXISTS (SELECT 1 FROM source_documents d WHERE d.id = c.document_id)"
        ).fetchone()
        findings["orphan_chunks"] = int(r[0] or 0)
        if findings["orphan_chunks"]:
            details.append(f"{findings['orphan_chunks']} chunk(s) have no parent document")
        # Overall totals for context in the report.
        findings["memory_count"] = int(conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0])
        findings["active_memory_count"] = int(
            conn.execute("SELECT COUNT(*) FROM memories WHERE status = 'active'").fetchone()[0]
        )
    # ok is True only when no drift detail was recorded.
    result = {"findings": findings, "details": details, "ok": not details}
    if persist:
        try:
            set_state(
                project_name="atocore", category="status",
                key="integrity_check_result",
                value=_json.dumps(result),
                source="integrity check endpoint",
            )
        except Exception as e:
            # Persisting the result is best-effort; the scan result still returns.
            log.warning("integrity_check_state_write_failed", error=str(e))
    if details:
        try:
            from atocore.observability.alerts import emit_alert
            # Alert context carries only the drift counters, not the *_count totals.
            emit_alert(
                severity="warning",
                title="Integrity drift detected",
                message="; ".join(details),
                context={k: v for k, v in findings.items() if not k.endswith("_count")},
            )
        except Exception:
            pass
    return result
@router.post("/memory/{memory_id}/promote")
def api_promote_memory(memory_id: str) -> dict:
    """Promote a candidate memory to active (Phase 9 Commit C)."""
    try:
        promoted = promote_memory(memory_id, actor="api-http")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    if not promoted:
        raise HTTPException(
            status_code=404,
            detail=f"Memory not found or not a candidate: {memory_id}",
        )
    return {"status": "promoted", "id": memory_id}


@router.post("/memory/{memory_id}/reject")
def api_reject_candidate_memory(memory_id: str) -> dict:
    """Reject a candidate memory (Phase 9 Commit C review queue)."""
    rejected = reject_candidate_memory(memory_id, actor="api-http")
    if not rejected:
        raise HTTPException(
            status_code=404,
            detail=f"Memory not found or not a candidate: {memory_id}",
        )
    return {"status": "rejected", "id": memory_id}
class MemoryInvalidateRequest(BaseModel):
    """Optional body for POST /memory/{id}/invalidate."""
    reason: str = ""


class MemorySupersedeRequest(BaseModel):
    """Optional body for POST /memory/{id}/supersede."""
    reason: str = ""
@router.post("/memory/{memory_id}/invalidate")
def api_invalidate_memory(
    memory_id: str,
    req: MemoryInvalidateRequest | None = None,
) -> dict:
    """Retract an active memory (Issue E — active → invalid).

    Responses: 404 when the id is unknown, 409 when the memory exists in
    a non-active status (candidates should go through /reject), a
    no-op ``already_invalid`` payload when it was previously retracted.
    """
    from atocore.memory.service import get_memories as _get_memories, invalidate_memory
    reason = req.reason if req else ""
    # Quick existence/status check for a clean 404 vs 409.
    # BUG FIX: this previously used limit=1, which fetched only a single
    # active memory, so any other active id wrongly fell through to the
    # 409/404 path. Use the same high limit as the other status scans.
    existing = [
        m for m in _get_memories(status="active", limit=5000)
        if m.id == memory_id
    ]
    if not existing:
        # Fall through to generic not-active if the id exists in another status.
        all_match = [
            m for m in _get_memories(status="candidate", limit=5000)
            + _get_memories(status="invalid", limit=5000)
            + _get_memories(status="superseded", limit=5000)
            if m.id == memory_id
        ]
        if all_match:
            if all_match[0].status == "invalid":
                return {"status": "already_invalid", "id": memory_id}
            raise HTTPException(
                status_code=409,
                detail=(
                    f"Memory {memory_id} is {all_match[0].status}; "
                    "use /reject for candidates"
                ),
            )
        raise HTTPException(status_code=404, detail=f"Memory not found: {memory_id}")
    success = invalidate_memory(memory_id, actor="api-http", reason=reason)
    if not success:
        raise HTTPException(status_code=409, detail=f"Memory {memory_id} could not be invalidated")
    return {"status": "invalidated", "id": memory_id}
@router.post("/memory/{memory_id}/supersede")
def api_supersede_memory(
    memory_id: str,
    req: MemorySupersedeRequest | None = None,
) -> dict:
    """Supersede an active memory (Issue E — active → superseded)."""
    from atocore.memory.service import supersede_memory
    reason = "" if req is None else req.reason
    superseded = supersede_memory(memory_id, actor="api-http", reason=reason)
    if not superseded:
        raise HTTPException(
            status_code=404,
            detail=f"Memory not found or not active: {memory_id}",
        )
    return {"status": "superseded", "id": memory_id}
@router.post("/project/state")
def api_set_project_state(req: ProjectStateSetRequest) -> dict:
    """Set or update a trusted project state entry."""
    try:
        saved = set_state(
            project_name=req.project,
            category=req.category,
            key=req.key,
            value=req.value,
            source=req.source,
            confidence=req.confidence,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        log.error("set_state_failed", error=str(e))
        raise HTTPException(status_code=500, detail=f"Failed to set state: {e}")
    return {"status": "ok", "id": saved.id, "category": saved.category, "key": saved.key}
@router.get("/project/state/{project_name}")
def api_get_project_state(project_name: str, category: str | None = None) -> dict:
    """Get trusted project state entries."""
    rows = get_state(project_name, category=category)

    def _entry(e) -> dict:
        # One state entry flattened for the JSON response.
        return {
            "id": e.id,
            "category": e.category,
            "key": e.key,
            "value": e.value,
            "source": e.source,
            "confidence": e.confidence,
            "status": e.status,
            "updated_at": e.updated_at,
        }

    return {
        "project": project_name,
        "entries": [_entry(e) for e in rows],
        "categories": CATEGORIES,
    }
@router.delete("/project/state")
def api_invalidate_project_state(req: ProjectStateInvalidateRequest) -> dict:
    """Invalidate (supersede) a project state entry."""
    removed = invalidate_state(req.project, req.category, req.key)
    if not removed:
        raise HTTPException(status_code=404, detail="State entry not found or already invalidated")
    return {"status": "invalidated", "project": req.project, "category": req.category, "key": req.key}
class InteractionRecordRequest(BaseModel):
    """Body for POST /interactions (capture one prompt/response pair)."""
    prompt: str
    response: str = ""
    response_summary: str = ""
    project: str = ""
    client: str = ""
    session_id: str = ""
    memories_used: list[str] = []
    chunks_used: list[str] = []
    context_pack: dict | None = None
    # reinforce: run the Phase 9 Commit B reinforcement pass after capture.
    reinforce: bool = True
    # extract: run candidate extraction after capture (off by default).
    extract: bool = False
@router.post("/interactions")
def api_record_interaction(req: InteractionRecordRequest) -> dict:
    """Capture one interaction (prompt + response + what was used).

    This is the foundation of the AtoCore reflection loop. It records
    what the system fed to an LLM and what came back. If ``reinforce``
    is true (default) and there is response content, the Phase 9
    Commit B reinforcement pass runs automatically, bumping the
    confidence of any active memory echoed in the response. Nothing is
    ever promoted into trusted state automatically.
    """
    try:
        captured = record_interaction(
            prompt=req.prompt,
            response=req.response,
            response_summary=req.response_summary,
            project=req.project,
            client=req.client,
            session_id=req.session_id,
            memories_used=req.memories_used,
            chunks_used=req.chunks_used,
            context_pack=req.context_pack,
            reinforce=req.reinforce,
            extract=req.extract,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    return {
        "status": "recorded",
        "id": captured.id,
        "created_at": captured.created_at,
    }


@router.post("/interactions/{interaction_id}/reinforce")
def api_reinforce_interaction(interaction_id: str) -> dict:
    """Run the reinforcement pass on an already-captured interaction.

    Useful for backfilling reinforcement over historical interactions,
    or for retrying after a transient failure in the automatic pass
    that runs inside ``POST /interactions``.
    """
    interaction = get_interaction(interaction_id)
    if interaction is None:
        raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
    results = reinforce_from_interaction(interaction)

    def _summary(r) -> dict:
        # One reinforcement result with confidences rounded for display.
        return {
            "memory_id": r.memory_id,
            "memory_type": r.memory_type,
            "old_confidence": round(r.old_confidence, 4),
            "new_confidence": round(r.new_confidence, 4),
        }

    return {
        "interaction_id": interaction_id,
        "reinforced_count": len(results),
        "reinforced": [_summary(r) for r in results],
    }
class InteractionExtractRequest(BaseModel):
    """Options for POST /interactions/{id}/extract."""
    # When true, persist each candidate instead of only previewing.
    persist: bool = False
    mode: str = "rule"  # "rule" or "llm"
@router.post("/interactions/{interaction_id}/extract")
def api_extract_from_interaction(
    interaction_id: str,
    req: InteractionExtractRequest | None = None,
) -> dict:
    """Extract candidate memories from a captured interaction.

    Phase 9 Commit C. The rule-based extractor is deliberately
    conservative — it only surfaces candidates that matched an explicit
    structural cue (decision heading, preference sentence, etc.). By
    default the candidates are returned *without* being persisted so a
    caller can preview them before committing to a review queue. Pass
    ``persist: true`` to immediately create candidate memories for
    each extraction result. Raises 404 when the interaction is unknown.
    """
    interaction = get_interaction(interaction_id)
    if interaction is None:
        raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
    payload = req or InteractionExtractRequest()
    use_llm = payload.mode == "llm"
    candidates: list[MemoryCandidate]
    if use_llm:
        candidates = extract_candidates_llm(interaction)
    else:
        candidates = extract_candidates_from_interaction(interaction)
    persisted_ids: list[str] = []
    if payload.persist:
        for candidate in candidates:
            try:
                mem = create_memory(
                    memory_type=candidate.memory_type,
                    content=candidate.content,
                    project=candidate.project,
                    confidence=candidate.confidence,
                    status="candidate",
                )
                persisted_ids.append(mem.id)
            except ValueError as e:
                # One bad candidate should not abort the rest of the batch.
                log.error(
                    "extract_persist_failed",
                    interaction_id=interaction_id,
                    rule=candidate.rule,
                    error=str(e),
                )
    return {
        "interaction_id": interaction_id,
        "candidate_count": len(candidates),
        "persisted": payload.persist,
        "persisted_ids": persisted_ids,
        # BUG FIX: the top-level version previously always reported the
        # rule-based EXTRACTOR_VERSION, even when mode="llm" produced the
        # candidates. Report the version of the extractor actually used.
        "extractor_version": LLM_EXTRACTOR_VERSION if use_llm else EXTRACTOR_VERSION,
        "candidates": [
            {
                "memory_type": c.memory_type,
                "content": c.content,
                "project": c.project,
                "confidence": c.confidence,
                "rule": c.rule,
                "source_span": c.source_span,
                "extractor_version": c.extractor_version,
            }
            for c in candidates
        ],
    }
@router.get("/interactions")
def api_list_interactions(
    project: str | None = None,
    session_id: str | None = None,
    client: str | None = None,
    since: str | None = None,
    limit: int = 50,
) -> dict:
    """List captured interactions, optionally filtered by project, session,
    client, or creation time. Hard-capped at 500 entries per call."""
    rows = list_interactions(
        project=project,
        session_id=session_id,
        client=client,
        since=since,
        limit=limit,
    )

    def _row(i) -> dict:
        # Listing view: full response is summarized to its length only.
        return {
            "id": i.id,
            "prompt": i.prompt,
            "response_summary": i.response_summary,
            "response_chars": len(i.response),
            "project": i.project,
            "client": i.client,
            "session_id": i.session_id,
            "memories_used": i.memories_used,
            "chunks_used": i.chunks_used,
            "created_at": i.created_at,
        }

    return {"count": len(rows), "interactions": [_row(i) for i in rows]}


@router.get("/interactions/{interaction_id}")
def api_get_interaction(interaction_id: str) -> dict:
    """Fetch a single interaction with the full response and context pack."""
    interaction = get_interaction(interaction_id)
    if interaction is None:
        raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
    return {
        "id": interaction.id,
        "prompt": interaction.prompt,
        "response": interaction.response,
        "response_summary": interaction.response_summary,
        "project": interaction.project,
        "client": interaction.client,
        "session_id": interaction.session_id,
        "memories_used": interaction.memories_used,
        "chunks_used": interaction.chunks_used,
        "context_pack": interaction.context_pack,
        "created_at": interaction.created_at,
    }
class BackupCreateRequest(BaseModel):
    """Optional body for POST /admin/backup."""
    include_chroma: bool = False


@router.post("/admin/backup")
def api_create_backup(req: BackupCreateRequest | None = None) -> dict:
    """Create a runtime backup snapshot.

    When ``include_chroma`` is true the call holds the ingestion lock so a
    safe cold copy of the vector store can be taken without racing against
    refresh or ingest endpoints.
    """
    payload = req or BackupCreateRequest()
    try:
        if not payload.include_chroma:
            metadata = create_runtime_backup(include_chroma=False)
        else:
            # Lock out ingestion while copying the vector store cold.
            with exclusive_ingestion():
                metadata = create_runtime_backup(include_chroma=True)
    except Exception as e:
        log.error("admin_backup_failed", error=str(e))
        raise HTTPException(status_code=500, detail=f"Backup failed: {e}")
    return metadata


@router.get("/admin/backup")
def api_list_backups() -> dict:
    """List all runtime backups under the configured backup directory."""
    backup_dir = _config.settings.resolved_backup_dir
    return {
        "backup_dir": str(backup_dir),
        "backups": list_runtime_backups(),
    }
class BackupCleanupRequest(BaseModel):
    """Body for POST /admin/backup/cleanup."""

    # False = dry-run (report only); True = actually delete old snapshots.
    confirm: bool = False
@router.post("/admin/backup/cleanup")
def api_cleanup_backups(req: BackupCleanupRequest | None = None) -> dict:
"""Apply retention policy to old backup snapshots.
Dry-run by default. Pass ``confirm: true`` to actually delete.
Retention: last 7 daily, last 4 weekly (Sundays), last 6 monthly (1st).
"""
payload = req or BackupCleanupRequest()
try:
return cleanup_old_backups(confirm=payload.confirm)
except Exception as e:
log.error("admin_cleanup_failed", error=str(e))
raise HTTPException(status_code=500, detail=f"Cleanup failed: {e}")
class ExtractBatchRequest(BaseModel):
    """Body for POST /admin/extract-batch."""

    # Timestamp cursor; None = resume from the last recorded batch run.
    since: str | None = None
    # "llm" (claude CLI extractor) or "rule" (rule-based extractor).
    mode: str = "llm"
    # Max interactions to process (capped server-side at 200).
    limit: int = 50
    # Persist extracted candidates as status=candidate memories.
    persist: bool = True
@router.post("/admin/extract-batch")
def api_extract_batch(req: ExtractBatchRequest | None = None) -> dict:
"""Run batch extraction across recent interactions.
Fetches interactions since ``since`` (or since the last recorded
batch run), runs the extractor (rule or LLM) on each, and persists
any candidates as ``status=candidate``. The last-run timestamp is
stored in project state under ``atocore / status /
last_extract_batch_run`` so subsequent calls without ``since``
automatically pick up where the last run left off.
This endpoint is the operational home for R1 / R5 — it makes the
LLM extractor accessible as an API operation rather than a
script-only eval tool. Still NOT on the capture hot path: callers
invoke this endpoint explicitly (cron, manual curl, CLI).
"""
payload = req or ExtractBatchRequest()
if payload.mode == "llm" and not _llm_cli_available():
raise HTTPException(
status_code=503,
detail=(
"LLM extraction unavailable in this runtime: the `claude` CLI "
"is not on PATH. Run host-side via "
"`scripts/batch_llm_extract_live.py` instead, or call this "
"endpoint with mode=\"rule\"."
),
)
since = payload.since
if not since:
state_entries = get_state("atocore")
for entry in state_entries:
if entry.category == "status" and entry.key == "last_extract_batch_run":
since = entry.value
break
interactions = list_interactions(since=since, limit=min(payload.limit, 200))
processed = 0
total_candidates = 0
total_persisted = 0
errors: list[dict] = []
for interaction in interactions:
if not (interaction.response or interaction.response_summary):
continue
try:
if payload.mode == "llm":
candidates = extract_candidates_llm(interaction)
else:
candidates = extract_candidates_from_interaction(interaction)
except Exception as exc:
errors.append({"interaction_id": interaction.id, "error": str(exc)})
continue
processed += 1
total_candidates += len(candidates)
if payload.persist and candidates:
for candidate in candidates:
try:
create_memory(
memory_type=candidate.memory_type,
content=candidate.content,
project=candidate.project,
confidence=candidate.confidence,
status="candidate",
)
total_persisted += 1
except ValueError:
pass # duplicate — skip silently
from datetime import datetime, timezone
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
try:
set_state(
project="atocore",
category="status",
key="last_extract_batch_run",
value=now,
source="admin/extract-batch endpoint",
)
except Exception:
pass # best-effort timestamp tracking
log.info(
"extract_batch_complete",
mode=payload.mode,
processed=processed,
total_candidates=total_candidates,
total_persisted=total_persisted,
errors=len(errors),
)
return {
"processed": processed,
"total_candidates": total_candidates,
"total_persisted": total_persisted,
"mode": payload.mode,
"persist": payload.persist,
"since": since or "(first run)",
"errors": errors,
}
@router.get("/admin/dashboard")
def api_dashboard() -> dict:
"""One-shot system observability dashboard.
Returns memory counts by type/project/status, project state
entry counts, interaction volume by client, pipeline health
(harness, triage stats, last run), and extraction pipeline
status — everything an operator needs to understand AtoCore's
health beyond the basic /health endpoint.
"""
import json as _json
from collections import Counter
from datetime import datetime as _dt, timezone as _tz
all_memories = get_memories(active_only=False, limit=500)
active = [m for m in all_memories if m.status == "active"]
candidates = [m for m in all_memories if m.status == "candidate"]
type_counts = dict(Counter(m.memory_type for m in active))
project_counts = dict(Counter(m.project or "(none)" for m in active))
reinforced = [m for m in active if m.reference_count > 0]
# Interaction stats — total + by_client from DB directly
interaction_stats: dict = {"most_recent": None, "total": 0, "by_client": {}}
try:
from atocore.models.database import get_connection as _gc
with _gc() as conn:
row = conn.execute("SELECT count(*) FROM interactions").fetchone()
interaction_stats["total"] = row[0] if row else 0
rows = conn.execute(
"SELECT client, count(*) FROM interactions GROUP BY client"
).fetchall()
interaction_stats["by_client"] = {r[0]: r[1] for r in rows}
row = conn.execute(
"SELECT created_at FROM interactions ORDER BY created_at DESC LIMIT 1"
).fetchone()
interaction_stats["most_recent"] = row[0] if row else None
except Exception:
interactions = list_interactions(limit=1)
interaction_stats["most_recent"] = (
interactions[0].created_at if interactions else None
)
# Pipeline health from project state
pipeline: dict = {}
extract_state: dict = {}
integrity: dict = {}
alerts: dict = {}
try:
state_entries = get_state("atocore")
for entry in state_entries:
if entry.category == "status":
if entry.key == "last_extract_batch_run":
extract_state["last_run"] = entry.value
elif entry.key == "pipeline_last_run":
pipeline["last_run"] = entry.value
try:
last = _dt.fromisoformat(entry.value.replace("Z", "+00:00"))
delta = _dt.now(_tz.utc) - last
pipeline["hours_since_last_run"] = round(
delta.total_seconds() / 3600, 1
)
except Exception:
pass
elif entry.key == "pipeline_summary":
try:
pipeline["summary"] = _json.loads(entry.value)
except Exception:
pipeline["summary_raw"] = entry.value
elif entry.key == "retrieval_harness_result":
try:
pipeline["harness"] = _json.loads(entry.value)
except Exception:
pipeline["harness_raw"] = entry.value
elif entry.key == "integrity_check_result":
try:
integrity = _json.loads(entry.value)
except Exception:
pass
elif entry.category == "alert":
# keys like "last_info", "last_warning", "last_critical"
try:
payload = _json.loads(entry.value)
except Exception:
payload = {"raw": entry.value}
severity = entry.key.replace("last_", "")
alerts[severity] = payload
except Exception:
pass
# Phase 6 C.2: emerging-concepts proposals from the detector
proposals: dict = {}
try:
for entry in get_state("atocore"):
if entry.category != "proposals":
continue
try:
data = _json.loads(entry.value)
except Exception:
continue
if entry.key == "unregistered_projects":
proposals["unregistered_projects"] = data
elif entry.key == "emerging_categories":
proposals["emerging_categories"] = data
elif entry.key == "reinforced_transients":
proposals["reinforced_transients"] = data
except Exception:
pass
# Project state counts — include all registered projects
ps_counts = {}
try:
from atocore.projects.registry import load_project_registry as _lpr
for proj in _lpr():
try:
entries = get_state(proj.project_id)
ps_counts[proj.project_id] = len(entries)
except Exception:
pass
except Exception:
for proj_id in [
"p04-gigabit", "p05-interferometer", "p06-polisher", "atocore",
]:
try:
entries = get_state(proj_id)
ps_counts[proj_id] = len(entries)
except Exception:
pass
# Triage queue health
triage: dict = {
"pending": len(candidates),
"review_url": "/admin/triage",
}
if len(candidates) > 50:
triage["warning"] = f"High queue: {len(candidates)} candidates pending review."
elif len(candidates) > 20:
triage["notice"] = f"{len(candidates)} candidates awaiting triage."
# Recent audit activity (Phase 4 V1) — last 10 mutations for operator
recent_audit: list[dict] = []
try:
from atocore.memory.service import get_recent_audit as _gra
recent_audit = _gra(limit=10)
except Exception:
pass
return {
"memories": {
"active": len(active),
"candidates": len(candidates),
"by_type": type_counts,
"by_project": project_counts,
"reinforced": len(reinforced),
},
"project_state": {
"counts": ps_counts,
"total": sum(ps_counts.values()),
},
"interactions": interaction_stats,
"extraction_pipeline": extract_state,
"pipeline": pipeline,
"triage": triage,
"integrity": integrity,
"alerts": alerts,
"recent_audit": recent_audit,
"proposals": proposals,
}
# --- Engineering Knowledge Layer (Layer 2) ---
class EntityCreateRequest(BaseModel):
    """Body for POST /entities (engineering knowledge layer, Layer 2)."""

    entity_type: str
    name: str
    # project accepts: "" (global, cross-project), "inbox" (pre-project
    # lead / quote bucket), or any registered project id/alias. Unknown
    # project names are stored verbatim (trust-preserving, same as
    # pre-registry contract).
    project: str | None = ""
    description: str = ""
    properties: dict | None = None
    status: str = "active"
    confidence: float = 1.0
    source_refs: list[str] | None = None
    # V1-0 provenance enforcement (F-8). Clients must either pass
    # non-empty source_refs or set hand_authored=true. The service layer
    # raises ValueError otherwise, surfaced here as 400.
    hand_authored: bool = False
    extractor_version: str | None = None
class EntityPromoteRequest(BaseModel):
    """Optional body for POST /entities/{id}/promote."""

    # When set, retargets the entity's project on promote
    # (e.g. graduating an inbox/global lead into a real project).
    target_project: str | None = None
    note: str = ""
class RelationshipCreateRequest(BaseModel):
    """Body for creating a typed edge between two existing entities."""

    source_entity_id: str
    target_entity_id: str
    relationship_type: str
    confidence: float = 1.0
    source_refs: list[str] | None = None
@router.post("/entities")
def api_create_entity(req: EntityCreateRequest) -> dict:
"""Create a new engineering entity."""
try:
entity = create_entity(
entity_type=req.entity_type,
name=req.name,
project=req.project or "",
description=req.description,
properties=req.properties,
status=req.status,
confidence=req.confidence,
source_refs=req.source_refs,
actor="api-http",
hand_authored=req.hand_authored,
extractor_version=req.extractor_version,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {"status": "ok", "id": entity.id, "entity_type": entity.entity_type, "name": entity.name}
@router.get("/entities")
def api_list_entities(
entity_type: str | None = None,
project: str | None = None,
status: str = "active",
name_contains: str | None = None,
limit: int = 100,
scope_only: bool = False,
) -> dict:
"""List engineering entities with optional filters.
When ``project`` names a real project, cross-project entities
(``project=""``) are included by default. Pass ``scope_only=true`` to
restrict the result to that project's own entities only.
"""
entities = get_entities(
entity_type=entity_type,
project=project,
status=status,
name_contains=name_contains,
limit=limit,
scope_only=scope_only,
)
return {
"entities": [
{
"id": e.id,
"entity_type": e.entity_type,
"name": e.name,
"project": e.project,
"description": e.description,
"properties": e.properties,
"status": e.status,
"confidence": e.confidence,
}
for e in entities
],
"count": len(entities),
}
@router.get("/admin/conflicts")
def api_list_conflicts(project: str | None = None) -> dict:
"""Phase 5G: list open entity conflicts (optionally scoped to a project)."""
from atocore.engineering.conflicts import list_open_conflicts
conflicts = list_open_conflicts(project=project)
return {"conflicts": conflicts, "count": len(conflicts)}
class ConflictResolveRequest(BaseModel):
    """Body for POST /admin/conflicts/{id}/resolve."""

    action: str  # dismiss|supersede_others|no_action
    # Required when action == "supersede_others": the entity that wins.
    winner_id: str | None = None
@router.post("/admin/conflicts/{conflict_id}/resolve")
def api_resolve_conflict(conflict_id: str, req: ConflictResolveRequest) -> dict:
"""Resolve a conflict. Options: dismiss, supersede_others (needs winner_id), no_action."""
from atocore.engineering.conflicts import resolve_conflict
try:
success = resolve_conflict(
conflict_id=conflict_id,
action=req.action,
winner_id=req.winner_id,
actor="api-http",
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not success:
raise HTTPException(status_code=404, detail=f"Conflict not found or already resolved: {conflict_id}")
return {"status": "resolved", "id": conflict_id, "action": req.action}
class GraduationRequestBody(BaseModel):
    """Body for POST /admin/graduation/request."""

    # Project filter; whitespace-stripped before the flag is written.
    project: str = ""
    # Batch size; clamped server-side to [1, 500].
    limit: int = 30
@router.post("/admin/graduation/request")
def api_request_graduation(req: GraduationRequestBody) -> dict:
"""Request a host-side memory-graduation run.
Writes a flag in project_state with project + limit. A host cron
watcher picks it up within ~2 min and runs graduate_memories.py.
Mirrors the /admin/triage/request-drain pattern (bridges container
→ host because claude CLI lives on host, not container).
"""
import json as _json
from datetime import datetime as _dt, timezone as _tz
from atocore.context.project_state import set_state
now = _dt.now(_tz.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
payload = _json.dumps({
"project": (req.project or "").strip(),
"limit": max(1, min(req.limit, 500)),
"requested_at": now,
})
set_state(
project_name="atocore",
category="config",
key="graduation_requested_at",
value=payload,
source="admin ui",
)
return {
"requested_at": now,
"project": req.project,
"limit": req.limit,
"note": "Host watcher picks up within ~2 min. Poll /admin/graduation/status for progress.",
}
@router.get("/admin/graduation/status")
def api_graduation_status() -> dict:
"""State of the graduation pipeline (UI polling)."""
import json as _json
from atocore.context.project_state import get_state
out = {
"requested": None,
"last_started_at": None,
"last_finished_at": None,
"last_result": None,
"is_running": False,
}
try:
for e in get_state("atocore"):
if e.category != "config" and e.category != "status":
continue
if e.key == "graduation_requested_at":
try:
out["requested"] = _json.loads(e.value)
except Exception:
out["requested"] = {"raw": e.value}
elif e.key == "graduation_last_started_at":
out["last_started_at"] = e.value
elif e.key == "graduation_last_finished_at":
out["last_finished_at"] = e.value
elif e.key == "graduation_last_result":
out["last_result"] = e.value
elif e.key == "graduation_running":
out["is_running"] = (e.value == "1")
except Exception:
pass
return out
# --- Phase 7C: tag canonicalization ---
class TagAliasProposalBody(BaseModel):
    """Body for POST /admin/tags/aliases/propose (queued for human review)."""

    # Tag spelling to be rewritten.
    alias: str
    # Canonical spelling it should become.
    canonical: str
    confidence: float = 0.0
    reason: str = ""
    # Usage counts recorded on the proposal for the reviewer's benefit.
    alias_count: int = 0
    canonical_count: int = 0
class TagAliasApplyBody(BaseModel):
    """Body for POST /admin/tags/aliases/apply (direct auto-approval path)."""

    alias: str
    canonical: str
    confidence: float = 0.9
    reason: str = ""
    alias_count: int = 0
    canonical_count: int = 0
    # Recorded as resolved_by on the approved alias row.
    actor: str = "auto-tag-canon"
class TagAliasResolveBody(BaseModel):
    """Body for approve/reject of a tag-alias proposal; names the resolver."""

    actor: str = "human-triage"
@router.get("/admin/tags/distribution")
def api_tag_distribution() -> dict:
"""Current tag distribution across active memories (for UI / debug)."""
from atocore.memory.service import get_tag_distribution
dist = get_tag_distribution()
sorted_tags = sorted(dist.items(), key=lambda x: x[1], reverse=True)
return {"total_references": sum(dist.values()), "unique_tags": len(dist),
"tags": [{"tag": t, "count": c} for t, c in sorted_tags]}
@router.get("/admin/tags/aliases")
def api_list_tag_aliases(status: str = "pending", limit: int = 100) -> dict:
"""List tag alias proposals (default: pending for review)."""
from atocore.memory.service import get_tag_alias_proposals
rows = get_tag_alias_proposals(status=status, limit=limit)
return {"proposals": rows, "count": len(rows)}
@router.post("/admin/tags/aliases/propose")
def api_propose_tag_alias(body: TagAliasProposalBody) -> dict:
"""Submit a low-confidence alias proposal for human review."""
from atocore.memory.service import create_tag_alias_proposal
pid = create_tag_alias_proposal(
alias=body.alias, canonical=body.canonical,
confidence=body.confidence, alias_count=body.alias_count,
canonical_count=body.canonical_count, reason=body.reason,
)
if pid is None:
return {"proposal_id": None, "duplicate": True}
return {"proposal_id": pid, "duplicate": False}
@router.post("/admin/tags/aliases/apply")
def api_apply_tag_alias(body: TagAliasApplyBody) -> dict:
"""Apply an alias rewrite directly (used by the auto-approval path).
Creates a tag_aliases row in status=approved with the apply result
recorded, so autonomous merges land in the same audit surface as
human approvals.
"""
from datetime import datetime as _dt, timezone as _tz
from atocore.memory.service import apply_tag_alias, create_tag_alias_proposal
from atocore.models.database import get_connection
# Record proposal + apply + mark approved in one flow
pid = create_tag_alias_proposal(
alias=body.alias, canonical=body.canonical,
confidence=body.confidence, alias_count=body.alias_count,
canonical_count=body.canonical_count, reason=body.reason,
)
if pid is None:
# A pending proposal already exists — don't double-apply.
raise HTTPException(status_code=409, detail="A pending proposal already exists for this (alias, canonical) pair — approve it via /admin/tags/aliases/{id}/approve")
try:
result = apply_tag_alias(body.alias, body.canonical, actor=body.actor)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
now_str = _dt.now(_tz.utc).strftime("%Y-%m-%d %H:%M:%S")
with get_connection() as conn:
conn.execute(
"UPDATE tag_aliases SET status = 'approved', resolved_at = ?, "
"resolved_by = ?, applied_to_memories = ? WHERE id = ?",
(now_str, body.actor, result["memories_touched"], pid),
)
return {
"proposal_id": pid,
"memories_touched": result["memories_touched"],
"alias": body.alias, "canonical": body.canonical,
}
@router.post("/admin/tags/aliases/{proposal_id}/approve")
def api_approve_tag_alias(proposal_id: str, body: TagAliasResolveBody) -> dict:
"""Human-in-the-loop approve for a pending proposal."""
from atocore.memory.service import approve_tag_alias
result = approve_tag_alias(proposal_id, actor=body.actor)
if result is None:
raise HTTPException(status_code=404, detail="Proposal not found or already resolved")
return {"status": "approved", "proposal_id": proposal_id,
"memories_touched": result["memories_touched"]}
@router.post("/admin/tags/aliases/{proposal_id}/reject")
def api_reject_tag_alias(proposal_id: str, body: TagAliasResolveBody) -> dict:
"""Human-in-the-loop reject for a pending proposal."""
from atocore.memory.service import reject_tag_alias
if not reject_tag_alias(proposal_id, actor=body.actor):
raise HTTPException(status_code=404, detail="Proposal not found or already resolved")
return {"status": "rejected", "proposal_id": proposal_id}
class DecayRunBody(BaseModel):
    """Tuning knobs for POST /admin/memory/decay-run."""

    # Only memories idle longer than this many days are decayed.
    idle_days_threshold: int = 30
    # Confidence multiplier per run (~2-month half-life at 0.97).
    daily_decay_factor: float = 0.97
    # Below this confidence the memory is auto-superseded.
    supersede_confidence_floor: float = 0.30
@router.post("/admin/memory/decay-run")
def api_decay_run(body: DecayRunBody | None = None) -> dict:
"""Phase 7D — confidence decay on unreferenced memories.
One-shot run (daily cron or on-demand). For active memories with
reference_count=0 and idle for >30 days: multiply confidence by
0.97 (~2-month half-life). Below 0.3 → auto-supersede with audit.
Reversible: reinforcement bumps confidence back up. Non-destructive:
superseded memories stay queryable with status filter.
"""
from atocore.memory.service import decay_unreferenced_memories
b = body or DecayRunBody()
result = decay_unreferenced_memories(
idle_days_threshold=b.idle_days_threshold,
daily_decay_factor=b.daily_decay_factor,
supersede_confidence_floor=b.supersede_confidence_floor,
)
return {
"decayed_count": len(result["decayed"]),
"superseded_count": len(result["superseded"]),
"decayed": result["decayed"][:20], # cap payload
"superseded": result["superseded"][:20],
}
@router.post("/admin/memory/extend-reinforced")
def api_extend_reinforced() -> dict:
"""Phase 6 C.3 — batch transient-to-durable extension.
Scans active memories with valid_until in the next 30 days and
reference_count >= 5. Extends expiry by 90 days, or clears it
entirely (permanent) if reference_count >= 10. Writes audit rows.
"""
from atocore.memory.service import extend_reinforced_valid_until
extended = extend_reinforced_valid_until()
return {"extended_count": len(extended), "extensions": extended}
# --- Phase 7A: memory dedup / merge-candidate lifecycle ---
class MergeCandidateCreateBody(BaseModel):
    """Body for POST /admin/memory/merge-candidates/create (Phase 7A)."""

    # Ids of the source memories proposed for merging.
    memory_ids: list[str]
    similarity: float = 0.0
    # Drafted merged memory; the reviewer may still edit it on approve.
    proposed_content: str
    proposed_memory_type: str = "knowledge"
    proposed_project: str = ""
    # Mutable default is safe here: pydantic copies field defaults per
    # instance (unlike plain Python function defaults).
    proposed_tags: list[str] = []
    proposed_confidence: float = 0.6
    reason: str = ""
class MergeCandidateApproveBody(BaseModel):
    """Body for approving a merge candidate; fields override the proposal."""

    actor: str = "human-triage"
    # When set, replaces the proposed merged content.
    content: str | None = None
    # When set, replaces the proposed tags.
    domain_tags: list[str] | None = None
class MergeCandidateRejectBody(BaseModel):
    """Body for rejecting a merge candidate."""

    actor: str = "human-triage"
    note: str = ""
class DedupScanRequestBody(BaseModel):
    """Body for POST /admin/memory/dedup-scan (host-side scan request)."""

    project: str = ""
    # Clamped server-side to [0.5, 0.99].
    similarity_threshold: float = 0.88
    # Clamped server-side to [1, 200].
    max_batch: int = 50
class DedupClusterBody(BaseModel):
    """Body for POST /admin/memory/dedup-cluster (server-side clustering)."""

    project: str = ""
    # Clamped server-side to [0.5, 0.99].
    similarity_threshold: float = 0.88
    # Max clusters returned (strongest candidates first).
    max_clusters: int = 100
@router.post("/admin/memory/dedup-cluster")
def api_dedup_cluster(body: DedupClusterBody) -> dict:
"""Server-side near-duplicate clustering for Phase 7A dedup detector.
Host-side scripts/memory_dedup.py can't import atocore.memory.similarity
(that would transitively pull sentence-transformers + torch onto the
host Python, which intentionally stays lean). Instead the host posts
here; we compute embeddings + transitive clusters server-side and
return the cluster shape the host needs to draft merges via claude CLI.
Buckets by (project, memory_type) — cross-bucket merges are deferred
to the 7B contradiction flow. Active non-graduated memories only.
Returns up to max_clusters clusters of size >= 2, ordered by min
intra-cluster similarity descending (strongest candidates first)."""
from atocore.memory.service import get_memories
from atocore.memory.similarity import cluster_by_threshold, similarity_matrix
project_filter = (body.project or "").strip() or None
threshold = max(0.5, min(0.99, body.similarity_threshold))
mems = get_memories(
project=project_filter,
active_only=True,
limit=2000,
)
# Drop graduated (frozen entity pointers) — they're exempt from dedup
mems = [m for m in mems if m.status == "active"]
# Group by (project, memory_type)
buckets: dict[tuple[str, str], list] = {}
for m in mems:
key = ((m.project or "").lower(), (m.memory_type or "").lower())
buckets.setdefault(key, []).append(m)
out_clusters: list[dict] = []
for (proj, mtype), group in sorted(buckets.items()):
if len(group) < 2:
continue
texts = [m.content or "" for m in group]
clusters = cluster_by_threshold(texts, threshold)
clusters = [c for c in clusters if len(c) >= 2]
if not clusters:
continue
# Cache matrix once per bucket so we can report min pairwise sim
matrix = similarity_matrix(texts)
for cluster in clusters:
min_sim = 1.0
for i in range(len(cluster)):
for j in range(i + 1, len(cluster)):
s = matrix[cluster[i]][cluster[j]]
if s < min_sim:
min_sim = s
sources = []
for idx in cluster:
m = group[idx]
sources.append({
"id": m.id,
"memory_type": m.memory_type,
"content": m.content,
"project": m.project or "",
"confidence": m.confidence,
"reference_count": m.reference_count,
"domain_tags": m.domain_tags or [],
"valid_until": m.valid_until or "",
})
out_clusters.append({
"project": proj,
"memory_type": mtype,
"min_similarity": round(min_sim, 4),
"size": len(cluster),
"memory_ids": [s["id"] for s in sources],
"sources": sources,
})
# Strongest clusters first
out_clusters.sort(key=lambda c: -c["min_similarity"])
out_clusters = out_clusters[:body.max_clusters]
return {
"cluster_count": len(out_clusters),
"threshold": threshold,
"project_filter": project_filter or "",
"total_active_scanned": len(mems),
"bucket_count": sum(1 for g in buckets.values() if len(g) >= 2),
"clusters": out_clusters,
}
@router.get("/admin/memory/merge-candidates")
def api_list_merge_candidates(status: str = "pending", limit: int = 100) -> dict:
"""Phase 7A: list merge-candidate proposals for triage UI."""
from atocore.memory.service import get_merge_candidates
cands = get_merge_candidates(status=status, limit=limit)
return {"candidates": cands, "count": len(cands)}
@router.post("/admin/memory/merge-candidates/create")
def api_create_merge_candidate(body: MergeCandidateCreateBody) -> dict:
"""Phase 7A: host-side dedup detector submits a proposal here.
Server-side idempotency: if a pending candidate already exists for
the same sorted memory_id set, returns the existing id.
"""
from atocore.memory.service import create_merge_candidate
try:
cid = create_merge_candidate(
memory_ids=body.memory_ids,
similarity=body.similarity,
proposed_content=body.proposed_content,
proposed_memory_type=body.proposed_memory_type,
proposed_project=body.proposed_project,
proposed_tags=body.proposed_tags,
proposed_confidence=body.proposed_confidence,
reason=body.reason,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if cid is None:
return {"candidate_id": None, "duplicate": True}
return {"candidate_id": cid, "duplicate": False}
@router.post("/admin/memory/merge-candidates/{candidate_id}/approve")
def api_approve_merge_candidate(candidate_id: str, body: MergeCandidateApproveBody) -> dict:
"""Phase 7A: execute an approved merge. Sources → superseded; new
merged memory created. UI can pass content/tag edits via body."""
from atocore.memory.service import merge_memories
new_id = merge_memories(
candidate_id=candidate_id,
actor=body.actor,
override_content=body.content,
override_tags=body.domain_tags,
)
if new_id is None:
raise HTTPException(
status_code=409,
detail="Merge could not execute (candidate not pending, or source memory tampered)",
)
return {"status": "approved", "candidate_id": candidate_id, "result_memory_id": new_id}
@router.post("/admin/memory/merge-candidates/{candidate_id}/reject")
def api_reject_merge_candidate(candidate_id: str, body: MergeCandidateRejectBody) -> dict:
"""Phase 7A: dismiss a merge candidate. Sources stay untouched."""
from atocore.memory.service import reject_merge_candidate
ok = reject_merge_candidate(candidate_id, actor=body.actor, note=body.note)
if not ok:
raise HTTPException(status_code=404, detail="Candidate not found or already resolved")
return {"status": "rejected", "candidate_id": candidate_id}
@router.post("/admin/memory/dedup-scan")
def api_request_dedup_scan(body: DedupScanRequestBody) -> dict:
"""Phase 7A: request a host-side dedup scan.
Writes a flag in project_state with project + threshold + max_batch.
A host cron watcher picks it up within ~2 min and runs
scripts/memory_dedup.py. Mirrors /admin/graduation/request.
"""
import json as _json
from datetime import datetime as _dt, timezone as _tz
from atocore.context.project_state import set_state
now = _dt.now(_tz.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
payload = _json.dumps({
"project": (body.project or "").strip(),
"similarity_threshold": max(0.5, min(0.99, body.similarity_threshold)),
"max_batch": max(1, min(body.max_batch, 200)),
"requested_at": now,
})
set_state(
project_name="atocore",
category="config",
key="dedup_requested_at",
value=payload,
source="admin ui",
)
return {
"requested_at": now,
"project": body.project,
"similarity_threshold": body.similarity_threshold,
"max_batch": body.max_batch,
"note": "Host watcher picks up within ~2 min. Poll /admin/memory/dedup-status for progress.",
}
@router.get("/admin/memory/dedup-status")
def api_dedup_status() -> dict:
"""Phase 7A: state of the dedup scan pipeline (UI polling)."""
import json as _json
from atocore.context.project_state import get_state
out = {
"requested": None,
"last_started_at": None,
"last_finished_at": None,
"last_result": None,
"is_running": False,
}
try:
for e in get_state("atocore"):
if e.category not in ("config", "status"):
continue
if e.key == "dedup_requested_at":
try:
out["requested"] = _json.loads(e.value)
except Exception:
out["requested"] = {"raw": e.value}
elif e.key == "dedup_last_started_at":
out["last_started_at"] = e.value
elif e.key == "dedup_last_finished_at":
out["last_finished_at"] = e.value
elif e.key == "dedup_last_result":
out["last_result"] = e.value
elif e.key == "dedup_running":
out["is_running"] = (e.value == "1")
except Exception:
pass
return out
@router.get("/admin/graduation/stats")
def api_graduation_stats() -> dict:
"""Phase 5F graduation stats for dashboard."""
from atocore.models.database import get_connection
with get_connection() as conn:
total_memories = int(conn.execute("SELECT COUNT(*) FROM memories WHERE status = 'active'").fetchone()[0])
graduated = int(conn.execute("SELECT COUNT(*) FROM memories WHERE status = 'graduated'").fetchone()[0])
entity_candidates_from_mem = int(conn.execute(
"SELECT COUNT(*) FROM entities WHERE status = 'candidate' "
"AND source_refs LIKE '%memory:%'"
).fetchone()[0])
active_entities = int(conn.execute("SELECT COUNT(*) FROM entities WHERE status = 'active'").fetchone()[0])
return {
"active_memories": total_memories,
"graduated_memories": graduated,
"entity_candidates_from_memories": entity_candidates_from_mem,
"active_entities": active_entities,
"graduation_rate": (
graduated / (total_memories + graduated) if (total_memories + graduated) > 0 else 0.0
),
}
# --- Phase 5 Engineering V1: The 10 canonical queries ---
@router.get("/engineering/projects/{project_name}/systems")
def api_system_map(project_name: str) -> dict:
"""Q-001 + Q-004: subsystem/component tree for a project."""
from atocore.engineering.queries import system_map
return system_map(project_name)
@router.get("/engineering/decisions")
def api_decisions_affecting(
project: str,
subsystem: str | None = None,
) -> dict:
"""Q-008: decisions affecting a subsystem (or the whole project)."""
from atocore.engineering.queries import decisions_affecting
return decisions_affecting(project, subsystem_id=subsystem)
@router.get("/engineering/components/{component_id}/requirements")
def api_requirements_for_component(component_id: str) -> dict:
"""Q-005: requirements that a component satisfies."""
from atocore.engineering.queries import requirements_for
return requirements_for(component_id)
@router.get("/engineering/changes")
def api_recent_engineering_changes(
project: str,
since: str | None = None,
limit: int = 50,
) -> dict:
"""Q-013: entity changes in project since timestamp."""
from atocore.engineering.queries import recent_changes
return recent_changes(project, since=since, limit=limit)
@router.get("/engineering/gaps/orphan-requirements")
def api_orphan_requirements(project: str) -> dict:
"""Q-006 (killer): requirements with no SATISFIES edge."""
from atocore.engineering.queries import orphan_requirements
return orphan_requirements(project)
@router.get("/engineering/gaps/risky-decisions")
def api_risky_decisions(project: str) -> dict:
"""Q-009 (killer): decisions resting on flagged/superseded assumptions."""
from atocore.engineering.queries import risky_decisions
return risky_decisions(project)
@router.get("/engineering/gaps/unsupported-claims")
def api_unsupported_claims(project: str) -> dict:
"""Q-011 (killer): validation claims with no SUPPORTS edge."""
from atocore.engineering.queries import unsupported_claims
return unsupported_claims(project)
@router.get("/engineering/gaps")
def api_all_gaps(project: str) -> dict:
"""Combined Q-006 + Q-009 + Q-011 for a project."""
from atocore.engineering.queries import all_gaps
return all_gaps(project)
@router.get("/engineering/impact")
def api_impact_analysis(entity: str, max_depth: int = 3) -> dict:
"""Q-016: transitive outbound impact of changing an entity."""
from atocore.engineering.queries import impact_analysis
return impact_analysis(entity, max_depth=max_depth)
@router.get("/engineering/evidence")
def api_evidence_chain(entity: str) -> dict:
"""Q-017: inbound evidence chain for an entity."""
from atocore.engineering.queries import evidence_chain
return evidence_chain(entity)
@router.post("/entities/{entity_id}/promote")
def api_promote_entity(
    entity_id: str,
    req: EntityPromoteRequest | None = None,
) -> dict:
    """Promote a candidate entity to active.

    An optional ``target_project`` in the request body re-homes the
    entity on promotion — the mechanism for graduating inbox/global
    leads into a concrete project once they mature (Issue C).
    """
    from atocore.engineering.service import promote_entity

    # The body is optional; default both fields when it is absent.
    if req is None:
        target_project, note = None, ""
    else:
        target_project, note = req.target_project, req.note
    promoted = promote_entity(
        entity_id,
        actor="api-http",
        note=note,
        target_project=target_project,
    )
    if not promoted:
        raise HTTPException(status_code=404, detail=f"Entity not found or not a candidate: {entity_id}")
    response: dict = {"status": "promoted", "id": entity_id}
    if target_project is not None:
        response["target_project"] = target_project
    return response
@router.post("/entities/{entity_id}/reject")
def api_reject_entity(entity_id: str) -> dict:
    """Reject a candidate entity (Phase 5)."""
    from atocore.engineering.service import reject_entity_candidate

    if not reject_entity_candidate(entity_id, actor="api-http"):
        raise HTTPException(status_code=404, detail=f"Entity not found or not a candidate: {entity_id}")
    return {"status": "rejected", "id": entity_id}
class EntityPatchRequest(BaseModel):
    """Partial update for an existing entity.

    ``properties`` is a shallow merge: keys with ``null`` delete,
    keys with a value overwrite. ``source_refs`` is append-only
    (duplicates filtered). Omit a field to leave it unchanged.
    """
    # Replacement description; None leaves the current one untouched.
    description: str | None = None
    # Shallow property patch: a value overwrites the key, null deletes it.
    properties: dict | None = None
    # New confidence score (0..1 per the PATCH endpoint contract); None = unchanged.
    confidence: float | None = None
    # Provenance refs to append; duplicates are filtered downstream.
    source_refs: list[str] | None = None
    # Free-text audit note recorded with the change.
    note: str = ""
class EntityInvalidateRequest(BaseModel):
    """Optional body for ``POST /entities/{id}/invalidate``."""
    # Free-text reason recorded with the invalidation; may be empty.
    reason: str = ""
class EntitySupersedeRequest(BaseModel):
    """Body for ``POST /entities/{id}/supersede``."""
    # ID of the newer entity that replaces the one being superseded.
    superseded_by: str
    # Optional free-text reason recorded as the audit note.
    reason: str = ""
@router.patch("/entities/{entity_id}")
def api_patch_entity(entity_id: str, req: EntityPatchRequest) -> dict:
    """Update the mutable fields of an existing entity.

    Allowed: description, properties (shallow merge, null=delete key),
    confidence (0..1), source_refs (append, dedup). Forbidden:
    entity_type, project, name, status — those require supersede+create
    or the dedicated status endpoints.
    """
    from atocore.engineering.service import update_entity

    try:
        entity = update_entity(
            entity_id,
            description=req.description,
            properties_patch=req.properties,
            confidence=req.confidence,
            append_source_refs=req.source_refs,
            actor="api-http",
            note=req.note,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    if entity is None:
        raise HTTPException(status_code=404, detail=f"Entity not found: {entity_id}")
    # Echo the post-update view of every mutable field back to the caller.
    return {
        "status": "updated",
        "id": entity.id,
        "entity_type": entity.entity_type,
        "name": entity.name,
        "description": entity.description,
        "properties": entity.properties,
        "confidence": entity.confidence,
        "source_refs": entity.source_refs,
    }
@router.post("/entities/{entity_id}/invalidate")
def api_invalidate_entity(
    entity_id: str,
    req: EntityInvalidateRequest | None = None,
) -> dict:
    """Retract an active entity (Issue E — active → invalid).

    Idempotent: invalidating an already-invalid entity returns 200 with
    ``status=already_invalid``. Use ``POST /entities/{id}/reject`` for
    the distinct candidate→invalid transition.

    Raises:
        HTTPException: 404 when the entity does not exist, 409 when it
            is in a state that cannot transition to invalid.
    """
    from atocore.engineering.service import invalidate_active_entity
    # Explicit `is not None`: a Pydantic model instance is always truthy,
    # so `if req` only worked by accident — and this now matches the
    # pattern used by the promote endpoint.
    reason = req.reason if req is not None else ""
    # Only the status code is consumed; the boolean is redundant with it.
    _, code = invalidate_active_entity(
        entity_id, actor="api-http", reason=reason,
    )
    if code == "not_found":
        raise HTTPException(status_code=404, detail=f"Entity not found: {entity_id}")
    if code == "not_active":
        raise HTTPException(
            status_code=409,
            detail=(
                f"Entity {entity_id} is not active; use "
                "/reject for candidates or /supersede as appropriate"
            ),
        )
    return {"status": code, "id": entity_id}
@router.post("/entities/{entity_id}/supersede")
def api_supersede_entity(
    entity_id: str,
    req: EntitySupersedeRequest,
) -> dict:
    """Supersede an active entity with a newer one (Issue E).

    Marks the entity ``superseded`` and records a ``supersedes``
    relationship from ``superseded_by`` → ``entity_id``, so the graph
    reflects the replacement without a second API call.
    """
    from atocore.engineering.service import supersede_entity

    try:
        accepted = supersede_entity(
            entity_id,
            actor="api-http",
            note=req.reason,
            superseded_by=req.superseded_by,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    if not accepted:
        raise HTTPException(
            status_code=409,
            detail=f"Entity {entity_id} cannot be superseded (already superseded or not active)",
        )
    return {
        "status": "superseded",
        "id": entity_id,
        "superseded_by": req.superseded_by,
    }
@router.get("/entities/{entity_id}/audit")
def api_entity_audit(entity_id: str, limit: int = 100) -> dict:
    """Return up to ``limit`` audit-history entries for one entity."""
    from atocore.engineering.service import get_entity_audit

    history = get_entity_audit(entity_id, limit=limit)
    return {"entity_id": entity_id, "entries": history, "count": len(history)}
@router.get("/entities/{entity_id}")
def api_get_entity(entity_id: str) -> dict:
    """Get an entity together with its relationships and neighbours."""
    result = get_entity_with_context(entity_id)
    if result is None:
        raise HTTPException(status_code=404, detail=f"Entity not found: {entity_id}")
    e = result["entity"]
    entity_payload = {
        "id": e.id,
        "entity_type": e.entity_type,
        "name": e.name,
        "project": e.project,
        "description": e.description,
        "properties": e.properties,
        "status": e.status,
        "confidence": e.confidence,
        "source_refs": e.source_refs,
        "created_at": e.created_at,
        "updated_at": e.updated_at,
    }
    rel_payload = []
    for rel in result["relationships"]:
        rel_payload.append({
            "id": rel.id,
            "source_entity_id": rel.source_entity_id,
            "target_entity_id": rel.target_entity_id,
            "relationship_type": rel.relationship_type,
            "confidence": rel.confidence,
        })
    related_payload = {}
    for eid, neighbour in result["related_entities"].items():
        # Neighbour descriptions are truncated to keep the payload small.
        related_payload[eid] = {
            "entity_type": neighbour.entity_type,
            "name": neighbour.name,
            "project": neighbour.project,
            "description": neighbour.description[:200],
        }
    return {
        "entity": entity_payload,
        "relationships": rel_payload,
        "related_entities": related_payload,
    }
@router.post("/relationships")
def api_create_relationship(req: RelationshipCreateRequest) -> dict:
    """Create a typed relationship between two existing entities."""
    try:
        created = create_relationship(
            source_entity_id=req.source_entity_id,
            target_entity_id=req.target_entity_id,
            relationship_type=req.relationship_type,
            confidence=req.confidence,
            source_refs=req.source_refs,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    return {
        "status": "ok",
        "id": created.id,
        "relationship_type": created.relationship_type,
    }
@router.get("/projects/{project_name}/mirror.html", response_class=HTMLResponse)
def api_project_mirror_html(project_name: str) -> HTMLResponse:
    """Serve the project overview as a styled, browser-ready HTML page.

    The database is the source of truth; this page is only a derived
    view rendered from AtoCore's structured data.
    """
    from atocore.projects.registry import resolve_project_name as _resolve

    canonical = _resolve(project_name)
    try:
        md_content = generate_project_overview(canonical)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Mirror generation failed: {e}")
    import markdown

    rendered = markdown.markdown(md_content, extensions=["tables", "fenced_code"])
    page = _MIRROR_HTML_TEMPLATE.replace(
        "{{title}}", f"{canonical} — AtoCore Mirror"
    ).replace("{{body}}", rendered)
    return HTMLResponse(content=page)
# Static HTML shell for the mirror page. api_project_mirror_html substitutes
# two placeholders: {{title}} (the page <title>) and {{body}} (the
# markdown-rendered overview). Styling is self-contained inline CSS with
# automatic dark mode via prefers-color-scheme.
_MIRROR_HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{{title}}</title>
<style>
:root { --bg: #fafafa; --text: #1a1a2e; --accent: #2563eb; --border: #e2e8f0; --card: #fff; }
@media (prefers-color-scheme: dark) {
:root { --bg: #0f172a; --text: #e2e8f0; --accent: #60a5fa; --border: #334155; --card: #1e293b; }
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
line-height: 1.7; color: var(--text); background: var(--bg);
max-width: 800px; margin: 0 auto; padding: 2rem 1.5rem;
}
h1 { font-size: 1.8rem; margin-bottom: 0.5rem; color: var(--accent); }
h2 { font-size: 1.4rem; margin-top: 2.5rem; margin-bottom: 0.8rem; padding-bottom: 0.3rem; border-bottom: 2px solid var(--border); }
h3 { font-size: 1.15rem; margin-top: 1.5rem; margin-bottom: 0.5rem; }
p { margin-bottom: 0.8rem; }
ul { margin-left: 1.5rem; margin-bottom: 1rem; }
li { margin-bottom: 0.4rem; }
li ul { margin-top: 0.3rem; }
strong { color: var(--accent); font-weight: 600; }
em { opacity: 0.7; font-size: 0.9em; }
blockquote {
background: var(--card); border-left: 4px solid var(--accent);
padding: 0.8rem 1.2rem; margin: 1rem 0; border-radius: 0 8px 8px 0;
}
hr { border: none; border-top: 1px solid var(--border); margin: 2rem 0; }
code { background: var(--card); padding: 0.15rem 0.4rem; border-radius: 4px; font-size: 0.9em; }
a { color: var(--accent); text-decoration: none; }
a:hover { text-decoration: underline; }
</style>
</head>
<body>
{{body}}
</body>
</html>"""
@router.get("/projects/{project_name}/mirror")
def api_project_mirror(project_name: str) -> dict:
    """Generate a human-readable project overview from structured data.

    Layer 3 of the AtoCore architecture: the mirror is DERIVED from
    entities, project state, and memories — it is not canonical truth.
    The markdown returned can be rendered, written to a file, or served
    as a dashboard page.
    """
    from atocore.projects.registry import resolve_project_name as _resolve

    canonical = _resolve(project_name)
    try:
        overview_md = generate_project_overview(canonical)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Mirror generation failed: {e}")
    return {"project": canonical, "format": "markdown", "content": overview_md}
@router.get("/admin/backup/{stamp}/validate")
def api_validate_backup(stamp: str) -> dict:
    """Check that a previously created backup is structurally usable."""
    report = validate_backup(stamp)
    if not report.get("exists", False):
        raise HTTPException(status_code=404, detail=f"Backup not found: {stamp}")
    return report
@router.get("/health")
def api_health() -> dict:
    """Health check.

    Three layers of version reporting, in increasing precision:

    - ``version`` / ``code_version``: ``atocore.__version__`` (e.g.
      "0.2.0"). Bumped manually on commits that change the API
      surface, schema, or user-visible behavior. Coarse — any number
      of commits can land between bumps without changing this value.
    - ``build_sha``: full git SHA of the commit the running container
      was built from. Set by ``deploy/dalidou/deploy.sh`` via the
      ``ATOCORE_BUILD_SHA`` env var on every rebuild. Reports
      ``"unknown"`` for builds that bypass deploy.sh (direct
      ``docker compose up`` etc.). This is the precise drift signal:
      if the live ``build_sha`` doesn't match the tip of the deployed
      branch on Gitea, the service is stale regardless of what
      ``code_version`` says.
    - ``build_time`` / ``build_branch``: when and from which branch
      the live container was built. Useful for forensics when
      multiple branches are in flight or when build_sha is ambiguous
      (e.g. a force-push to the same SHA).

    The deploy.sh post-deploy verification step compares the live
    ``build_sha`` to the SHA it just set, and exits non-zero on
    mismatch.
    """
    import os

    from atocore import __version__

    env_var = os.environ.get
    store = get_vector_store()
    source_status = get_source_status()
    # Ready iff every enabled source resolves to an existing directory.
    sources_ready = True
    for source in source_status:
        if source["enabled"] and not (source["exists"] and source["is_dir"]):
            sources_ready = False
            break
    settings = _config.settings
    return {
        "status": "ok",
        "version": __version__,
        "code_version": __version__,
        "build_sha": env_var("ATOCORE_BUILD_SHA", "unknown"),
        "build_time": env_var("ATOCORE_BUILD_TIME", "unknown"),
        "build_branch": env_var("ATOCORE_BUILD_BRANCH", "unknown"),
        "vectors_count": store.count,
        "env": settings.env,
        "machine_paths": {
            "db_path": str(settings.db_path),
            "chroma_path": str(settings.chroma_path),
            "log_dir": str(settings.resolved_log_dir),
            "backup_dir": str(settings.resolved_backup_dir),
            "run_dir": str(settings.resolved_run_dir),
        },
        "sources_ready": sources_ready,
        "source_status": source_status,
    }
@router.get("/sources")
def api_sources() -> dict:
    """List configured ingestion source directories and their readiness."""
    settings = _config.settings
    return {
        "sources": get_source_status(),
        "vault_enabled": settings.source_vault_enabled,
        "drive_enabled": settings.source_drive_enabled,
    }
@router.get("/stats")
def api_stats() -> dict:
    """Return ingestion statistics."""
    # Straight pass-through of the pipeline's aggregate counters.
    return get_ingestion_stats()
@router.get("/debug/context")
def api_debug_context() -> dict:
    """Inspect the most recently assembled context pack."""
    pack = get_last_context_pack()
    # A fresh process has built no pack yet; report that rather than 404.
    if pack is None:
        return {"message": "No context pack built yet."}
    return _pack_to_dict(pack)
# --- Issue F: binary asset store (visual evidence) ---
@router.post("/assets")
async def api_upload_asset(
    file: UploadFile = File(...),
    project: str = Form(""),
    caption: str = Form(""),
    source_refs: str = Form(""),
) -> dict:
    """Upload a binary asset (image, PDF, CAD export).

    Idempotent on SHA-256 content hash. ``source_refs`` is a JSON-encoded
    list of provenance pointers (e.g. ``["session:<id>"]``); pass an
    empty string for none. MIME type is inferred from the upload's
    Content-Type header.

    Raises:
        HTTPException: 400 for malformed ``source_refs`` or a missing
            Content-Type, 413 when the payload exceeds the size limit,
            415 for a disallowed MIME type.
    """
    from atocore.assets import (
        AssetTooLarge,
        AssetTypeNotAllowed,
        store_asset,
    )
    import json as _json

    data = await file.read()
    try:
        refs = _json.loads(source_refs) if source_refs else []
        if not isinstance(refs, list):
            raise ValueError("source_refs must be a JSON array")
        refs = [str(r) for r in refs]
    # json.JSONDecodeError subclasses ValueError, so one clause covers
    # both the parse failure and the non-list case.
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail=f"source_refs must be a JSON array of strings: {e}",
        )
    # Drop any "; charset=..." parameters; only the bare type is stored.
    mime_type = (file.content_type or "").split(";", 1)[0].strip()
    if not mime_type:
        raise HTTPException(
            status_code=400,
            detail="Upload missing Content-Type; cannot determine mime_type",
        )
    try:
        asset = store_asset(
            data=data,
            mime_type=mime_type,
            original_filename=file.filename or "",
            project=project or "",
            caption=caption or "",
            source_refs=refs,
        )
    except AssetTooLarge as e:
        raise HTTPException(status_code=413, detail=str(e))
    except AssetTypeNotAllowed as e:
        raise HTTPException(status_code=415, detail=str(e))
    return asset.to_dict()
@router.get("/assets/{asset_id}")
def api_get_asset_binary(asset_id: str):
    """Return the original binary using its stored Content-Type."""
    from atocore.assets import AssetNotFound, get_asset_binary

    try:
        asset, payload = get_asset_binary(asset_id)
    except AssetNotFound as e:
        raise HTTPException(status_code=404, detail=str(e))
    # Content-addressed storage: the SHA-256 is a stable, exact ETag.
    return Response(
        content=payload,
        media_type=asset.mime_type,
        headers={
            "Cache-Control": "private, max-age=3600",
            "ETag": f'"{asset.hash_sha256}"',
        },
    )
@router.get("/assets/{asset_id}/thumbnail")
def api_get_asset_thumbnail(asset_id: str, size: int = 240):
    """Return a generated JPEG thumbnail (images only), max side ``size`` px."""
    from atocore.assets import AssetError, AssetNotFound, get_thumbnail

    try:
        asset, payload = get_thumbnail(asset_id, size=size)
    except AssetNotFound as e:
        raise HTTPException(status_code=404, detail=str(e))
    except AssetError as e:
        # AssetError (e.g. a non-image asset) maps to 415 Unsupported Media Type.
        raise HTTPException(status_code=415, detail=str(e))
    # The size participates in the ETag so each thumbnail variant is
    # cached independently.
    return Response(
        content=payload,
        media_type="image/jpeg",
        headers={
            "Cache-Control": "private, max-age=86400",
            "ETag": f'"{asset.hash_sha256}-{size}"',
        },
    )
@router.get("/assets/{asset_id}/meta")
def api_get_asset_meta(asset_id: str) -> dict:
    """Return an asset's metadata without the binary payload."""
    from atocore.assets import get_asset

    record = get_asset(asset_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Asset not found: {asset_id}")
    return record.to_dict()
@router.get("/admin/assets/orphans")
def api_list_asset_orphans(limit: int = 200) -> dict:
    """List assets that no active entity references."""
    from atocore.assets import list_orphan_assets

    found = list_orphan_assets(limit=limit)
    return {"orphans": [asset.to_dict() for asset in found], "count": len(found)}
@router.delete("/assets/{asset_id}")
def api_invalidate_asset(asset_id: str) -> dict:
    """Tombstone an asset; refuses while an active entity still references it."""
    from atocore.assets import get_asset, invalidate_asset

    if get_asset(asset_id) is None:
        raise HTTPException(status_code=404, detail=f"Asset not found: {asset_id}")
    if not invalidate_asset(asset_id, actor="api-http"):
        raise HTTPException(
            status_code=409,
            detail=f"Asset {asset_id} is still referenced; "
            "unlink EVIDENCED_BY relationships or retarget entity.properties.asset_id first",
        )
    return {"status": "invalidated", "id": asset_id}
@router.get("/entities/{entity_id}/evidence")
def api_get_entity_evidence(entity_id: str) -> dict:
    """Return artifact entities linked to this one via EVIDENCED_BY.

    Each entry carries the artifact entity plus its resolved asset
    metadata so the caller can build thumbnail URLs without a second
    query. Non-artifact evidenced_by targets are skipped (the assumption
    is that visual evidence is always an artifact entity).

    Raises:
        HTTPException: 404 when the entity does not exist.
    """
    # get_entity / get_relationships are imported at module level;
    # re-importing them locally here was redundant.
    from atocore.assets import get_asset

    entity = get_entity(entity_id)
    if entity is None:
        raise HTTPException(status_code=404, detail=f"Entity not found: {entity_id}")
    rels = get_relationships(entity_id, direction="outgoing")
    evidence: list[dict] = []
    for rel in rels:
        if rel.relationship_type != "evidenced_by":
            continue
        target = get_entity(rel.target_entity_id)
        if target is None or target.entity_type != "artifact":
            continue
        # Read properties once instead of re-evaluating `or {}` per key.
        props = target.properties or {}
        asset_id = props.get("asset_id")
        asset = get_asset(asset_id) if asset_id else None
        evidence.append({
            "entity_id": target.id,
            "name": target.name,
            "kind": props.get("kind", "other"),
            "caption": props.get("caption", ""),
            "capture_context": props.get("capture_context", ""),
            # None when the artifact has no stored binary (asset_id unset).
            "asset": asset.to_dict() if asset else None,
            "relationship_id": rel.id,
        })
    return {"entity_id": entity_id, "evidence": evidence, "count": len(evidence)}