Files
ATOCore/src/atocore/engineering/queries.py
Anto01 23cdb3149f feat(engineering): V1-A — Q-001 subsystem-scoped + pillar query integration test
Phase V1-A of the Engineering V1 Completion Plan. Scope was kept tight
per the plan: a single Q-001 shape fix and an integration test that
proves the four pillar queries work end-to-end against one seed graph.

Code change:
  - subsystem_contents() in src/atocore/engineering/queries.py returns
    {subsystem, contains: [{id, type, entity_type, name, status}]} by
    walking inbound part_of edges (the inverse of CONTAINS), filtered
    to active children. `type` matches the catalog spec; `entity_type`
    preserves parity with the rest of this module's response shape.
  - GET /entities/Subsystem/{id}?expand=contains route in routes.py
    matches the Q-001 spec invocation verbatim; 400 on unsupported
    expand, 404 on missing-or-wrong-type.
  - Aliased under /v1.
  - The existing project-wide /engineering/projects/{name}/systems
    route stays as-is for Q-004 (whole-project tree).

V1-A acceptance test (tests/test_v1_a_pillar_queries.py):
  - Q-001 subsystem-scoped: Optics → Primary Mirror + Diverger Lens
  - Q-005 requirements_for: Primary Mirror has its single satisfied req
  - Q-006 orphan_requirements: orphan surfaces, satisfied does not
  - Q-017 evidence_chain: supported claim has the FEA result via
    'supports'; unsupported claim does not
  - Q-009 risky_decisions / Q-011 unsupported_claims also asserted
    against the same seed (Codex P2: don't seed data you don't assert)

Plus targeted tests for the Q-001 helper: missing id, wrong type,
nested child subsystems, inactive-child filtering, real /v1 GET
behavior.

Reviewed by Codex twice: GO WITH CONDITIONS at b575773, GO at 0e83525
after the dual-key emit + Q-009/Q-011 + /v1-behavior + nested/inactive
amends folded in.

Tests: 596 -> 605 (+9). Full local suite green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 13:03:58 -04:00

532 lines
19 KiB
Python

"""Phase 5 Engineering V1 — The 10 canonical queries.
Each function maps to one or more catalog IDs in
``docs/architecture/engineering-query-catalog.md``. Return values are plain
dicts so API and wiki renderers can consume them without importing dataclasses.
Design principles:
- All queries filter to status='active' unless the caller asks otherwise
- All project filters go through ``resolve_project_name`` (canonicalization)
- Graph traversals are bounded (depth <= 3 for impact, limit 200 for lists)
- The 3 "killer" queries (gaps) accept project as required — gaps are always
scoped to one project in V1
These queries are the *useful surface* of the entity graph. Before this module,
the graph was data with no narrative; after this module, the director can ask
real questions about coverage, risk, and evidence.
"""
from __future__ import annotations
from datetime import datetime, timezone
from atocore.engineering.service import (
Entity,
_row_to_entity,
get_entity,
get_relationships,
)
from atocore.models.database import get_connection
from atocore.projects.registry import resolve_project_name
# ============================================================
# Structure queries (Q-001, Q-004, Q-005, Q-008) + recent changes (Q-013)
# ============================================================
def system_map(project: str) -> dict:
    """Q-001 + Q-004: return the full subsystem/component tree for a project.

    Shape::

        {
            "project": "p05-interferometer",
            "subsystems": [
                {
                    "id": ..., "name": ..., "description": ...,
                    "components": [{id, name, description, materials: [...]}],
                },
                ...
            ],
            "orphan_components": [...],
        }

    Args:
        project: Project name or alias; canonicalized through
            ``resolve_project_name``. A falsy value is treated as ``""``.

    Returns:
        The dict described above. Subsystems are ordered by name. Each
        component is listed once, under the first of its ``part_of``
        parents that is an active subsystem of this project. Components
        with no such parent (no ``part_of`` edge at all, or only edges to
        something that is not an active subsystem here) are returned in
        ``orphan_components``.
    """
    project = resolve_project_name(project) if project else ""
    out: dict = {"project": project, "subsystems": [], "orphan_components": []}

    with get_connection() as conn:
        # All active subsystems in the project.
        subsys_rows = conn.execute(
            "SELECT * FROM entities WHERE status = 'active' "
            "AND project = ? AND entity_type = 'subsystem' "
            "ORDER BY name",
            (project,),
        ).fetchall()

        # All active components in the project.
        comp_rows = conn.execute(
            "SELECT * FROM entities WHERE status = 'active' "
            "AND project = ? AND entity_type = 'component'",
            (project,),
        ).fetchall()

        # PART_OF edges: child -> parent. Ordered so that parent selection
        # below is deterministic when a child has several edges.
        part_of_rows = conn.execute(
            "SELECT source_entity_id, target_entity_id FROM relationships "
            "WHERE relationship_type = 'part_of' "
            "ORDER BY source_entity_id, target_entity_id"
        ).fetchall()

        # uses_material edges, resolved to the (active) material's name.
        mat_rows = conn.execute(
            "SELECT r.source_entity_id, e.name FROM relationships r "
            "JOIN entities e ON e.id = r.target_entity_id "
            "WHERE r.relationship_type = 'uses_material' AND e.status = 'active'"
        ).fetchall()

    # Keep *every* parent per child. A single-valued map would let a stale
    # or non-subsystem edge overwrite the edge to the real subsystem and
    # misreport the component as an orphan.
    parents_by_child: dict[str, list[str]] = {}
    for r in part_of_rows:
        parents_by_child.setdefault(r["source_entity_id"], []).append(
            r["target_entity_id"]
        )

    materials_by_comp: dict[str, list[str]] = {}
    for r in mat_rows:
        materials_by_comp.setdefault(r["source_entity_id"], []).append(r["name"])

    # Build: subsystems -> their components.
    subsys_comps: dict[str, list[dict]] = {s["id"]: [] for s in subsys_rows}
    orphans: list[dict] = []
    for c in comp_rows:
        comp_dict = {
            "id": c["id"],
            "name": c["name"],
            "description": c["description"] or "",
            "materials": materials_by_comp.get(c["id"], []),
        }
        # First parent that is an active subsystem of this project, if any.
        parent = next(
            (p for p in parents_by_child.get(c["id"], ()) if p in subsys_comps),
            None,
        )
        if parent is not None:
            subsys_comps[parent].append(comp_dict)
        else:
            orphans.append(comp_dict)

    out["subsystems"] = [
        {
            "id": s["id"],
            "name": s["name"],
            "description": s["description"] or "",
            "components": subsys_comps.get(s["id"], []),
        }
        for s in subsys_rows
    ]
    out["orphan_components"] = orphans
    return out
