"""Phase 5 Engineering V1 — The 10 canonical queries.

Each function maps to one or more catalog IDs in
``docs/architecture/engineering-query-catalog.md``. Return values are plain
dicts so API and wiki renderers can consume them without importing
dataclasses.

Design principles:

- All queries filter to status='active' unless the caller asks otherwise
- All project filters go through ``resolve_project_name`` (canonicalization)
- Graph traversals are bounded (depth <= 3 for impact, limit 200 for lists)
- The 3 "killer" queries (gaps) accept project as required — gaps are always
  scoped to one project in V1

These queries are the *useful surface* of the entity graph. Before this
module, the graph was data with no narrative; after this module, the director
can ask real questions about coverage, risk, and evidence.
"""

from __future__ import annotations

from collections import deque
from datetime import datetime, timezone

from atocore.engineering.service import (
    Entity,
    _row_to_entity,
    get_entity,
    get_relationships,
)
from atocore.models.database import get_connection
from atocore.projects.registry import resolve_project_name


# ============================================================
# Structure queries (Q-001, Q-004, Q-005, Q-008, Q-013)
# ============================================================


def system_map(project: str) -> dict:
    """Q-001 + Q-004: return the full subsystem/component tree for a project.

    Shape:
        {
            "project": "p05-interferometer",
            "subsystems": [
                {
                    "id": ..., "name": ..., "description": ...,
                    "components": [{id, name, description, materials: [...]}],
                },
                ...
            ],
            "orphan_components": [...],  # no PART_OF edge to a listed subsystem
        }

    Args:
        project: Project name or alias; canonicalized through
            ``resolve_project_name``. A falsy value is treated as ``""``.

    Returns:
        The dict described above. A component is listed under a subsystem only
        when it has a ``part_of`` edge whose target is one of the active
        subsystems of this project; every other component is an orphan.
    """
    project = resolve_project_name(project) if project else ""
    out: dict = {"project": project, "subsystems": [], "orphan_components": []}

    with get_connection() as conn:
        # All subsystems in project
        subsys_rows = conn.execute(
            "SELECT * FROM entities WHERE status = 'active' "
            "AND project = ? AND entity_type = 'subsystem' "
            "ORDER BY name",
            (project,),
        ).fetchall()

        # All components in project
        comp_rows = conn.execute(
            "SELECT * FROM entities WHERE status = 'active' "
            "AND project = ? AND entity_type = 'component'",
            (project,),
        ).fetchall()

        # PART_OF edges: component → subsystem (not project-scoped in SQL)
        part_of_rows = conn.execute(
            "SELECT source_entity_id, target_entity_id FROM relationships "
            "WHERE relationship_type = 'part_of'"
        ).fetchall()

        # uses_material edges for components
        mat_rows = conn.execute(
            "SELECT r.source_entity_id, e.name FROM relationships r "
            "JOIN entities e ON e.id = r.target_entity_id "
            "WHERE r.relationship_type = 'uses_material' AND e.status = 'active'"
        ).fetchall()

    # Build: subsystems → their components
    subsys_comps: dict[str, list[dict]] = {s["id"]: [] for s in subsys_rows}

    # Only keep edges that point at one of this project's subsystems. The
    # query above returns every part_of edge in the database, so without this
    # filter an unrelated edge from the same component (e.g. to an assembly or
    # to another project's entity) could overwrite the real parent and make
    # the component look like an orphan.
    part_of_map: dict[str, str] = {
        r["source_entity_id"]: r["target_entity_id"]
        for r in part_of_rows
        if r["target_entity_id"] in subsys_comps
    }

    materials_by_comp: dict[str, list[str]] = {}
    for r in mat_rows:
        materials_by_comp.setdefault(r["source_entity_id"], []).append(r["name"])

    orphans: list[dict] = []
    for c in comp_rows:
        comp_dict = {
            "id": c["id"],
            "name": c["name"],
            "description": c["description"] or "",
            "materials": materials_by_comp.get(c["id"], []),
        }
        parent = part_of_map.get(c["id"])
        if parent in subsys_comps:
            subsys_comps[parent].append(comp_dict)
        else:
            orphans.append(comp_dict)

    out["subsystems"] = [
        {
            "id": s["id"],
            "name": s["name"],
            "description": s["description"] or "",
            "components": subsys_comps.get(s["id"], []),
        }
        for s in subsys_rows
    ]
    out["orphan_components"] = orphans
    return out


