"""FastAPI route definitions.""" from pathlib import Path from fastapi import APIRouter, HTTPException from fastapi.responses import HTMLResponse from pydantic import BaseModel import atocore.config as _config from atocore.context.builder import ( build_context, get_last_context_pack, _pack_to_dict, ) from atocore.context.project_state import ( CATEGORIES, get_state, invalidate_state, set_state, ) from atocore.ingestion.pipeline import ( exclusive_ingestion, get_ingestion_stats, get_source_status, ingest_configured_sources, ingest_file, ingest_folder, ) from atocore.interactions.service import ( get_interaction, list_interactions, record_interaction, ) from atocore.engineering.mirror import generate_project_overview from atocore.engineering.wiki import ( render_entity, render_homepage, render_project, render_search, ) from atocore.engineering.service import ( ENTITY_TYPES, RELATIONSHIP_TYPES, create_entity, create_relationship, get_entities, get_entity, get_entity_with_context, get_relationships, ) from atocore.memory.extractor import ( EXTRACTOR_VERSION, MemoryCandidate, extract_candidates_from_interaction, ) from atocore.memory.extractor_llm import ( LLM_EXTRACTOR_VERSION, _cli_available as _llm_cli_available, extract_candidates_llm, ) from atocore.memory.reinforcement import reinforce_from_interaction from atocore.memory.service import ( MEMORY_STATUSES, MEMORY_TYPES, create_memory, get_memories, invalidate_memory, promote_memory, reject_candidate_memory, supersede_memory, update_memory, ) from atocore.observability.logger import get_logger from atocore.ops.backup import ( cleanup_old_backups, create_runtime_backup, list_runtime_backups, validate_backup, ) from atocore.projects.registry import ( build_project_registration_proposal, get_project_registry_template, list_registered_projects, register_project, refresh_registered_project, update_project, ) from atocore.retrieval.retriever import retrieve from atocore.retrieval.vector_store import get_vector_store 
router = APIRouter()
log = get_logger("api")


# --- Wiki routes (HTML, served first for clean URLs) ---


# Landing page of the HTML wiki.
@router.get("/wiki", response_class=HTMLResponse)
def wiki_home() -> HTMLResponse:
    return HTMLResponse(content=render_homepage())


# Project page; the path segment is passed through the registry's
# ``resolve_project_name`` before rendering.
@router.get("/wiki/projects/{project_name}", response_class=HTMLResponse)
def wiki_project(project_name: str) -> HTMLResponse:
    from atocore.projects.registry import resolve_project_name as _resolve

    return HTMLResponse(content=render_project(_resolve(project_name)))


# Entity page; ``render_entity`` returning ``None`` is mapped to a 404.
@router.get("/wiki/entities/{entity_id}", response_class=HTMLResponse)
def wiki_entity(entity_id: str) -> HTMLResponse:
    html = render_entity(entity_id)
    if html is None:
        raise HTTPException(status_code=404, detail="Entity not found")
    return HTMLResponse(content=html)


# Search page; an empty query string is allowed and forwarded as-is.
@router.get("/wiki/search", response_class=HTMLResponse)
def wiki_search(q: str = "") -> HTMLResponse:
    return HTMLResponse(content=render_search(q))


# --- Request/Response models ---


# Body of POST /ingest: filesystem path of a file or folder to ingest.
class IngestRequest(BaseModel):
    path: str


# Response of POST /ingest: one result dict per ingested item.
class IngestResponse(BaseModel):
    results: list[dict]


# Response of POST /ingest/sources.
class IngestSourcesResponse(BaseModel):
    results: list[dict]


# Response of POST /projects/{project_name}/refresh; built by unpacking the
# dict returned from ``refresh_registered_project``.
class ProjectRefreshResponse(BaseModel):
    project: str
    aliases: list[str]
    description: str
    purge_deleted: bool
    status: str
    roots_ingested: int
    roots_skipped: int
    roots: list[dict]


# Shared body of POST /projects/proposal and POST /projects/register.
class ProjectRegistrationProposalRequest(BaseModel):
    project_id: str
    aliases: list[str] = []
    description: str = ""
    ingest_roots: list[dict]


# Body of PUT /projects/{project_name}; every field is optional and is
# forwarded to ``update_project`` unchanged (``None`` included).
class ProjectUpdateRequest(BaseModel):
    aliases: list[str] | None = None
    description: str | None = None
    ingest_roots: list[dict] | None = None


# Body of POST /query.
class QueryRequest(BaseModel):
    prompt: str
    top_k: int = 10
    filter_tags: list[str] | None = None
    project: str | None = None