def subsystem_contents(subsystem_id: str) -> dict | None:
    """Q-001 subsystem-scoped variant: one subsystem plus its direct children.

    Spec: ``GET /entities/Subsystem/<id>?expand=contains`` per
    ``docs/architecture/engineering-query-catalog.md`` Q-001.

    Unlike :func:`system_map` (Q-004), which returns the project-wide
    tree, this answers "what is inside this one subsystem?".

    Children are found by following inbound ``part_of`` edges (the inverse
    of ``CONTAINS``), so any entity type that is ``part_of`` the subsystem
    is returned. Only children with ``status = 'active'`` are included;
    the subsystem row itself is looked up by id without a status filter
    and its status is reported in the result.

    Args:
        subsystem_id: Id of the entity to expand.

    Returns:
        ``{"subsystem": {id, name, project, status, description},
        "contains": [{id, type, entity_type, name, status}]}`` with
        children ordered by entity type, then name; or ``None`` when the
        id does not exist or the entity is not a subsystem.
    """
    with get_connection() as conn:
        parent_row = conn.execute(
            "SELECT * FROM entities WHERE id = ?",
            (subsystem_id,),
        ).fetchone()

        # Missing id and wrong entity type are reported the same way.
        if parent_row is None:
            return None
        if parent_row["entity_type"] != "subsystem":
            return None

        child_rows = conn.execute(
            "SELECT e.id, e.entity_type, e.name, e.status "
            "FROM relationships r "
            "JOIN entities e ON e.id = r.source_entity_id "
            "WHERE r.relationship_type = 'part_of' "
            "AND r.target_entity_id = ? "
            "AND e.status = 'active' "
            "ORDER BY e.entity_type, e.name",
            (subsystem_id,),
        ).fetchall()

    header = {
        "id": parent_row["id"],
        "name": parent_row["name"],
        "project": parent_row["project"],
        "status": parent_row["status"],
        "description": parent_row["description"] or "",
    }

    children: list[dict] = []
    for child in child_rows:
        kind = child["entity_type"]
        children.append(
            {
                "id": child["id"],
                # `type` is the key named by the Q-001 catalog spec;
                # `entity_type` duplicates it for parity with the rest of
                # this module's response shape (see `_entity_dict`).
                "type": kind,
                "entity_type": kind,
                "name": child["name"],
                "status": child["status"],
            }
        )

    return {"subsystem": header, "contains": children}