def decisions_affecting(project: str, subsystem_id: str | None = None) -> dict:
    """Q-008: decisions that affect a subsystem (or whole project).

    Walks AFFECTED_BY_DECISION edges. If subsystem_id is given, returns
    decisions linked to that subsystem or any of its components. Otherwise,
    all decisions in the project (newest first, at most 200).

    Args:
        project: Project name or alias; canonicalized through
            ``resolve_project_name``. A falsy value is treated as ``""``.
        subsystem_id: Optional subsystem entity id to scope the search to.

    Returns:
        ``{"project", "subsystem_id", "decisions", "count"}`` where
        ``decisions`` is a list of flattened entity dicts and
        ``subsystem_id`` is ``""`` when none was given.
    """
    project = resolve_project_name(project) if project else ""
    target_ids: set[str] = set()

    with get_connection() as conn:
        if subsystem_id:
            target_ids.add(subsystem_id)
            # Include components PART_OF the subsystem
            member_rows = conn.execute(
                "SELECT source_entity_id FROM relationships "
                "WHERE relationship_type = 'part_of' AND target_entity_id = ?",
                (subsystem_id,),
            ).fetchall()
            target_ids.update(r["source_entity_id"] for r in member_rows)

        if target_ids:
            # Sorted only so the bound parameter order is deterministic.
            ordered_targets = sorted(target_ids)
            placeholders = ",".join("?" * len(ordered_targets))
            # NOTE(review): this join treats the decision as the edge *source*
            # and the subsystem/component as the *target* of
            # 'affected_by_decision'. Confirm that matches the direction in
            # which those edges are written.
            rows = conn.execute(
                "SELECT DISTINCT e.* FROM entities e "
                "JOIN relationships r ON r.source_entity_id = e.id "
                "WHERE e.status = 'active' AND e.entity_type = 'decision' "
                "AND e.project = ? AND r.relationship_type = 'affected_by_decision' "
                f"AND r.target_entity_id IN ({placeholders}) "
                "ORDER BY e.updated_at DESC",
                (project, *ordered_targets),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM entities WHERE status = 'active' "
                "AND entity_type = 'decision' AND project = ? "
                "ORDER BY updated_at DESC LIMIT 200",
                (project,),
            ).fetchall()

    decisions = [_entity_dict(_row_to_entity(r)) for r in rows]
    return {
        "project": project,
        "subsystem_id": subsystem_id or "",
        "decisions": decisions,
        "count": len(decisions),
    }


def requirements_for(component_id: str) -> dict:
    """Q-005: requirements that a component satisfies.

    Follows outbound ``satisfies`` edges from ``component_id`` to active
    entities of type ``requirement``, ordered by name.

    Returns:
        ``{"component_id", "requirements", "count"}``.
    """
    with get_connection() as conn:
        # Component → SATISFIES → Requirement
        rows = conn.execute(
            "SELECT e.* FROM entities e "
            "JOIN relationships r ON r.target_entity_id = e.id "
            "WHERE r.source_entity_id = ? AND r.relationship_type = 'satisfies' "
            "AND e.entity_type = 'requirement' AND e.status = 'active' "
            "ORDER BY e.name",
            (component_id,),
        ).fetchall()

    requirements = [_entity_dict(_row_to_entity(r)) for r in rows]
    return {
        "component_id": component_id,
        "requirements": requirements,
        "count": len(requirements),
    }