# Response of POST /query: one dict per retrieved chunk.
class QueryResponse(BaseModel):
    results: list[dict]


# Body of POST /context/build.
class ContextBuildRequest(BaseModel):
    prompt: str
    project: str | None = None
    budget: int | None = None


# Response of POST /context/build, populated from the built context pack.
class ContextBuildResponse(BaseModel):
    formatted_context: str
    full_prompt: str
    chunks_used: int
    total_chars: int
    budget: int
    budget_remaining: int
    duration_ms: int
    chunks: list[dict]


# Body of POST /memory.
class MemoryCreateRequest(BaseModel):
    memory_type: str
    content: str
    project: str = ""
    confidence: float = 1.0
    status: str = "active"


# Body of PUT /memory/{memory_id}; all fields optional.
class MemoryUpdateRequest(BaseModel):
    content: str | None = None
    confidence: float | None = None
    status: str | None = None


# Body of POST /project/state.
class ProjectStateSetRequest(BaseModel):
    project: str
    category: str
    key: str
    value: str
    source: str = ""
    confidence: float = 1.0


# Project/category selector for project state lookups.
class ProjectStateGetRequest(BaseModel):
    project: str
    category: str | None = None


# Body of DELETE /project/state.
class ProjectStateInvalidateRequest(BaseModel):
    project: str
    category: str
    key: str


# --- Endpoints ---


@router.post("/ingest", response_model=IngestResponse)
def api_ingest(req: IngestRequest) -> IngestResponse:
    """Ingest a markdown file or folder."""
    # Raises 404 when the path is neither a file nor a directory, and 500
    # (chained to the original exception) when ingestion itself fails.
    target = Path(req.path)
    try:
        with exclusive_ingestion():
            if target.is_file():
                results = [ingest_file(target)]
            elif target.is_dir():
                results = ingest_folder(target)
            else:
                raise HTTPException(status_code=404, detail=f"Path not found: {req.path}")
    except HTTPException:
        # Let the 404 above through untouched instead of wrapping it in a 500.
        raise
    except Exception as e:
        log.error("ingest_failed", path=req.path, error=str(e))
        raise HTTPException(status_code=500, detail=f"Ingestion failed: {e}") from e
    return IngestResponse(results=results)


@router.post("/ingest/sources", response_model=IngestSourcesResponse)
def api_ingest_sources() -> IngestSourcesResponse:
    """Ingest enabled configured source directories."""
    try:
        with exclusive_ingestion():
            results = ingest_configured_sources()
    except Exception as e:
        log.error("ingest_sources_failed", error=str(e))
        raise HTTPException(
            status_code=500,
            detail=f"Configured source ingestion failed: {e}",
        ) from e
    return IngestSourcesResponse(results=results)


@router.get("/projects")
def api_projects() -> dict:
    """Return registered projects and their resolved ingest roots."""
    return {
        "projects": list_registered_projects(),
        "registry_path": str(_config.settings.resolved_project_registry_path),
    }


@router.get("/projects/template")
def api_projects_template() -> dict:
    """Return a starter template for project registry entries."""
    return {
        "template": get_project_registry_template(),
        "registry_path": str(_config.settings.resolved_project_registry_path),
        "allowed_sources": ["vault", "drive"],
    }