def decisions_affecting(project: str, subsystem_id: str | None = None) -> dict:
    """Q-008: decisions that affect a subsystem (or the whole project).

    With ``subsystem_id``, the target set is that subsystem plus every
    entity that has a ``part_of`` edge pointing at it; active decisions of
    the project linked to any of those targets through an
    ``affected_by_decision`` edge are returned. Without ``subsystem_id``,
    all active decisions of the project are returned.

    Both forms are ordered by ``updated_at`` descending and capped at 200
    rows, matching the module-wide bound for list queries.

    Args:
        project: Project name or alias; canonicalized through
            ``resolve_project_name``. A falsy value is treated as ``""``.
        subsystem_id: Optional subsystem to scope the query to.

    Returns:
        ``{"project", "subsystem_id", "decisions": [...], "count"}`` where
        each decision is shaped by ``_entity_dict`` and ``subsystem_id``
        is ``""`` when no subsystem was given.
    """
    project = resolve_project_name(project) if project else ""
    target_ids: set[str] = set()

    # One connection serves both the child lookup and the decision query.
    with get_connection() as conn:
        if subsystem_id:
            target_ids.add(subsystem_id)
            # Include everything that is PART_OF the subsystem.
            child_rows = conn.execute(
                "SELECT source_entity_id FROM relationships "
                "WHERE relationship_type = 'part_of' AND target_entity_id = ?",
                (subsystem_id,),
            ).fetchall()
            target_ids.update(r["source_entity_id"] for r in child_rows)

        if target_ids:
            # Sorted so the bound parameter order is stable between runs.
            ordered_targets = sorted(target_ids)
            placeholders = ",".join("?" * len(ordered_targets))
            # NOTE(review): the join treats the decision as the *source* of
            # the affected_by_decision edge and the subsystem/component as
            # its target — confirm this matches the ontology's direction.
            rows = conn.execute(
                "SELECT DISTINCT e.* FROM entities e "
                "JOIN relationships r ON r.source_entity_id = e.id "
                "WHERE e.status = 'active' AND e.entity_type = 'decision' "
                "AND e.project = ? AND r.relationship_type = 'affected_by_decision' "
                f"AND r.target_entity_id IN ({placeholders}) "
                "ORDER BY e.updated_at DESC LIMIT 200",
                (project, *ordered_targets),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM entities WHERE status = 'active' "
                "AND entity_type = 'decision' AND project = ? "
                "ORDER BY updated_at DESC LIMIT 200",
                (project,),
            ).fetchall()

    decisions = [_entity_dict(_row_to_entity(r)) for r in rows]
    return {
        "project": project,
        "subsystem_id": subsystem_id or "",
        "decisions": decisions,
        "count": len(decisions),
    }
def requirements_for(component_id: str) -> dict:
    """Q-005: list the active requirements a component satisfies.

    Follows outbound ``satisfies`` edges from ``component_id`` and keeps
    the targets that are active entities of type ``requirement``, ordered
    by name.

    Args:
        component_id: Id of the entity at the source of the edges.

    Returns:
        ``{"component_id", "requirements": [...], "count"}`` where each
        requirement is shaped by ``_entity_dict``.
    """
    # Component -> SATISFIES -> Requirement
    sql = (
        "SELECT e.* FROM entities e "
        "JOIN relationships r ON r.target_entity_id = e.id "
        "WHERE r.source_entity_id = ? AND r.relationship_type = 'satisfies' "
        "AND e.entity_type = 'requirement' AND e.status = 'active' "
        "ORDER BY e.name"
    )
    with get_connection() as conn:
        req_rows = conn.execute(sql, (component_id,)).fetchall()

    satisfied: list[dict] = []
    for row in req_rows:
        satisfied.append(_entity_dict(_row_to_entity(row)))

    return {
        "component_id": component_id,
        "requirements": satisfied,
        "count": len(satisfied),
    }