def recent_changes(project: str, since: str | None = None, limit: int = 50) -> dict:
    """Q-013: what changed recently in the project (entity audit log).

    Uses the shared memory_audit table filtered by entity_kind='entity'
    and joins back to entities for the project scope. Audit rows whose joined
    entity has no project — including rows where the LEFT JOIN finds no entity
    at all — are returned regardless of ``project``; missing names are
    rendered as ``"(deleted)"`` and missing types as ``"?"``.

    Args:
        project: Project name or alias; canonicalized through
            ``resolve_project_name``. A falsy value is treated as ``""``.
        since: Lower bound compared against ``memory_audit.timestamp``;
            defaults to ``"2020-01-01"``.
        limit: Maximum number of rows, newest first. Negative values are
            treated as 0.

    Returns:
        ``{"project", "since", "changes", "count"}``.
    """
    project = resolve_project_name(project) if project else ""
    since = since or "2020-01-01"
    # SQLite treats a negative LIMIT as "no upper bound"; clamp so the result
    # list always stays bounded.
    limit = max(0, limit)

    with get_connection() as conn:
        rows = conn.execute(
            "SELECT a.id, a.memory_id AS entity_id, a.action, a.actor, "
            "a.timestamp, a.note, e.entity_type, e.name, e.project "
            "FROM memory_audit a "
            "LEFT JOIN entities e ON e.id = a.memory_id "
            "WHERE a.entity_kind = 'entity' AND a.timestamp >= ? "
            "AND (e.project = ? OR e.project IS NULL) "
            "ORDER BY a.timestamp DESC LIMIT ?",
            (since, project, limit),
        ).fetchall()

    changes = [
        {
            "audit_id": r["id"],
            "entity_id": r["entity_id"],
            "entity_type": r["entity_type"] or "?",
            "entity_name": r["name"] or "(deleted)",
            "action": r["action"],
            "actor": r["actor"] or "api",
            "note": r["note"] or "",
            "timestamp": r["timestamp"],
        }
        for r in rows
    ]
    return {"project": project, "since": since, "changes": changes, "count": len(changes)}


# ============================================================
# Killer queries (Q-006, Q-009, Q-011) — the "what am I forgetting?" queries
# ============================================================


def orphan_requirements(project: str) -> dict:
    """Q-006: requirements in project with NO inbound SATISFIES edge.

    These are "something we said must be true" with nothing actually
    satisfying them. The single highest-value query for an engineering
    director: shows what's unclaimed by design.

    Returns:
        ``{"project", "query", "description", "gaps", "count"}`` with the
        active orphan requirements in ``gaps``, most recently updated first.
    """
    project = resolve_project_name(project) if project else ""

    with get_connection() as conn:
        rows = conn.execute(
            "SELECT * FROM entities WHERE status = 'active' "
            "AND project = ? AND entity_type = 'requirement' "
            "AND NOT EXISTS ("
            " SELECT 1 FROM relationships r "
            " WHERE r.relationship_type = 'satisfies' "
            " AND r.target_entity_id = entities.id"
            ") "
            "ORDER BY updated_at DESC",
            (project,),
        ).fetchall()

    orphans = [_entity_dict(_row_to_entity(r)) for r in rows]
    return {
        "project": project,
        "query": "Q-006 orphan requirements",
        "description": "Requirements with no SATISFIES relationship — nothing claims to meet them.",
        "gaps": orphans,
        "count": len(orphans),
    }


