diff --git a/src/atocore/api/routes.py b/src/atocore/api/routes.py index 18ae5e7..66b77d6 100644 --- a/src/atocore/api/routes.py +++ b/src/atocore/api/routes.py @@ -1327,6 +1327,86 @@ def api_list_entities( } +# --- Phase 5 Engineering V1: The 10 canonical queries --- + + +@router.get("/engineering/projects/{project_name}/systems") +def api_system_map(project_name: str) -> dict: + """Q-001 + Q-004: subsystem/component tree for a project.""" + from atocore.engineering.queries import system_map + return system_map(project_name) + + +@router.get("/engineering/decisions") +def api_decisions_affecting( + project: str, + subsystem: str | None = None, +) -> dict: + """Q-008: decisions affecting a subsystem (or the whole project).""" + from atocore.engineering.queries import decisions_affecting + return decisions_affecting(project, subsystem_id=subsystem) + + +@router.get("/engineering/components/{component_id}/requirements") +def api_requirements_for_component(component_id: str) -> dict: + """Q-005: requirements that a component satisfies.""" + from atocore.engineering.queries import requirements_for + return requirements_for(component_id) + + +@router.get("/engineering/changes") +def api_recent_engineering_changes( + project: str, + since: str | None = None, + limit: int = 50, +) -> dict: + """Q-013: entity changes in project since timestamp.""" + from atocore.engineering.queries import recent_changes + return recent_changes(project, since=since, limit=limit) + + +@router.get("/engineering/gaps/orphan-requirements") +def api_orphan_requirements(project: str) -> dict: + """Q-006 (killer): requirements with no SATISFIES edge.""" + from atocore.engineering.queries import orphan_requirements + return orphan_requirements(project) + + +@router.get("/engineering/gaps/risky-decisions") +def api_risky_decisions(project: str) -> dict: + """Q-009 (killer): decisions resting on flagged/superseded assumptions.""" + from atocore.engineering.queries import risky_decisions + return risky_decisions(project) + + +@router.get("/engineering/gaps/unsupported-claims") +def api_unsupported_claims(project: str) -> dict: + """Q-011 (killer): validation claims with no SUPPORTS edge.""" + from atocore.engineering.queries import unsupported_claims + return unsupported_claims(project) + + +@router.get("/engineering/gaps") +def api_all_gaps(project: str) -> dict: + """Combined Q-006 + Q-009 + Q-011 for a project.""" + from atocore.engineering.queries import all_gaps + return all_gaps(project) + + +@router.get("/engineering/impact") +def api_impact_analysis(entity: str, max_depth: int = 3) -> dict: + """Q-016: transitive outbound impact of changing an entity.""" + from atocore.engineering.queries import impact_analysis + return impact_analysis(entity, max_depth=max_depth) + + +@router.get("/engineering/evidence") +def api_evidence_chain(entity: str) -> dict: + """Q-017: inbound evidence chain for an entity.""" + from atocore.engineering.queries import evidence_chain + return evidence_chain(entity) + + @router.post("/entities/{entity_id}/promote") def api_promote_entity(entity_id: str) -> dict: """Promote a candidate entity to active (Phase 5 Engineering V1).""" diff --git a/src/atocore/engineering/mirror.py b/src/atocore/engineering/mirror.py index a9d81a0..cbc1d4d 100644 --- a/src/atocore/engineering/mirror.py +++ b/src/atocore/engineering/mirror.py @@ -29,6 +29,7 @@ def generate_project_overview(project: str) -> str: sections = [ _header(project), _synthesis_section(project), + _gaps_section(project), # Phase 5: killer queries surface here _state_section(project), _system_architecture(project), _decisions_section(project), @@ -41,6 +42,66 @@ def generate_project_overview(project: str) -> str: return "\n\n".join(s for s in sections if s) +def _gaps_section(project: str) -> str: + """Phase 5: surface the 3 killer-query gaps on every project page. + + If any gap is non-empty, it appears near the top so the director + sees "what am I forgetting?" before the rest of the report. + """ + try: + from atocore.engineering.queries import all_gaps + result = all_gaps(project) + except Exception: + return "" + + orphan = result["orphan_requirements"]["count"] + risky = result["risky_decisions"]["count"] + unsup = result["unsupported_claims"]["count"] + + if orphan == 0 and risky == 0 and unsup == 0: + return ( + "## Coverage Gaps\n\n" + "> ✅ No gaps detected: every requirement is satisfied, " + "no decisions rest on flagged assumptions, every claim has evidence.\n" + ) + + lines = ["## Coverage Gaps", ""] + lines.append( + "> ⚠️ Items below need attention — gaps in the engineering graph.\n" + ) + + if orphan: + lines.append(f"### {orphan} Orphan Requirement(s)") + lines.append("*Requirements with no component claiming to satisfy them:*") + lines.append("") + for r in result["orphan_requirements"]["gaps"][:10]: + lines.append(f"- **{r['name']}** — {(r['description'] or '')[:120]}") + if orphan > 10: + lines.append(f"- _...and {orphan - 10} more_") + lines.append("") + + if risky: + lines.append(f"### {risky} Risky Decision(s)") + lines.append("*Decisions based on assumptions that are flagged, superseded, or invalid:*") + lines.append("") + for d in result["risky_decisions"]["gaps"][:10]: + lines.append( + f"- **{d['decision_name']}** — based on flagged assumption " + f"_{d['assumption_name']}_ ({d['assumption_status']})" + ) + lines.append("") + + if unsup: + lines.append(f"### {unsup} Unsupported Claim(s)") + lines.append("*Validation claims with no supporting Result entity:*") + lines.append("") + for c in result["unsupported_claims"]["gaps"][:10]: + lines.append(f"- **{c['name']}** — {(c['description'] or '')[:120]}") + lines.append("") + + return "\n".join(lines) + + def _synthesis_section(project: str) -> str: """Generate a short LLM synthesis of the current project state. diff --git a/src/atocore/engineering/queries.py b/src/atocore/engineering/queries.py new file mode 100644 index 0000000..8580102 --- /dev/null +++ b/src/atocore/engineering/queries.py @@ -0,0 +1,467 @@ +"""Phase 5 Engineering V1 — The 10 canonical queries. + +Each function maps to one or more catalog IDs in +``docs/architecture/engineering-query-catalog.md``. Return values are plain +dicts so API and wiki renderers can consume them without importing dataclasses. + +Design principles: + - All queries filter to status='active' unless the caller asks otherwise + - All project filters go through ``resolve_project_name`` (canonicalization) + - Graph traversals are bounded (depth <= 3 for impact, limit 200 for lists) + - The 3 "killer" queries (gaps) accept project as required — gaps are always + scoped to one project in V1 + +These queries are the *useful surface* of the entity graph. Before this module, +the graph was data with no narrative; after this module, the director can ask +real questions about coverage, risk, and evidence. +""" + +from __future__ import annotations + +from datetime import datetime, timezone + +from atocore.engineering.service import ( + Entity, + _row_to_entity, + get_entity, + get_relationships, +) +from atocore.models.database import get_connection +from atocore.projects.registry import resolve_project_name + + +# ============================================================ +# Structure queries (Q-001, Q-004, Q-005, Q-008) +# ============================================================ + + +def system_map(project: str) -> dict: + """Q-001 + Q-004: return the full subsystem/component tree for a project. + + Shape: + { + "project": "p05-interferometer", + "subsystems": [ + { + "id": ..., "name": ..., "description": ..., + "components": [{id, name, description, materials: [...]}], + }, + ... + ], + "orphan_components": [...], # components with no PART_OF edge + } + """ + project = resolve_project_name(project) if project else "" + out: dict = {"project": project, "subsystems": [], "orphan_components": []} + + with get_connection() as conn: + # All subsystems in project + subsys_rows = conn.execute( + "SELECT * FROM entities WHERE status = 'active' " + "AND project = ? AND entity_type = 'subsystem' " + "ORDER BY name", + (project,), + ).fetchall() + + # All components in project + comp_rows = conn.execute( + "SELECT * FROM entities WHERE status = 'active' " + "AND project = ? AND entity_type = 'component'", + (project,), + ).fetchall() + + # PART_OF edges: component → subsystem + part_of_rows = conn.execute( + "SELECT source_entity_id, target_entity_id FROM relationships " + "WHERE relationship_type = 'part_of'" + ).fetchall() + part_of_map: dict[str, str] = { + r["source_entity_id"]: r["target_entity_id"] for r in part_of_rows + } + + # uses_material edges for components + mat_rows = conn.execute( + "SELECT r.source_entity_id, e.name FROM relationships r " + "JOIN entities e ON e.id = r.target_entity_id " + "WHERE r.relationship_type = 'uses_material' AND e.status = 'active'" + ).fetchall() + materials_by_comp: dict[str, list[str]] = {} + for r in mat_rows: + materials_by_comp.setdefault(r["source_entity_id"], []).append(r["name"]) + + # Build: subsystems → their components + subsys_comps: dict[str, list[dict]] = {s["id"]: [] for s in subsys_rows} + orphans: list[dict] = [] + for c in comp_rows: + parent = part_of_map.get(c["id"]) + comp_dict = { + "id": c["id"], + "name": c["name"], + "description": c["description"] or "", + "materials": materials_by_comp.get(c["id"], []), + } + if parent and parent in subsys_comps: + subsys_comps[parent].append(comp_dict) + else: + orphans.append(comp_dict) + + out["subsystems"] = [ + { + "id": s["id"], + "name": s["name"], + "description": s["description"] or "", + "components": subsys_comps.get(s["id"], []), + } + for s in subsys_rows + ] + out["orphan_components"] = orphans + return out + + +def decisions_affecting(project: str, subsystem_id: str | None = None) -> dict: + """Q-008: decisions that affect a subsystem (or whole project). + + Walks AFFECTED_BY_DECISION edges. If subsystem_id is given, returns + decisions linked to that subsystem or any of its components. Otherwise, + all decisions in the project. + """ + project = resolve_project_name(project) if project else "" + + target_ids: set[str] = set() + if subsystem_id: + target_ids.add(subsystem_id) + # Include components PART_OF the subsystem + with get_connection() as conn: + rows = conn.execute( + "SELECT source_entity_id FROM relationships " + "WHERE relationship_type = 'part_of' AND target_entity_id = ?", + (subsystem_id,), + ).fetchall() + for r in rows: + target_ids.add(r["source_entity_id"]) + + with get_connection() as conn: + if target_ids: + placeholders = ",".join("?" * len(target_ids)) + rows = conn.execute( + f"SELECT DISTINCT e.* FROM entities e " + f"JOIN relationships r ON r.source_entity_id = e.id " + f"WHERE e.status = 'active' AND e.entity_type = 'decision' " + f"AND e.project = ? AND r.relationship_type = 'affected_by_decision' " + f"AND r.target_entity_id IN ({placeholders}) " + f"ORDER BY e.updated_at DESC", + (project, *target_ids), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM entities WHERE status = 'active' " + "AND entity_type = 'decision' AND project = ? " + "ORDER BY updated_at DESC LIMIT 200", + (project,), + ).fetchall() + + decisions = [_entity_dict(_row_to_entity(r)) for r in rows] + return { + "project": project, + "subsystem_id": subsystem_id or "", + "decisions": decisions, + "count": len(decisions), + } + + +def requirements_for(component_id: str) -> dict: + """Q-005: requirements that a component satisfies.""" + with get_connection() as conn: + # Component → SATISFIES → Requirement + rows = conn.execute( + "SELECT e.* FROM entities e " + "JOIN relationships r ON r.target_entity_id = e.id " + "WHERE r.source_entity_id = ? AND r.relationship_type = 'satisfies' " + "AND e.entity_type = 'requirement' AND e.status = 'active' " + "ORDER BY e.name", + (component_id,), + ).fetchall() + requirements = [_entity_dict(_row_to_entity(r)) for r in rows] + return { + "component_id": component_id, + "requirements": requirements, + "count": len(requirements), + } + + +def recent_changes(project: str, since: str | None = None, limit: int = 50) -> dict: + """Q-013: what changed recently in the project (entity audit log). + + Uses the shared memory_audit table filtered by entity_kind='entity' and + joins back to entities for the project scope. + """ + project = resolve_project_name(project) if project else "" + since = since or "2020-01-01" + + with get_connection() as conn: + rows = conn.execute( + "SELECT a.id, a.memory_id AS entity_id, a.action, a.actor, " + "a.timestamp, a.note, e.entity_type, e.name, e.project " + "FROM memory_audit a " + "LEFT JOIN entities e ON e.id = a.memory_id " + "WHERE a.entity_kind = 'entity' AND a.timestamp >= ? " + "AND (e.project = ? OR e.project IS NULL) " + "ORDER BY a.timestamp DESC LIMIT ?", + (since, project, limit), + ).fetchall() + + changes = [] + for r in rows: + changes.append({ + "audit_id": r["id"], + "entity_id": r["entity_id"], + "entity_type": r["entity_type"] or "?", + "entity_name": r["name"] or "(deleted)", + "action": r["action"], + "actor": r["actor"] or "api", + "note": r["note"] or "", + "timestamp": r["timestamp"], + }) + return {"project": project, "since": since, "changes": changes, "count": len(changes)} + + +# ============================================================ +# Killer queries (Q-006, Q-009, Q-011) — the "what am I forgetting?" queries +# ============================================================ + + +def orphan_requirements(project: str) -> dict: + """Q-006: requirements in project with NO inbound SATISFIES edge. + + These are "something we said must be true" with nothing actually + satisfying them. The single highest-value query for an engineering + director: shows what's unclaimed by design. + """ + project = resolve_project_name(project) if project else "" + + with get_connection() as conn: + rows = conn.execute( + "SELECT * FROM entities WHERE status = 'active' " + "AND project = ? AND entity_type = 'requirement' " + "AND NOT EXISTS (" + " SELECT 1 FROM relationships r " + " WHERE r.relationship_type = 'satisfies' " + " AND r.target_entity_id = entities.id" + ") " + "ORDER BY updated_at DESC", + (project,), + ).fetchall() + + orphans = [_entity_dict(_row_to_entity(r)) for r in rows] + return { + "project": project, + "query": "Q-006 orphan requirements", + "description": "Requirements with no SATISFIES relationship — nothing claims to meet them.", + "gaps": orphans, + "count": len(orphans), + } + + +def risky_decisions(project: str) -> dict: + """Q-009: decisions linked to assumptions flagged as unresolved. + + Walks BASED_ON_ASSUMPTION edges. An assumption is "flagged" if its + properties.flagged=True OR status='superseded' OR status='invalid'. + """ + project = resolve_project_name(project) if project else "" + + with get_connection() as conn: + rows = conn.execute( + "SELECT DISTINCT d.*, a.name AS assumption_name, a.id AS assumption_id, " + "a.status AS assumption_status, a.properties AS assumption_props " + "FROM entities d " + "JOIN relationships r ON r.source_entity_id = d.id " + "JOIN entities a ON a.id = r.target_entity_id " + "WHERE d.status = 'active' AND d.entity_type = 'decision' " + "AND d.project = ? " + "AND r.relationship_type = 'based_on_assumption' " + "AND (" + " a.status IN ('superseded', 'invalid') OR " + " a.properties LIKE '%\"flagged\": true%' OR " + " a.properties LIKE '%\"flagged\":true%'" + ") " + "ORDER BY d.updated_at DESC", + (project,), + ).fetchall() + + risky = [] + for r in rows: + risky.append({ + "decision_id": r["id"], + "decision_name": r["name"], + "decision_description": r["description"] or "", + "assumption_id": r["assumption_id"], + "assumption_name": r["assumption_name"], + "assumption_status": r["assumption_status"], + }) + return { + "project": project, + "query": "Q-009 risky decisions", + "description": "Decisions based on assumptions that are flagged, superseded, or invalid.", + "gaps": risky, + "count": len(risky), + } + + +def unsupported_claims(project: str) -> dict: + """Q-011: validation claims with NO inbound SUPPORTS edge. + + These are asserted claims (e.g., "margin is adequate") with no + Result entity actually supporting them. High-risk: the engineer + believes it, but there's no evidence on file. + """ + project = resolve_project_name(project) if project else "" + + with get_connection() as conn: + rows = conn.execute( + "SELECT * FROM entities WHERE status = 'active' " + "AND project = ? AND entity_type = 'validation_claim' " + "AND NOT EXISTS (" + " SELECT 1 FROM relationships r " + " WHERE r.relationship_type = 'supports' " + " AND r.target_entity_id = entities.id" + ") " + "ORDER BY updated_at DESC", + (project,), + ).fetchall() + + claims = [_entity_dict(_row_to_entity(r)) for r in rows] + return { + "project": project, + "query": "Q-011 unsupported claims", + "description": "Validation claims with no supporting Result — asserted but not evidenced.", + "gaps": claims, + "count": len(claims), + } + + +def all_gaps(project: str) -> dict: + """Combined: run Q-006, Q-009, Q-011 for a project in one go.""" + return { + "project": resolve_project_name(project) if project else "", + "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "orphan_requirements": orphan_requirements(project), + "risky_decisions": risky_decisions(project), + "unsupported_claims": unsupported_claims(project), + } + + +# ============================================================ +# History + impact (Q-016, Q-017) +# ============================================================ + + +def impact_analysis(entity_id: str, max_depth: int = 3) -> dict: + """Q-016: transitive outbound reach of an entity. + + Walks outbound edges breadth-first to max_depth. Answers "what would + be affected if I changed component X?" by finding everything downstream. + """ + visited: set[str] = {entity_id} + impacted: list[dict] = [] + frontier = [(entity_id, 0)] + + while frontier: + current_id, depth = frontier.pop(0) + if depth >= max_depth: + continue + with get_connection() as conn: + rows = conn.execute( + "SELECT r.relationship_type, r.target_entity_id, " + "e.entity_type, e.name, e.status " + "FROM relationships r " + "JOIN entities e ON e.id = r.target_entity_id " + "WHERE r.source_entity_id = ? AND e.status = 'active'", + (current_id,), + ).fetchall() + for r in rows: + tid = r["target_entity_id"] + if tid in visited: + continue + visited.add(tid) + impacted.append({ + "entity_id": tid, + "entity_type": r["entity_type"], + "name": r["name"], + "relationship": r["relationship_type"], + "depth": depth + 1, + }) + frontier.append((tid, depth + 1)) + + root = get_entity(entity_id) + return { + "root": _entity_dict(root) if root else None, + "impacted_count": len(impacted), + "impacted": impacted, + "max_depth": max_depth, + } + + +def evidence_chain(entity_id: str) -> dict: + """Q-017: what evidence supports this entity? + + Walks inbound SUPPORTS / EVIDENCED_BY / DESCRIBED_BY edges to surface + the provenance chain: "this claim is supported by that result, which + was produced by that analysis model, which was described by that doc." + """ + provenance_edges = ("supports", "evidenced_by", "described_by", + "validated_by", "analyzed_by") + placeholders = ",".join("?" * len(provenance_edges)) + + with get_connection() as conn: + # Inbound edges of the provenance family + inbound_rows = conn.execute( + f"SELECT r.relationship_type, r.source_entity_id, " + f"e.entity_type, e.name, e.description, e.status " + f"FROM relationships r " + f"JOIN entities e ON e.id = r.source_entity_id " + f"WHERE r.target_entity_id = ? AND e.status = 'active' " + f"AND r.relationship_type IN ({placeholders})", + (entity_id, *provenance_edges), + ).fetchall() + + # Also look at source_refs on the entity itself + root = get_entity(entity_id) + + chain = [] + for r in inbound_rows: + chain.append({ + "via": r["relationship_type"], + "source_id": r["source_entity_id"], + "source_type": r["entity_type"], + "source_name": r["name"], + "source_description": (r["description"] or "")[:200], + }) + + return { + "root": _entity_dict(root) if root else None, + "direct_source_refs": root.source_refs if root else [], + "evidence_chain": chain, + "count": len(chain), + } + + +# ============================================================ +# Helpers +# ============================================================ + + +def _entity_dict(e: Entity) -> dict: + """Flatten an Entity to a public-API dict.""" + return { + "id": e.id, + "entity_type": e.entity_type, + "name": e.name, + "project": e.project, + "description": e.description, + "properties": e.properties, + "status": e.status, + "confidence": e.confidence, + "source_refs": e.source_refs, + "updated_at": e.updated_at, + } diff --git a/src/atocore/engineering/triage_ui.py b/src/atocore/engineering/triage_ui.py index a9ab64b..352e23d 100644 --- a/src/atocore/engineering/triage_ui.py +++ b/src/atocore/engineering/triage_ui.py @@ -293,16 +293,109 @@ _TRIAGE_CSS = """ """ +def _render_entity_card(entity) -> str: + """Phase 5: entity candidate card with promote/reject.""" + eid = _escape(entity.id) + name = _escape(entity.name) + etype = _escape(entity.entity_type) + project = _escape(entity.project or "(global)") + desc = _escape(entity.description or "") + conf = f"{entity.confidence:.2f}" + src_refs = entity.source_refs or [] + source_display = _escape(", ".join(src_refs[:3])) if src_refs else "(no provenance)" + + return f""" +
+
+ [entity · {etype}] + {project} + conf {conf} · src: {source_display} +
+
+
{name}
+
{desc}
+
+
+ + + View in wiki → +
+
+
+""" + + +_ENTITY_TRIAGE_SCRIPT = """ + +""" + +_ENTITY_TRIAGE_CSS = """ + +""" + + def render_triage_page(limit: int = 100) -> str: - """Render the full triage page with all pending candidates.""" + """Render the full triage page with pending memory + entity candidates.""" + from atocore.engineering.service import get_entities + try: - candidates = get_memories(status="candidate", limit=limit) + mem_candidates = get_memories(status="candidate", limit=limit) except Exception as e: - body = f"

Error loading candidates: {_escape(str(e))}

" + body = f"

Error loading memory candidates: {_escape(str(e))}

" return render_html("Triage — AtoCore", body, breadcrumbs=[("Wiki", "/wiki"), ("Triage", "")]) - if not candidates: - body = _TRIAGE_CSS + """ + try: + entity_candidates = get_entities(status="candidate", limit=limit) + except Exception as e: + entity_candidates = [] + + total = len(mem_candidates) + len(entity_candidates) + + if total == 0: + body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + """

Triage Queue

@@ -313,15 +406,34 @@ def render_triage_page(limit: int = 100) -> str: """ return render_html("Triage — AtoCore", body, breadcrumbs=[("Wiki", "/wiki"), ("Triage", "")]) - cards_html = "".join(_render_candidate_card(c) for c in candidates) + # Memory cards + mem_cards = "".join(_render_candidate_card(c) for c in mem_candidates) - body = _TRIAGE_CSS + f""" + # Entity cards + ent_cards_html = "" + if entity_candidates: + ent_cards = "".join(_render_entity_card(e) for e in entity_candidates) + ent_cards_html = f""" +
+