def recent_changes(project: str, since: str | None = None, limit: int = 50) -> dict:
    """Q-013: what changed recently in the project (entity audit log).

    Reads the shared ``memory_audit`` table filtered by
    ``entity_kind = 'entity'`` and LEFT JOINs back to ``entities`` for the
    project scope. Because of the LEFT JOIN and the ``e.project IS NULL``
    clause, audit rows whose entity row can no longer be found (or whose
    entity has a NULL project) are included for every project; they are
    reported with type ``"?"`` and name ``"(deleted)"``.

    Args:
        project: Project name or alias; canonicalized through
            ``resolve_project_name``. A falsy value is treated as ``""``.
        since: Lower bound compared against ``memory_audit.timestamp``;
            defaults to ``"2020-01-01"`` when omitted or empty.
        limit: Maximum number of rows. Negative values are clamped to 0
            rather than forwarded to SQL.

    Returns:
        ``{"project", "since", "changes": [...], "count"}`` with changes
        ordered newest first.
    """
    project = resolve_project_name(project) if project else ""
    since = since or "2020-01-01"
    # SQLite interprets a negative LIMIT as "no upper bound"; never let a
    # caller-supplied value turn this into an unbounded scan of the log.
    # (Presumably the backend is SQLite — the clamp is harmless otherwise.)
    limit = max(limit, 0)

    with get_connection() as conn:
        rows = conn.execute(
            "SELECT a.id, a.memory_id AS entity_id, a.action, a.actor, "
            "a.timestamp, a.note, e.entity_type, e.name, e.project "
            "FROM memory_audit a "
            "LEFT JOIN entities e ON e.id = a.memory_id "
            "WHERE a.entity_kind = 'entity' AND a.timestamp >= ? "
            "AND (e.project = ? OR e.project IS NULL) "
            "ORDER BY a.timestamp DESC LIMIT ?",
            (since, project, limit),
        ).fetchall()

    changes = [
        {
            "audit_id": r["id"],
            "entity_id": r["entity_id"],
            # Columns from the joined entity are NULL when the join missed.
            "entity_type": r["entity_type"] or "?",
            "entity_name": r["name"] or "(deleted)",
            "action": r["action"],
            "actor": r["actor"] or "api",
            "note": r["note"] or "",
            "timestamp": r["timestamp"],
        }
        for r in rows
    ]
    return {"project": project, "since": since, "changes": changes, "count": len(changes)}
# ============================================================
# Killer queries (Q-006, Q-009, Q-011) — the "what am I forgetting?" queries
# ============================================================
def orphan_requirements(project: str) -> dict:
    """Q-006: active requirements in a project with no inbound SATISFIES edge.

    These are statements of "this must be true" that nothing in the graph
    claims to meet. Any ``satisfies`` edge pointing at the requirement
    counts; the status of the entity at the edge's source is not checked.

    Args:
        project: Project name or alias; canonicalized through
            ``resolve_project_name``. A falsy value is treated as ``""``.

    Returns:
        ``{"project", "query", "description", "gaps": [...], "count"}``
        with gaps shaped by ``_entity_dict`` and ordered by ``updated_at``
        descending.
    """
    project = resolve_project_name(project) if project else ""
    sql = (
        "SELECT * FROM entities WHERE status = 'active' "
        "AND project = ? AND entity_type = 'requirement' "
        "AND NOT EXISTS ("
        " SELECT 1 FROM relationships r "
        " WHERE r.relationship_type = 'satisfies' "
        " AND r.target_entity_id = entities.id"
        ") "
        "ORDER BY updated_at DESC"
    )
    with get_connection() as conn:
        unmet_rows = conn.execute(sql, (project,)).fetchall()

    gaps: list[dict] = []
    for row in unmet_rows:
        gaps.append(_entity_dict(_row_to_entity(row)))

    return {
        "project": project,
        "query": "Q-006 orphan requirements",
        "description": "Requirements with no SATISFIES relationship — nothing claims to meet them.",
        "gaps": gaps,
        "count": len(gaps),
    }