def risky_decisions(project: str) -> dict:
    """Q-009: decisions linked to assumptions flagged as unresolved.

    Walks BASED_ON_ASSUMPTION edges. An assumption is "flagged" if its
    properties.flagged=True OR status='superseded' OR status='invalid'.
    The ``flagged`` check is a text ``LIKE`` match on the stored properties
    for ``"flagged": true`` / ``"flagged":true``, not a JSON parse.

    Returns:
        ``{"project", "query", "description", "gaps", "count"}``. ``gaps``
        holds one entry per distinct row returned by the join, with the
        decision's and the assumption's id/name plus the assumption status.
    """
    project = resolve_project_name(project) if project else ""

    with get_connection() as conn:
        rows = conn.execute(
            "SELECT DISTINCT d.*, a.name AS assumption_name, a.id AS assumption_id, "
            "a.status AS assumption_status, a.properties AS assumption_props "
            "FROM entities d "
            "JOIN relationships r ON r.source_entity_id = d.id "
            "JOIN entities a ON a.id = r.target_entity_id "
            "WHERE d.status = 'active' AND d.entity_type = 'decision' "
            "AND d.project = ? "
            "AND r.relationship_type = 'based_on_assumption' "
            "AND ("
            " a.status IN ('superseded', 'invalid') OR "
            " a.properties LIKE '%\"flagged\": true%' OR "
            " a.properties LIKE '%\"flagged\":true%'"
            ") "
            "ORDER BY d.updated_at DESC",
            (project,),
        ).fetchall()

    risky = [
        {
            "decision_id": r["id"],
            "decision_name": r["name"],
            "decision_description": r["description"] or "",
            "assumption_id": r["assumption_id"],
            "assumption_name": r["assumption_name"],
            "assumption_status": r["assumption_status"],
        }
        for r in rows
    ]
    return {
        "project": project,
        "query": "Q-009 risky decisions",
        "description": "Decisions based on assumptions that are flagged, superseded, or invalid.",
        "gaps": risky,
        "count": len(risky),
    }


def unsupported_claims(project: str) -> dict:
    """Q-011: validation claims with NO inbound SUPPORTS edge.

    These are asserted claims (e.g., "margin is adequate") with no Result
    entity actually supporting them. High-risk: the engineer believes it,
    but there's no evidence on file.

    Returns:
        ``{"project", "query", "description", "gaps", "count"}`` with the
        active unsupported claims in ``gaps``, most recently updated first.
    """
    project = resolve_project_name(project) if project else ""

    with get_connection() as conn:
        rows = conn.execute(
            "SELECT * FROM entities WHERE status = 'active' "
            "AND project = ? AND entity_type = 'validation_claim' "
            "AND NOT EXISTS ("
            " SELECT 1 FROM relationships r "
            " WHERE r.relationship_type = 'supports' "
            " AND r.target_entity_id = entities.id"
            ") "
            "ORDER BY updated_at DESC",
            (project,),
        ).fetchall()

    claims = [_entity_dict(_row_to_entity(r)) for r in rows]
    return {
        "project": project,
        "query": "Q-011 unsupported claims",
        "description": "Validation claims with no supporting Result — asserted but not evidenced.",
        "gaps": claims,
        "count": len(claims),
    }


def all_gaps(project: str) -> dict:
    """Combined: run Q-006, Q-009, Q-011 for a project in one go.

    Returns:
        A dict with the canonical project name, a UTC ``generated_at``
        timestamp (``YYYY-MM-DDTHH:MM:SSZ``), and the full result dict of
        each of the three gap queries.
    """
    return {
        "project": resolve_project_name(project) if project else "",
        "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "orphan_requirements": orphan_requirements(project),
        "risky_decisions": risky_decisions(project),
        "unsupported_claims": unsupported_claims(project),
    }


# ============================================================
# History + impact (Q-016, Q-017)
# ============================================================