@router.post("/projects/proposal")
def api_project_registration_proposal(req: ProjectRegistrationProposalRequest) -> dict:
    """Return a normalized project registration proposal without writing it."""
    try:
        return build_project_registration_proposal(
            project_id=req.project_id,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


@router.post("/projects/register")
def api_project_registration(req: ProjectRegistrationProposalRequest) -> dict:
    """Persist a validated project registration to the registry file."""
    try:
        return register_project(
            project_id=req.project_id,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


@router.put("/projects/{project_name}")
def api_project_update(project_name: str, req: ProjectUpdateRequest) -> dict:
    """Update an existing project registration."""
    try:
        return update_project(
            project_name=project_name,
            aliases=req.aliases,
            description=req.description,
            ingest_roots=req.ingest_roots,
        )
    except ValueError as e:
        detail = str(e)
        # The registry signals "no such project" through the message prefix;
        # everything else is treated as a validation error.
        if detail.startswith("Unknown project"):
            raise HTTPException(status_code=404, detail=detail) from e
        raise HTTPException(status_code=400, detail=detail) from e


@router.post("/projects/{project_name}/refresh", response_model=ProjectRefreshResponse)
def api_refresh_project(project_name: str, purge_deleted: bool = False) -> ProjectRefreshResponse:
    """Refresh one registered project from its configured ingest roots."""
    try:
        with exclusive_ingestion():
            result = refresh_registered_project(project_name, purge_deleted=purge_deleted)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        log.error("project_refresh_failed", project=project_name, error=str(e))
        raise HTTPException(status_code=500, detail=f"Project refresh failed: {e}") from e
    return ProjectRefreshResponse(**result)


@router.post("/query", response_model=QueryResponse)
def api_query(req: QueryRequest) -> QueryResponse:
    """Retrieve relevant chunks for a prompt."""
    try:
        chunks = retrieve(
            req.prompt,
            top_k=req.top_k,
            filter_tags=req.filter_tags,
            project_hint=req.project,
        )
    except Exception as e:
        # Only the first 100 characters of the prompt are logged.
        log.error("query_failed", prompt=req.prompt[:100], error=str(e))
        raise HTTPException(status_code=500, detail=f"Query failed: {e}") from e
    return QueryResponse(
        results=[
            {
                "chunk_id": c.chunk_id,
                "content": c.content,
                "score": c.score,
                "heading_path": c.heading_path,
                "source_file": c.source_file,
                "title": c.title,
            }
            for c in chunks
        ]
    )


@router.post("/context/build", response_model=ContextBuildResponse)
def api_build_context(req: ContextBuildRequest) -> ContextBuildResponse:
    """Build a full context pack for a prompt."""
    try:
        pack = build_context(
            user_prompt=req.prompt,
            project_hint=req.project,
            budget=req.budget,
        )
    except Exception as e:
        log.error("context_build_failed", prompt=req.prompt[:100], error=str(e))
        raise HTTPException(status_code=500, detail=f"Context build failed: {e}") from e
    # The serialised chunk list comes from the pack's dict form; the scalar
    # fields are read directly off the pack object.
    pack_dict = _pack_to_dict(pack)
    return ContextBuildResponse(
        formatted_context=pack.formatted_context,
        full_prompt=pack.full_prompt,
        chunks_used=len(pack.chunks_used),
        total_chars=pack.total_chars,
        budget=pack.budget,
        budget_remaining=pack.budget_remaining,
        duration_ms=pack.duration_ms,
        chunks=pack_dict["chunks"],
    )


@router.post("/memory")
def api_create_memory(req: MemoryCreateRequest) -> dict:
    """Create a new memory entry."""
    try:
        mem = create_memory(
            memory_type=req.memory_type,
            content=req.content,
            project=req.project,
            confidence=req.confidence,
            status=req.status,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {"status": "ok", "id": mem.id, "memory_type": mem.memory_type}
@router.get("/memory")
def api_get_memories(
    memory_type: str | None = None,
    project: str | None = None,
    active_only: bool = True,
    min_confidence: float = 0.0,
    limit: int = 50,
    status: str | None = None,
) -> dict:
    """List memories, optionally filtered.

    When ``status`` is given explicitly it overrides ``active_only`` so
    the Phase 9 Commit C review queue can be listed via
    ``GET /memory?status=candidate``.
    """
    try:
        memories = get_memories(
            memory_type=memory_type,
            project=project,
            active_only=active_only,
            min_confidence=min_confidence,
            limit=limit,
            status=status,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {
        "memories": [
            {
                "id": m.id,
                "memory_type": m.memory_type,
                "content": m.content,
                "project": m.project,
                "confidence": m.confidence,
                "status": m.status,
                "reference_count": m.reference_count,
                "last_referenced_at": m.last_referenced_at,
                "updated_at": m.updated_at,
            }
            for m in memories
        ],
        "types": MEMORY_TYPES,
        "statuses": MEMORY_STATUSES,
    }


@router.put("/memory/{memory_id}")
def api_update_memory(memory_id: str, req: MemoryUpdateRequest) -> dict:
    """Update an existing memory."""
    try:
        success = update_memory(
            memory_id=memory_id,
            content=req.content,
            confidence=req.confidence,
            status=req.status,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    if not success:
        raise HTTPException(status_code=404, detail="Memory not found")
    return {"status": "updated", "id": memory_id}


@router.delete("/memory/{memory_id}")
def api_invalidate_memory(memory_id: str) -> dict:
    """Invalidate a memory (error correction)."""
    success = invalidate_memory(memory_id)
    if not success:
        raise HTTPException(status_code=404, detail="Memory not found")
    return {"status": "invalidated", "id": memory_id}


@router.post("/memory/{memory_id}/promote")
def api_promote_memory(memory_id: str) -> dict:
    """Promote a candidate memory to active (Phase 9 Commit C)."""
    try:
        success = promote_memory(memory_id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    if not success:
        raise HTTPException(
            status_code=404,
            detail=f"Memory not found or not a candidate: {memory_id}",
        )
    return {"status": "promoted", "id": memory_id}


@router.post("/memory/{memory_id}/reject")
def api_reject_candidate_memory(memory_id: str) -> dict:
    """Reject a candidate memory (Phase 9 Commit C review queue)."""
    success = reject_candidate_memory(memory_id)
    if not success:
        raise HTTPException(
            status_code=404,
            detail=f"Memory not found or not a candidate: {memory_id}",
        )
    return {"status": "rejected", "id": memory_id}


@router.post("/project/state")
def api_set_project_state(req: ProjectStateSetRequest) -> dict:
    """Set or update a trusted project state entry."""
    try:
        entry = set_state(
            project_name=req.project,
            category=req.category,
            key=req.key,
            value=req.value,
            source=req.source,
            confidence=req.confidence,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        log.error("set_state_failed", error=str(e))
        raise HTTPException(status_code=500, detail=f"Failed to set state: {e}") from e
    return {"status": "ok", "id": entry.id, "category": entry.category, "key": entry.key}


@router.get("/project/state/{project_name}")
def api_get_project_state(project_name: str, category: str | None = None) -> dict:
    """Get trusted project state entries."""
    entries = get_state(project_name, category=category)
    return {
        "project": project_name,
        "entries": [
            {
                "id": e.id,
                "category": e.category,
                "key": e.key,
                "value": e.value,
                "source": e.source,
                "confidence": e.confidence,
                "status": e.status,
                "updated_at": e.updated_at,
            }
            for e in entries
        ],
        "categories": CATEGORIES,
    }


@router.delete("/project/state")
def api_invalidate_project_state(req: ProjectStateInvalidateRequest) -> dict:
    """Invalidate (supersede) a project state entry."""
    success = invalidate_state(req.project, req.category, req.key)
    if not success:
        raise HTTPException(status_code=404, detail="State entry not found or already invalidated")
    return {"status": "invalidated", "project": req.project, "category": req.category, "key": req.key}


# Body of POST /interactions. ``reinforce`` and ``extract`` are forwarded to
# ``record_interaction`` unchanged.
class InteractionRecordRequest(BaseModel):
    prompt: str
    response: str = ""
    response_summary: str = ""
    project: str = ""
    client: str = ""
    session_id: str = ""
    memories_used: list[str] = []
    chunks_used: list[str] = []
    context_pack: dict | None = None
    reinforce: bool = True
    extract: bool = False


@router.post("/interactions")
def api_record_interaction(req: InteractionRecordRequest) -> dict:
    """Capture one interaction (prompt + response + what was used).

    This is the foundation of the AtoCore reflection loop. It records
    what the system fed to an LLM and what came back. If ``reinforce``
    is true (default) and there is response content, the Phase 9
    Commit B reinforcement pass runs automatically, bumping the
    confidence of any active memory echoed in the response. Nothing is
    ever promoted into trusted state automatically.
    """
    try:
        interaction = record_interaction(
            prompt=req.prompt,
            response=req.response,
            response_summary=req.response_summary,
            project=req.project,
            client=req.client,
            session_id=req.session_id,
            memories_used=req.memories_used,
            chunks_used=req.chunks_used,
            context_pack=req.context_pack,
            reinforce=req.reinforce,
            extract=req.extract,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {
        "status": "recorded",
        "id": interaction.id,
        "created_at": interaction.created_at,
    }


@router.post("/interactions/{interaction_id}/reinforce")
def api_reinforce_interaction(interaction_id: str) -> dict:
    """Run the reinforcement pass on an already-captured interaction.

    Useful for backfilling reinforcement over historical interactions,
    or for retrying after a transient failure in the automatic pass
    that runs inside ``POST /interactions``.
    """
    interaction = get_interaction(interaction_id)
    if interaction is None:
        raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
    results = reinforce_from_interaction(interaction)
    return {
        "interaction_id": interaction_id,
        "reinforced_count": len(results),
        "reinforced": [
            {
                "memory_id": r.memory_id,
                "memory_type": r.memory_type,
                "old_confidence": round(r.old_confidence, 4),
                "new_confidence": round(r.new_confidence, 4),
            }
            for r in results
        ],
    }


# Optional body of POST /interactions/{interaction_id}/extract.
class InteractionExtractRequest(BaseModel):
    persist: bool = False
    mode: str = "rule"  # "rule" or "llm"


def _extract_candidates(interaction, mode: str) -> list[MemoryCandidate]:
    """Run the extractor selected by ``mode`` on one interaction.

    ``"llm"`` selects the LLM extractor; any other value falls back to the
    rule-based extractor. Shared by the single-interaction and batch
    extraction endpoints so both dispatch identically.
    """
    if mode == "llm":
        return extract_candidates_llm(interaction)
    return extract_candidates_from_interaction(interaction)


@router.post("/interactions/{interaction_id}/extract")
def api_extract_from_interaction(
    interaction_id: str,
    req: InteractionExtractRequest | None = None,
) -> dict:
    """Extract candidate memories from a captured interaction.

    Phase 9 Commit C. With the default ``mode: "rule"`` the extractor is
    rule-based and deliberately conservative — it only surfaces
    candidates that matched an explicit structural cue (decision
    heading, preference sentence, etc.). ``mode: "llm"`` runs the LLM
    extractor instead. By default the candidates are returned *without*
    being persisted so a caller can preview them before committing to a
    review queue. Pass ``persist: true`` to immediately create candidate
    memories for each extraction result.
    """
    interaction = get_interaction(interaction_id)
    if interaction is None:
        raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")

    payload = req or InteractionExtractRequest()
    candidates = _extract_candidates(interaction, payload.mode)

    persisted_ids: list[str] = []
    if payload.persist:
        for candidate in candidates:
            try:
                mem = create_memory(
                    memory_type=candidate.memory_type,
                    content=candidate.content,
                    project=candidate.project,
                    confidence=candidate.confidence,
                    status="candidate",
                )
                persisted_ids.append(mem.id)
            except ValueError as e:
                # One rejected candidate must not abort the rest of the batch.
                log.error(
                    "extract_persist_failed",
                    interaction_id=interaction_id,
                    rule=candidate.rule,
                    error=str(e),
                )

    # Report the version of the extractor that actually ran; previously the
    # rule-based version was returned even for mode="llm".
    extractor_version = (
        LLM_EXTRACTOR_VERSION if payload.mode == "llm" else EXTRACTOR_VERSION
    )
    return {
        "interaction_id": interaction_id,
        "candidate_count": len(candidates),
        "persisted": payload.persist,
        "persisted_ids": persisted_ids,
        "extractor_version": extractor_version,
        "candidates": [
            {
                "memory_type": c.memory_type,
                "content": c.content,
                "project": c.project,
                "confidence": c.confidence,
                "rule": c.rule,
                "source_span": c.source_span,
                "extractor_version": c.extractor_version,
            }
            for c in candidates
        ],
    }


@router.get("/interactions")
def api_list_interactions(
    project: str | None = None,
    session_id: str | None = None,
    client: str | None = None,
    since: str | None = None,
    limit: int = 50,
) -> dict:
    """List captured interactions, optionally filtered by project,
    session, client, or creation time.

    Hard-capped at 500 entries per call."""
    interactions = list_interactions(
        project=project,
        session_id=session_id,
        client=client,
        since=since,
        limit=limit,
    )
    return {
        "count": len(interactions),
        "interactions": [
            {
                "id": i.id,
                "prompt": i.prompt,
                "response_summary": i.response_summary,
                # The list view exposes only the response length; the full
                # text is served by GET /interactions/{interaction_id}.
                "response_chars": len(i.response),
                "project": i.project,
                "client": i.client,
                "session_id": i.session_id,
                "memories_used": i.memories_used,
                "chunks_used": i.chunks_used,
                "created_at": i.created_at,
            }
            for i in interactions
        ],
    }


@router.get("/interactions/{interaction_id}")
def api_get_interaction(interaction_id: str) -> dict:
    """Fetch a single interaction with the full response and context pack."""
    interaction = get_interaction(interaction_id)
    if interaction is None:
        raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
    return {
        "id": interaction.id,
        "prompt": interaction.prompt,
        "response": interaction.response,
        "response_summary": interaction.response_summary,
        "project": interaction.project,
        "client": interaction.client,
        "session_id": interaction.session_id,
        "memories_used": interaction.memories_used,
        "chunks_used": interaction.chunks_used,
        "context_pack": interaction.context_pack,
        "created_at": interaction.created_at,
    }


# Optional body of POST /admin/backup.
class BackupCreateRequest(BaseModel):
    include_chroma: bool = False


@router.post("/admin/backup")
def api_create_backup(req: BackupCreateRequest | None = None) -> dict:
    """Create a runtime backup snapshot.

    When ``include_chroma`` is true the call holds the ingestion lock so a
    safe cold copy of the vector store can be taken without racing
    against refresh or ingest endpoints.
    """
    payload = req or BackupCreateRequest()
    try:
        if payload.include_chroma:
            with exclusive_ingestion():
                metadata = create_runtime_backup(include_chroma=True)
        else:
            metadata = create_runtime_backup(include_chroma=False)
    except Exception as e:
        log.error("admin_backup_failed", error=str(e))
        raise HTTPException(status_code=500, detail=f"Backup failed: {e}") from e
    return metadata


@router.get("/admin/backup")
def api_list_backups() -> dict:
    """List all runtime backups under the configured backup directory."""
    return {
        "backup_dir": str(_config.settings.resolved_backup_dir),
        "backups": list_runtime_backups(),
    }


# Optional body of POST /admin/backup/cleanup.
class BackupCleanupRequest(BaseModel):
    confirm: bool = False


@router.post("/admin/backup/cleanup")
def api_cleanup_backups(req: BackupCleanupRequest | None = None) -> dict:
    """Apply retention policy to old backup snapshots.

    Dry-run by default. Pass ``confirm: true`` to actually delete.
    Retention: last 7 daily, last 4 weekly (Sundays), last 6 monthly (1st).
    """
    payload = req or BackupCleanupRequest()
    try:
        return cleanup_old_backups(confirm=payload.confirm)
    except Exception as e:
        log.error("admin_cleanup_failed", error=str(e))
        raise HTTPException(status_code=500, detail=f"Cleanup failed: {e}") from e


# Optional body of POST /admin/extract-batch.
class ExtractBatchRequest(BaseModel):
    since: str | None = None
    mode: str = "llm"
    limit: int = 50
    persist: bool = True


@router.post("/admin/extract-batch")
def api_extract_batch(req: ExtractBatchRequest | None = None) -> dict:
    """Run batch extraction across recent interactions.

    Fetches interactions since ``since`` (or since the last recorded
    batch run), runs the extractor (rule or LLM) on each, and persists
    any candidates as ``status=candidate``. The last-run timestamp is
    stored in project state under
    ``atocore / status / last_extract_batch_run`` so subsequent calls
    without ``since`` automatically pick up where the last run left off.

    This endpoint is the operational home for R1 / R5 — it makes the LLM
    extractor accessible as an API operation rather than a script-only
    eval tool. Still NOT on the capture hot path: callers invoke this
    endpoint explicitly (cron, manual curl, CLI).

    Raises a 503 when ``mode`` is ``"llm"`` and the LLM CLI is not
    available in this runtime. Per-interaction extractor failures are
    collected in the ``errors`` list of the response instead of aborting
    the run.
    """
    from datetime import datetime, timezone

    payload = req or ExtractBatchRequest()

    if payload.mode == "llm" and not _llm_cli_available():
        raise HTTPException(
            status_code=503,
            detail=(
                "LLM extraction unavailable in this runtime: the `claude` CLI "
                "is not on PATH. Run host-side via "
                "`scripts/batch_llm_extract_live.py` instead, or call this "
                "endpoint with mode=\"rule\"."
            ),
        )

    since = payload.since
    if not since:
        # No explicit window: resume from the cursor left by the previous run.
        for entry in get_state("atocore"):
            if entry.category == "status" and entry.key == "last_extract_batch_run":
                since = entry.value
                break

    # Take the cursor timestamp BEFORE fetching, so interactions captured
    # while this (possibly slow) run is in progress fall inside the next
    # run's window instead of being skipped.
    run_started = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    # The requested limit is clamped to 200 interactions per call.
    interactions = list_interactions(since=since, limit=min(payload.limit, 200))

    processed = 0
    total_candidates = 0
    total_persisted = 0
    errors: list[dict] = []

    for interaction in interactions:
        # Nothing to extract from an interaction without any response text.
        if not (interaction.response or interaction.response_summary):
            continue
        try:
            candidates = _extract_candidates(interaction, payload.mode)
        except Exception as exc:
            errors.append({"interaction_id": interaction.id, "error": str(exc)})
            continue

        processed += 1
        total_candidates += len(candidates)

        if payload.persist and candidates:
            for candidate in candidates:
                try:
                    create_memory(
                        memory_type=candidate.memory_type,
                        content=candidate.content,
                        project=candidate.project,
                        confidence=candidate.confidence,
                        status="candidate",
                    )
                    total_persisted += 1
                except ValueError:
                    pass  # duplicate — skip silently

    # NOTE(review): the cursor advances even when ``persist`` is false, as in
    # the original implementation — confirm that preview runs should do so.
    try:
        # ``set_state`` takes ``project_name`` (see POST /project/state); the
        # previous ``project=`` keyword did not match that call and any
        # resulting error was silently swallowed.
        set_state(
            project_name="atocore",
            category="status",
            key="last_extract_batch_run",
            value=run_started,
            source="admin/extract-batch endpoint",
        )
    except Exception as exc:
        # Best-effort timestamp tracking: never fail the batch over it, but
        # leave a trace so a stuck cursor can be diagnosed.
        log.error("extract_batch_state_update_failed", error=str(exc))

    log.info(
        "extract_batch_complete",
        mode=payload.mode,
        processed=processed,
        total_candidates=total_candidates,
        total_persisted=total_persisted,
        errors=len(errors),
    )

    return {
        "processed": processed,
        "total_candidates": total_candidates,
        "total_persisted": total_persisted,
        "mode": payload.mode,
        "persist": payload.persist,
        "since": since or "(first run)",
        "errors": errors,
    }


@router.get("/admin/dashboard")
def api_dashboard() -> dict:
    """One-shot system observability dashboard.

    Returns memory counts by type/project/status, project state entry
    counts, interaction volume by client, pipeline health (harness,
    triage stats, last run), and extraction pipeline status — everything
    an operator needs to understand AtoCore's health beyond the basic
    /health endpoint.
    """
    import json as _json
    from collections import Counter
    from datetime import datetime as _dt, timezone as _tz

    # Memory counts are computed over at most 500 memories (the fetch limit).
    all_memories = get_memories(active_only=False, limit=500)
    active = [m for m in all_memories if m.status == "active"]
    candidates = [m for m in all_memories if m.status == "candidate"]
    type_counts = dict(Counter(m.memory_type for m in active))
    project_counts = dict(Counter(m.project or "(none)" for m in active))
    reinforced = [m for m in active if m.reference_count > 0]

    # Interaction stats — total + by_client from DB directly
    interaction_stats: dict = {"most_recent": None, "total": 0, "by_client": {}}
    try:
        from atocore.models.database import get_connection as _gc

        with _gc() as conn:
            row = conn.execute("SELECT count(*) FROM interactions").fetchone()
            interaction_stats["total"] = row[0] if row else 0
            rows = conn.execute(
                "SELECT client, count(*) FROM interactions GROUP BY client"
            ).fetchall()
            interaction_stats["by_client"] = {r[0]: r[1] for r in rows}
            row = conn.execute(
                "SELECT created_at FROM interactions ORDER BY created_at DESC LIMIT 1"
            ).fetchone()
            interaction_stats["most_recent"] = row[0] if row else None
    except Exception:
        # Direct DB access failed: fall back to the service layer, which can
        # still supply the most recent timestamp.
        interactions = list_interactions(limit=1)
        interaction_stats["most_recent"] = (
            interactions[0].created_at if interactions else None
        )

    # Pipeline health from project state
    pipeline: dict = {}
    extract_state: dict = {}
    try:
        state_entries = get_state("atocore")
        for entry in state_entries:
            if entry.category != "status":
                continue
            if entry.key == "last_extract_batch_run":
                extract_state["last_run"] = entry.value
            elif entry.key == "pipeline_last_run":
                pipeline["last_run"] = entry.value
                try:
                    last = _dt.fromisoformat(entry.value.replace("Z", "+00:00"))
                    if last.tzinfo is None:
                        # A naive timestamp cannot be subtracted from an
                        # aware "now"; treat it as UTC so the age is still
                        # reported instead of silently dropped.
                        last = last.replace(tzinfo=_tz.utc)
                    delta = _dt.now(_tz.utc) - last
                    pipeline["hours_since_last_run"] = round(
                        delta.total_seconds() / 3600, 1
                    )
                except Exception:
                    # Unparseable timestamp: report the raw value only.
                    pass
            elif entry.key == "pipeline_summary":
                try:
                    pipeline["summary"] = _json.loads(entry.value)
                except Exception:
                    pipeline["summary_raw"] = entry.value
            elif entry.key == "retrieval_harness_result":
                try:
                    pipeline["harness"] = _json.loads(entry.value)
                except Exception:
                    pipeline["harness_raw"] = entry.value
    except Exception as exc:
        # The dashboard must still render when project state is unreadable.
        log.error("dashboard_state_read_failed", error=str(exc))

    # Project state counts — include all registered projects
    ps_counts = {}
    try:
        from atocore.projects.registry import load_project_registry as _lpr

        for proj in _lpr():
            try:
                entries = get_state(proj.project_id)
                ps_counts[proj.project_id] = len(entries)
            except Exception:
                # Skip projects whose state cannot be read.
                pass
    except Exception as exc:
        log.error("dashboard_registry_read_failed", error=str(exc))
        # Registry unavailable: fall back to a fixed list of project ids.
        for proj_id in [
            "p04-gigabit",
            "p05-interferometer",
            "p06-polisher",
            "atocore",
        ]:
            try:
                entries = get_state(proj_id)
                ps_counts[proj_id] = len(entries)
            except Exception:
                pass

    return {
        "memories": {
            "active": len(active),
            "candidates": len(candidates),
            "by_type": type_counts,
            "by_project": project_counts,
            "reinforced": len(reinforced),
        },
        "project_state": {
            "counts": ps_counts,
            "total": sum(ps_counts.values()),
        },
        "interactions": interaction_stats,
        "extraction_pipeline": extract_state,
        "pipeline": pipeline,
    }


# --- Engineering Knowledge Layer (Layer 2) ---


# Body of POST /entities.
class EntityCreateRequest(BaseModel):
    entity_type: str
    name: str
    project: str = ""
    description: str = ""
    properties: dict | None = None
    status: str = "active"
    confidence: float = 1.0
    source_refs: list[str] | None = None


# Body of POST /relationships.
class RelationshipCreateRequest(BaseModel):
    source_entity_id: str
    target_entity_id: str
    relationship_type: str
    confidence: float = 1.0
    source_refs: list[str] | None = None
@router.post("/entities") def api_create_entity(req: EntityCreateRequest) -> dict: """Create a new engineering entity.""" try: entity = create_entity( entity_type=req.entity_type, name=req.name, project=req.project, description=req.description, properties=req.properties, status=req.status, confidence=req.confidence, source_refs=req.source_refs, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) return {"status": "ok", "id": entity.id, "entity_type": entity.entity_type, "name": entity.name} @router.get("/entities") def api_list_entities( entity_type: str | None = None, project: str | None = None, status: str = "active", name_contains: str | None = None, limit: int = 100, ) -> dict: """List engineering entities with optional filters.""" entities = get_entities( entity_type=entity_type, project=project, status=status, name_contains=name_contains, limit=limit, ) return { "entities": [ { "id": e.id, "entity_type": e.entity_type, "name": e.name, "project": e.project, "description": e.description, "properties": e.properties, "status": e.status, "confidence": e.confidence, } for e in entities ], "count": len(entities), } @router.get("/entities/{entity_id}") def api_get_entity(entity_id: str) -> dict: """Get an entity with its relationships and related entities.""" result = get_entity_with_context(entity_id) if result is None: raise HTTPException(status_code=404, detail=f"Entity not found: {entity_id}") entity = result["entity"] return { "entity": { "id": entity.id, "entity_type": entity.entity_type, "name": entity.name, "project": entity.project, "description": entity.description, "properties": entity.properties, "status": entity.status, "confidence": entity.confidence, "source_refs": entity.source_refs, "created_at": entity.created_at, "updated_at": entity.updated_at, }, "relationships": [ { "id": r.id, "source_entity_id": r.source_entity_id, "target_entity_id": r.target_entity_id, "relationship_type": r.relationship_type, "confidence": 
r.confidence, } for r in result["relationships"] ], "related_entities": { eid: { "entity_type": e.entity_type, "name": e.name, "project": e.project, "description": e.description[:200], } for eid, e in result["related_entities"].items() }, } @router.post("/relationships") def api_create_relationship(req: RelationshipCreateRequest) -> dict: """Create a relationship between two entities.""" try: rel = create_relationship( source_entity_id=req.source_entity_id, target_entity_id=req.target_entity_id, relationship_type=req.relationship_type, confidence=req.confidence, source_refs=req.source_refs, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) return { "status": "ok", "id": rel.id, "relationship_type": rel.relationship_type, } @router.get("/projects/{project_name}/mirror.html", response_class=HTMLResponse) def api_project_mirror_html(project_name: str) -> HTMLResponse: """Serve a readable HTML project overview page. Open in a browser for a clean, styled project dashboard derived from AtoCore's structured data. Source of truth is the database — this page is a derived view. """ from atocore.projects.registry import resolve_project_name as _resolve canonical = _resolve(project_name) try: md_content = generate_project_overview(canonical) except Exception as e: raise HTTPException(status_code=500, detail=f"Mirror generation failed: {e}") import markdown html_body = markdown.markdown(md_content, extensions=["tables", "fenced_code"]) html = _MIRROR_HTML_TEMPLATE.replace("{{title}}", f"{canonical} — AtoCore Mirror") html = html.replace("{{body}}", html_body) return HTMLResponse(content=html) _MIRROR_HTML_TEMPLATE = """