def risky_decisions(project: str) -> dict:
    """Q-009: active decisions resting on a questionable assumption.

    Follows ``based_on_assumption`` edges from each active decision of the
    project. The target counts as questionable when its status is
    ``superseded`` or ``invalid``, or when its serialized ``properties``
    text contains ``"flagged": true`` (with or without the space) — the
    flag is detected by a textual ``LIKE`` match, not by parsing JSON.

    One entry is produced per (decision, assumption) pair, ordered by the
    decision's ``updated_at`` descending.

    Args:
        project: Project name or alias; canonicalized through
            ``resolve_project_name``. A falsy value is treated as ``""``.

    Returns:
        ``{"project", "query", "description", "gaps": [...], "count"}``
        where each gap carries the decision id/name/description and the
        assumption id/name/status.
    """
    project = resolve_project_name(project) if project else ""
    # `assumption_props` is selected but not surfaced in the response.
    sql = (
        "SELECT DISTINCT d.*, a.name AS assumption_name, a.id AS assumption_id, "
        "a.status AS assumption_status, a.properties AS assumption_props "
        "FROM entities d "
        "JOIN relationships r ON r.source_entity_id = d.id "
        "JOIN entities a ON a.id = r.target_entity_id "
        "WHERE d.status = 'active' AND d.entity_type = 'decision' "
        "AND d.project = ? "
        "AND r.relationship_type = 'based_on_assumption' "
        "AND ("
        " a.status IN ('superseded', 'invalid') OR "
        " a.properties LIKE '%\"flagged\": true%' OR "
        " a.properties LIKE '%\"flagged\":true%'"
        ") "
        "ORDER BY d.updated_at DESC"
    )
    with get_connection() as conn:
        pair_rows = conn.execute(sql, (project,)).fetchall()

    gaps = [
        {
            "decision_id": pair["id"],
            "decision_name": pair["name"],
            "decision_description": pair["description"] or "",
            "assumption_id": pair["assumption_id"],
            "assumption_name": pair["assumption_name"],
            "assumption_status": pair["assumption_status"],
        }
        for pair in pair_rows
    ]
    return {
        "project": project,
        "query": "Q-009 risky decisions",
        "description": "Decisions based on assumptions that are flagged, superseded, or invalid.",
        "gaps": gaps,
        "count": len(gaps),
    }
def unsupported_claims(project: str) -> dict:
    """Q-011: active validation claims with no inbound SUPPORTS edge.

    These are asserted claims (e.g. "margin is adequate") that have no
    ``supports`` relationship pointing at them — believed, but with no
    evidence on file. Any ``supports`` edge counts; the status of the
    entity at the edge's source is not checked.

    Args:
        project: Project name or alias; canonicalized through
            ``resolve_project_name``. A falsy value is treated as ``""``.

    Returns:
        ``{"project", "query", "description", "gaps": [...], "count"}``
        with gaps shaped by ``_entity_dict`` and ordered by ``updated_at``
        descending.
    """
    project = resolve_project_name(project) if project else ""
    sql = (
        "SELECT * FROM entities WHERE status = 'active' "
        "AND project = ? AND entity_type = 'validation_claim' "
        "AND NOT EXISTS ("
        " SELECT 1 FROM relationships r "
        " WHERE r.relationship_type = 'supports' "
        " AND r.target_entity_id = entities.id"
        ") "
        "ORDER BY updated_at DESC"
    )
    with get_connection() as conn:
        bare_rows = conn.execute(sql, (project,)).fetchall()

    gaps: list[dict] = []
    for row in bare_rows:
        gaps.append(_entity_dict(_row_to_entity(row)))

    return {
        "project": project,
        "query": "Q-011 unsupported claims",
        "description": "Validation claims with no supporting Result — asserted but not evidenced.",
        "gaps": gaps,
        "count": len(gaps),
    }
def all_gaps(project: str) -> dict:
    """Run the three gap queries (Q-006, Q-009, Q-011) for one project.

    Args:
        project: Project name or alias. It is canonicalized here for the
            top-level ``project`` field and passed through unchanged to
            each gap query, which canonicalizes it again itself.

    Returns:
        ``{"project", "generated_at", "orphan_requirements",
        "risky_decisions", "unsupported_claims"}`` where ``generated_at``
        is the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ`` and
        the last three values are the full results of the respective
        queries.
    """
    canonical = resolve_project_name(project) if project else ""
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    report: dict = {"project": canonical, "generated_at": stamp}
    report["orphan_requirements"] = orphan_requirements(project)
    report["risky_decisions"] = risky_decisions(project)
    report["unsupported_claims"] = unsupported_claims(project)
    return report