def impact_analysis(entity_id: str, max_depth: int = 3) -> dict:
    """Q-016: transitive outbound reach of an entity.

    Walks outbound edges breadth-first to max_depth. Answers "what would
    be affected if I changed component X?" by finding everything downstream.
    Only edges whose target entity is active are followed, and each entity is
    reported once, at the depth where it was first reached.

    Args:
        entity_id: Id of the entity to start from.
        max_depth: Maximum number of hops to follow; values <= 0 yield an
            empty ``impacted`` list.

    Returns:
        ``{"root", "impacted_count", "impacted", "max_depth"}`` where
        ``root`` is the flattened start entity (or ``None`` if
        ``get_entity`` returns nothing) and each ``impacted`` item carries
        ``entity_id``, ``entity_type``, ``name``, ``relationship`` and
        ``depth``.
    """
    visited: set[str] = {entity_id}
    impacted: list[dict] = []
    # deque gives O(1) pops from the left; list.pop(0) shifts every element.
    frontier: deque[tuple[str, int]] = deque([(entity_id, 0)])

    # One connection for the whole traversal instead of one per visited node.
    with get_connection() as conn:
        while frontier:
            current_id, depth = frontier.popleft()
            if depth >= max_depth:
                continue
            rows = conn.execute(
                "SELECT r.relationship_type, r.target_entity_id, "
                "e.entity_type, e.name, e.status "
                "FROM relationships r "
                "JOIN entities e ON e.id = r.target_entity_id "
                "WHERE r.source_entity_id = ? AND e.status = 'active'",
                (current_id,),
            ).fetchall()
            for r in rows:
                tid = r["target_entity_id"]
                if tid in visited:
                    continue
                visited.add(tid)
                impacted.append({
                    "entity_id": tid,
                    "entity_type": r["entity_type"],
                    "name": r["name"],
                    "relationship": r["relationship_type"],
                    "depth": depth + 1,
                })
                frontier.append((tid, depth + 1))

    root = get_entity(entity_id)
    return {
        "root": _entity_dict(root) if root else None,
        "impacted_count": len(impacted),
        "impacted": impacted,
        "max_depth": max_depth,
    }


def evidence_chain(entity_id: str) -> dict:
    """Q-017: what evidence supports this entity?

    Collects the direct (one-hop) inbound edges of the provenance family —
    SUPPORTS, EVIDENCED_BY, DESCRIBED_BY, VALIDATED_BY, ANALYZED_BY — whose
    source entity is active, plus the entity's own ``source_refs``. It does
    not recurse further up the chain; call it again on a returned
    ``source_id`` to go one level deeper.

    Returns:
        ``{"root", "direct_source_refs", "evidence_chain", "count"}``. Each
        chain item has ``via``, ``source_id``, ``source_type``,
        ``source_name`` and ``source_description`` (truncated to 200 chars).
    """
    provenance_edges = (
        "supports",
        "evidenced_by",
        "described_by",
        "validated_by",
        "analyzed_by",
    )
    placeholders = ",".join("?" * len(provenance_edges))

    with get_connection() as conn:
        # Inbound edges of the provenance family
        inbound_rows = conn.execute(
            "SELECT r.relationship_type, r.source_entity_id, "
            "e.entity_type, e.name, e.description, e.status "
            "FROM relationships r "
            "JOIN entities e ON e.id = r.source_entity_id "
            "WHERE r.target_entity_id = ? AND e.status = 'active' "
            f"AND r.relationship_type IN ({placeholders})",
            (entity_id, *provenance_edges),
        ).fetchall()

    # Also look at source_refs on the entity itself
    root = get_entity(entity_id)

    chain = [
        {
            "via": r["relationship_type"],
            "source_id": r["source_entity_id"],
            "source_type": r["entity_type"],
            "source_name": r["name"],
            "source_description": (r["description"] or "")[:200],
        }
        for r in inbound_rows
    ]
    return {
        "root": _entity_dict(root) if root else None,
        "direct_source_refs": root.source_refs if root else [],
        "evidence_chain": chain,
        "count": len(chain),
    }


# ============================================================
# Helpers
# ============================================================


def _entity_dict(e: Entity) -> dict:
    """Flatten an Entity to a public-API dict."""
    return {
        "id": e.id,
        "entity_type": e.entity_type,
        "name": e.name,
        "project": e.project,
        "description": e.description,
        "properties": e.properties,
        "status": e.status,
        "confidence": e.confidence,
        "source_refs": e.source_refs,
        "updated_at": e.updated_at,
    }