🔧 Entity Candidates ({len(entity_candidates)})

+

+ Typed graph entries awaiting review. Promoting an entity connects it to + the engineering knowledge graph (subsystems, requirements, decisions, etc.). +

+
+ {ent_cards} + """ + + body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + f"""

Triage Queue

- {len(candidates)} pending + + {len(mem_candidates)} memory · + {len(entity_candidates)} entity +
- Review candidate memories the auto-triage wasn't sure about. Edit the content + Review candidates the auto-triage wasn't sure about. Edit the content if needed, then promote or reject. Shortcuts: Y promote · N reject · E edit · S scroll to next.
@@ -330,12 +442,14 @@ def render_triage_page(limit: int = 100) -> str: 🤖 Auto-process queue - Sends the full queue through LLM triage on the host. Promotes durable facts, - rejects noise, leaves only ambiguous items here for you. + Sends the full memory queue through LLM triage on the host. Entity candidates + stay for manual review (types + relationships matter too much to auto-decide). - {cards_html} - """ + _TRIAGE_SCRIPT +

📝 Memory Candidates ({len(mem_candidates)})

+ {mem_cards} + {ent_cards_html} + """ + _TRIAGE_SCRIPT + _ENTITY_TRIAGE_SCRIPT return render_html( "Triage — AtoCore", diff --git a/tests/test_engineering_queries.py b/tests/test_engineering_queries.py new file mode 100644 index 0000000..a52604c --- /dev/null +++ b/tests/test_engineering_queries.py @@ -0,0 +1,212 @@ +"""Phase 5 tests — the 10 canonical engineering queries. + +Test fixtures seed a small p-test graph and exercise each query. The 3 killer +queries (Q-006/009/011) get dedicated tests that verify they surface real gaps +and DON'T false-positive on well-formed data. +""" + +from __future__ import annotations + +import pytest + +from atocore.engineering.queries import ( + all_gaps, + decisions_affecting, + evidence_chain, + impact_analysis, + orphan_requirements, + recent_changes, + requirements_for, + risky_decisions, + system_map, + unsupported_claims, +) +from atocore.engineering.service import ( + create_entity, + create_relationship, + init_engineering_schema, +) +from atocore.models.database import init_db + + +@pytest.fixture +def seeded_graph(tmp_data_dir): + """Build a small engineering graph for query tests.""" + init_db() + init_engineering_schema() + + # Subsystem + components + ss = create_entity("subsystem", "Optics", project="p-test") + c1 = create_entity("component", "Primary Mirror", project="p-test") + c2 = create_entity("component", "Diverger Lens", project="p-test") + c_orphan = create_entity("component", "Unparented", project="p-test") + create_relationship(c1.id, ss.id, "part_of") + create_relationship(c2.id, ss.id, "part_of") + + # Requirements — one satisfied, one orphan + r_ok = create_entity("requirement", "Surface figure < 25nm RMS", project="p-test") + r_orphan = create_entity("requirement", "Measurement lambda/20", project="p-test") + create_relationship(c1.id, r_ok.id, "satisfies") + + # Decisions + d_ok = create_entity("decision", "Use Zerodur blank", project="p-test") + d_risky = create_entity("decision", "Use external CGH", project="p-test") + create_relationship(d_ok.id, ss.id, "affected_by_decision") + + # Assumption (flagged) — d_risky depends on it + a_flagged = create_entity( + "parameter", "Vendor lead time 6 weeks", + project="p-test", + properties={"flagged": True}, + ) + create_relationship(d_risky.id, a_flagged.id, "based_on_assumption") + + # Validation claim — one supported, one not + v_ok = create_entity("validation_claim", "Margin is adequate", project="p-test") + v_orphan = create_entity("validation_claim", "Thermal stability OK", project="p-test") + result = create_entity("result", "FEA thermal sweep 2026-03", project="p-test") + create_relationship(result.id, v_ok.id, "supports") + + # Material + mat = create_entity("material", "Zerodur", project="p-test") + create_relationship(c1.id, mat.id, "uses_material") + + return { + "subsystem": ss, "component_1": c1, "component_2": c2, + "orphan_component": c_orphan, + "req_ok": r_ok, "req_orphan": r_orphan, + "decision_ok": d_ok, "decision_risky": d_risky, + "assumption_flagged": a_flagged, + "claim_supported": v_ok, "claim_orphan": v_orphan, + "result": result, "material": mat, + } + + +# --- Structure queries --- + + +def test_system_map_returns_subsystem_with_components(seeded_graph): + result = system_map("p-test") + assert result["project"] == "p-test" + assert len(result["subsystems"]) == 1 + optics = result["subsystems"][0] + assert optics["name"] == "Optics" + comp_names = {c["name"] for c in optics["components"]} + assert "Primary Mirror" in comp_names + assert "Diverger Lens" in comp_names + + +def test_system_map_reports_orphan_components(seeded_graph): + result = system_map("p-test") + names = {c["name"] for c in result["orphan_components"]} + assert "Unparented" in names + + +def test_system_map_includes_materials(seeded_graph): + result = system_map("p-test") + primary = next( + c for s in result["subsystems"] for c in s["components"] if c["name"] == "Primary Mirror" + ) + assert "Zerodur" in primary["materials"] + + +def test_decisions_affecting_whole_project(seeded_graph): + result = decisions_affecting("p-test") + names = {d["name"] for d in result["decisions"]} + assert "Use Zerodur blank" in names + assert "Use external CGH" in names + + +def test_decisions_affecting_specific_subsystem(seeded_graph): + ss_id = seeded_graph["subsystem"].id + result = decisions_affecting("p-test", subsystem_id=ss_id) + names = {d["name"] for d in result["decisions"]} + # d_ok has edge to subsystem directly + assert "Use Zerodur blank" in names + + +def test_requirements_for_component(seeded_graph): + c_id = seeded_graph["component_1"].id + result = requirements_for(c_id) + assert result["count"] == 1 + assert result["requirements"][0]["name"] == "Surface figure < 25nm RMS" + + +def test_recent_changes_includes_created_entities(seeded_graph): + result = recent_changes("p-test", limit=100) + actions = [c["action"] for c in result["changes"]] + assert "created" in actions + assert result["count"] > 0 + + +# --- Killer queries --- + + +def test_orphan_requirements_finds_unsatisfied(seeded_graph): + result = orphan_requirements("p-test") + names = {r["name"] for r in result["gaps"]} + assert "Measurement lambda/20" in names # orphan + assert "Surface figure < 25nm RMS" not in names # has SATISFIES edge + + +def test_orphan_requirements_empty_when_all_satisfied(tmp_data_dir): + init_db() + init_engineering_schema() + c = create_entity("component", "C", project="p-clean") + r = create_entity("requirement", "R", project="p-clean") + create_relationship(c.id, r.id, "satisfies") + result = orphan_requirements("p-clean") + assert result["count"] == 0 + + +def test_risky_decisions_finds_flagged_assumptions(seeded_graph): + result = risky_decisions("p-test") + names = {d["decision_name"] for d in result["gaps"]} + assert "Use external CGH" in names + assert "Use Zerodur blank" not in names # has no flagged assumption + + +def test_unsupported_claims_finds_orphan_claims(seeded_graph): + result = unsupported_claims("p-test") + names = {c["name"] for c in result["gaps"]} + assert "Thermal stability OK" in names + assert "Margin is adequate" not in names # has SUPPORTS edge + + +def test_all_gaps_combines_the_three_killers(seeded_graph): + result = all_gaps("p-test") + assert result["orphan_requirements"]["count"] == 1 + assert result["risky_decisions"]["count"] == 1 + assert result["unsupported_claims"]["count"] == 1 + + +def test_all_gaps_clean_project_reports_zero(tmp_data_dir): + init_db() + init_engineering_schema() + create_entity("component", "alone", project="p-empty") + result = all_gaps("p-empty") + assert result["orphan_requirements"]["count"] == 0 + assert result["risky_decisions"]["count"] == 0 + assert result["unsupported_claims"]["count"] == 0 + + +# --- Impact + evidence --- + + +def test_impact_analysis_walks_outbound_edges(seeded_graph): + c_id = seeded_graph["component_1"].id + result = impact_analysis(c_id, max_depth=2) + # Primary Mirror → SATISFIES → Requirement, → USES_MATERIAL → Material + rel_types = {i["relationship"] for i in result["impacted"]} + assert "satisfies" in rel_types + assert "uses_material" in rel_types + + +def test_evidence_chain_walks_inbound_provenance(seeded_graph): + v_ok_id = seeded_graph["claim_supported"].id + result = evidence_chain(v_ok_id) + # The Result entity supports the claim + via_types = {e["via"] for e in result["evidence_chain"]} + assert "supports" in via_types + source_names = {e["source_name"] for e in result["evidence_chain"]} + assert "FEA thermal sweep 2026-03" in source_names