# ============================================================
# History + impact (Q-016, Q-017)
# ============================================================
def impact_analysis(entity_id: str, max_depth: int = 3) -> dict:
    """Q-016: transitive outbound reach of an entity.

    Walks outbound edges breadth-first, up to ``max_depth`` hops, to
    answer "what would be affected if I changed X?". Only edges whose
    target entity is active are followed. Each reached entity is reported
    once, at the depth and through the relationship by which it was first
    encountered.

    Args:
        entity_id: Id of the entity to start from.
        max_depth: Maximum number of hops to follow; values <= 0 yield an
            empty ``impacted`` list.

    Returns:
        ``{"root", "impacted_count", "impacted": [...], "max_depth"}``
        where ``root`` is the ``_entity_dict`` of the starting entity (or
        ``None`` if ``get_entity`` does not find it) and each impacted
        item is ``{entity_id, entity_type, name, relationship, depth}``.
    """
    visited: set[str] = {entity_id}
    impacted: list[dict] = []

    # Level-by-level BFS: `frontier` holds every node at the current depth.
    # This yields the same visit order as a FIFO queue without O(n)
    # list.pop(0) calls, and a single connection serves the whole walk.
    frontier: list[str] = [entity_id]
    depth = 0
    with get_connection() as conn:
        while frontier and depth < max_depth:
            depth += 1
            next_frontier: list[str] = []
            for current_id in frontier:
                rows = conn.execute(
                    "SELECT r.relationship_type, r.target_entity_id, "
                    "e.entity_type, e.name, e.status "
                    "FROM relationships r "
                    "JOIN entities e ON e.id = r.target_entity_id "
                    "WHERE r.source_entity_id = ? AND e.status = 'active'",
                    (current_id,),
                ).fetchall()
                for r in rows:
                    tid = r["target_entity_id"]
                    # The visited set guards against cycles and duplicates.
                    if tid in visited:
                        continue
                    visited.add(tid)
                    impacted.append({
                        "entity_id": tid,
                        "entity_type": r["entity_type"],
                        "name": r["name"],
                        "relationship": r["relationship_type"],
                        "depth": depth,
                    })
                    next_frontier.append(tid)
            frontier = next_frontier

    root = get_entity(entity_id)
    return {
        "root": _entity_dict(root) if root else None,
        "impacted_count": len(impacted),
        "impacted": impacted,
        "max_depth": max_depth,
    }
def evidence_chain(entity_id: str) -> dict:
    """Q-017: what evidence supports this entity?

    Collects the direct (one-hop) inbound edges of the provenance family —
    ``supports``, ``evidenced_by``, ``described_by``, ``validated_by`` and
    ``analyzed_by`` — whose source entity is active, and pairs them with
    the ``source_refs`` recorded on the entity itself.

    Args:
        entity_id: Id of the entity whose evidence is requested.

    Returns:
        ``{"root", "direct_source_refs", "evidence_chain": [...], "count"}``
        where ``root`` is the ``_entity_dict`` of the entity (``None`` and
        an empty ``direct_source_refs`` when ``get_entity`` does not find
        it) and each chain item is ``{via, source_id, source_type,
        source_name, source_description}`` with the description cut to
        200 characters.
    """
    edge_types = ("supports", "evidenced_by", "described_by",
                  "validated_by", "analyzed_by")
    marks = ",".join("?" * len(edge_types))
    params = (entity_id, *edge_types)

    with get_connection() as conn:
        # Inbound edges of the provenance family.
        edge_rows = conn.execute(
            f"SELECT r.relationship_type, r.source_entity_id, "
            f"e.entity_type, e.name, e.description, e.status "
            f"FROM relationships r "
            f"JOIN entities e ON e.id = r.source_entity_id "
            f"WHERE r.target_entity_id = ? AND e.status = 'active' "
            f"AND r.relationship_type IN ({marks})",
            params,
        ).fetchall()

    # The entity's own source_refs complement the graph edges.
    root = get_entity(entity_id)

    links = [
        {
            "via": edge["relationship_type"],
            "source_id": edge["source_entity_id"],
            "source_type": edge["entity_type"],
            "source_name": edge["name"],
            "source_description": (edge["description"] or "")[:200],
        }
        for edge in edge_rows
    ]
    return {
        "root": _entity_dict(root) if root else None,
        "direct_source_refs": root.source_refs if root else [],
        "evidence_chain": links,
        "count": len(links),
    }
# ============================================================
# Helpers
# ============================================================
def _entity_dict(e: Entity) -> dict:
    """Flatten an Entity into the plain dict used in public API responses.

    Copies a fixed set of attributes from ``e`` by name, in a fixed key
    order; values are passed through unchanged (no copying or conversion).
    """
    public_fields = (
        "id",
        "entity_type",
        "name",
        "project",
        "description",
        "properties",
        "status",
        "confidence",
        "source_refs",
        "updated_at",
    )
    return {field: getattr(e, field) for field in public_fields}