feat: Phase 5B-5D — 10 canonical engineering queries + triage UI

The graph becomes useful. Before this commit, entities sat in the DB
as data with no narrative. After: the director can ask "what am I
forgetting?" and get a structured answer in milliseconds.

New module (src/atocore/engineering/queries.py, 360 lines):

Structure queries (Q-001/004/005/008/013):
- system_map(project): full subsystem → component tree + orphans +
  materials joined per component
- decisions_affecting(project, subsystem_id?): decisions linked via
  AFFECTED_BY_DECISION, scoped to a subsystem or whole project
- requirements_for(component_id): Q-005 forward trace
- recent_changes(project, since, limit): Q-013 via memory_audit join
  (reuses the Phase 4 audit infrastructure — entity_kind='entity')

The 3 killer queries (the real value):
- orphan_requirements(project): requirements with NO inbound SATISFIES
  edge. "What do I claim the system must do that nothing actually
  claims to handle?" Q-006.
- risky_decisions(project): decisions whose BASED_ON_ASSUMPTION edge
  points to an assumption with status in ('superseded','invalid') OR
  properties.flagged=True. Finds cascading risk from shaky premises. Q-009.
- unsupported_claims(project): ValidationClaim entities with no inbound
  SUPPORTS edge — asserted but no Result to back them. Q-011.
- all_gaps(project): runs all three in one call for dashboards.

History + impact (Q-016/017):
- impact_analysis(entity_id, max_depth=3): BFS over outbound edges.
  "What's downstream of this if I change it?"
- evidence_chain(entity_id): inbound SUPPORTS/EVIDENCED_BY/DESCRIBED_BY/
  VALIDATED_BY/ANALYZED_BY. "How do I know this is true?"

API (src/atocore/api/routes.py) exposes 10 endpoints:
- GET /engineering/projects/{p}/systems
- GET /engineering/decisions?project=&subsystem=
- GET /engineering/components/{id}/requirements
- GET /engineering/changes?project=&since=&limit=
- GET /engineering/gaps/orphan-requirements?project=
- GET /engineering/gaps/risky-decisions?project=
- GET /engineering/gaps/unsupported-claims?project=
- GET /engineering/gaps?project=  (combined)
- GET /engineering/impact?entity=&max_depth=
- GET /engineering/evidence?entity=

Mirror integration (src/atocore/engineering/mirror.py):
- New _gaps_section() renders at top of every project page
- If any gap non-empty: shows up-to-10 per category with names + context
- Clean project: " No gaps detected" — signals everything is traced

Triage UI (src/atocore/engineering/triage_ui.py):
- /admin/triage now shows BOTH memory candidates AND entity candidates
- Entity cards: name, type, project, confidence, source provenance,
  Promote/Reject buttons, link to wiki entity page
- Entity promote/reject via fetch to /entities/{id}/promote|reject
- One triage UI for the whole pipeline — consistent muscle memory

Tests: 326 → 341 (15 new, all in test_engineering_queries.py):
- System map structure + orphan detection + material joins
- Killer queries: positive + negative cases (empty when clean)
- Decisions query: project-wide and subsystem-scoped
- Impact analysis walks outbound BFS
- Evidence chain walks inbound provenance

No regressions. All 10 daily queries from the plan are now live and
answering real questions against the graph.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-17 07:18:46 -04:00
parent 07664bd743
commit 53b71639ad
5 changed files with 947 additions and 13 deletions

View File

@@ -1327,6 +1327,86 @@ def api_list_entities(
}
# --- Phase 5 Engineering V1: The 10 canonical queries ---
@router.get("/engineering/projects/{project_name}/systems")
def api_system_map(project_name: str) -> dict:
"""Q-001 + Q-004: subsystem/component tree for a project."""
from atocore.engineering.queries import system_map
return system_map(project_name)
@router.get("/engineering/decisions")
def api_decisions_affecting(
project: str,
subsystem: str | None = None,
) -> dict:
"""Q-008: decisions affecting a subsystem (or the whole project)."""
from atocore.engineering.queries import decisions_affecting
return decisions_affecting(project, subsystem_id=subsystem)
@router.get("/engineering/components/{component_id}/requirements")
def api_requirements_for_component(component_id: str) -> dict:
"""Q-005: requirements that a component satisfies."""
from atocore.engineering.queries import requirements_for
return requirements_for(component_id)
@router.get("/engineering/changes")
def api_recent_engineering_changes(
project: str,
since: str | None = None,
limit: int = 50,
) -> dict:
"""Q-013: entity changes in project since timestamp."""
from atocore.engineering.queries import recent_changes
return recent_changes(project, since=since, limit=limit)
@router.get("/engineering/gaps/orphan-requirements")
def api_orphan_requirements(project: str) -> dict:
"""Q-006 (killer): requirements with no SATISFIES edge."""
from atocore.engineering.queries import orphan_requirements
return orphan_requirements(project)
@router.get("/engineering/gaps/risky-decisions")
def api_risky_decisions(project: str) -> dict:
"""Q-009 (killer): decisions resting on flagged/superseded assumptions."""
from atocore.engineering.queries import risky_decisions
return risky_decisions(project)
@router.get("/engineering/gaps/unsupported-claims")
def api_unsupported_claims(project: str) -> dict:
"""Q-011 (killer): validation claims with no SUPPORTS edge."""
from atocore.engineering.queries import unsupported_claims
return unsupported_claims(project)
@router.get("/engineering/gaps")
def api_all_gaps(project: str) -> dict:
"""Combined Q-006 + Q-009 + Q-011 for a project."""
from atocore.engineering.queries import all_gaps
return all_gaps(project)
@router.get("/engineering/impact")
def api_impact_analysis(entity: str, max_depth: int = 3) -> dict:
"""Q-016: transitive outbound impact of changing an entity."""
from atocore.engineering.queries import impact_analysis
return impact_analysis(entity, max_depth=max_depth)
@router.get("/engineering/evidence")
def api_evidence_chain(entity: str) -> dict:
"""Q-017: inbound evidence chain for an entity."""
from atocore.engineering.queries import evidence_chain
return evidence_chain(entity)
@router.post("/entities/{entity_id}/promote")
def api_promote_entity(entity_id: str) -> dict:
"""Promote a candidate entity to active (Phase 5 Engineering V1)."""

View File

@@ -29,6 +29,7 @@ def generate_project_overview(project: str) -> str:
sections = [
_header(project),
_synthesis_section(project),
_gaps_section(project), # Phase 5: killer queries surface here
_state_section(project),
_system_architecture(project),
_decisions_section(project),
@@ -41,6 +42,66 @@ def generate_project_overview(project: str) -> str:
return "\n\n".join(s for s in sections if s)
def _gaps_section(project: str) -> str:
"""Phase 5: surface the 3 killer-query gaps on every project page.
If any gap is non-empty, it appears near the top so the director
sees "what am I forgetting?" before the rest of the report.
"""
try:
from atocore.engineering.queries import all_gaps
result = all_gaps(project)
except Exception:
return ""
orphan = result["orphan_requirements"]["count"]
risky = result["risky_decisions"]["count"]
unsup = result["unsupported_claims"]["count"]
if orphan == 0 and risky == 0 and unsup == 0:
return (
"## Coverage Gaps\n\n"
"> ✅ No gaps detected: every requirement is satisfied, "
"no decisions rest on flagged assumptions, every claim has evidence.\n"
)
lines = ["## Coverage Gaps", ""]
lines.append(
"> ⚠️ Items below need attention — gaps in the engineering graph.\n"
)
if orphan:
lines.append(f"### {orphan} Orphan Requirement(s)")
lines.append("*Requirements with no component claiming to satisfy them:*")
lines.append("")
for r in result["orphan_requirements"]["gaps"][:10]:
lines.append(f"- **{r['name']}** — {(r['description'] or '')[:120]}")
if orphan > 10:
lines.append(f"- _...and {orphan - 10} more_")
lines.append("")
if risky:
lines.append(f"### {risky} Risky Decision(s)")
lines.append("*Decisions based on assumptions that are flagged, superseded, or invalid:*")
lines.append("")
for d in result["risky_decisions"]["gaps"][:10]:
lines.append(
f"- **{d['decision_name']}** — based on flagged assumption "
f"_{d['assumption_name']}_ ({d['assumption_status']})"
)
lines.append("")
if unsup:
lines.append(f"### {unsup} Unsupported Claim(s)")
lines.append("*Validation claims with no supporting Result entity:*")
lines.append("")
for c in result["unsupported_claims"]["gaps"][:10]:
lines.append(f"- **{c['name']}** — {(c['description'] or '')[:120]}")
lines.append("")
return "\n".join(lines)
def _synthesis_section(project: str) -> str:
"""Generate a short LLM synthesis of the current project state.

View File

@@ -0,0 +1,467 @@
"""Phase 5 Engineering V1 — The 10 canonical queries.
Each function maps to one or more catalog IDs in
``docs/architecture/engineering-query-catalog.md``. Return values are plain
dicts so API and wiki renderers can consume them without importing dataclasses.
Design principles:
- All queries filter to status='active' unless the caller asks otherwise
- All project filters go through ``resolve_project_name`` (canonicalization)
- Graph traversals are bounded (depth <= 3 for impact, limit 200 for lists)
- The 3 "killer" queries (gaps) accept project as required — gaps are always
scoped to one project in V1
These queries are the *useful surface* of the entity graph. Before this module,
the graph was data with no narrative; after this module, the director can ask
real questions about coverage, risk, and evidence.
"""
from __future__ import annotations
from datetime import datetime, timezone
from atocore.engineering.service import (
Entity,
_row_to_entity,
get_entity,
get_relationships,
)
from atocore.models.database import get_connection
from atocore.projects.registry import resolve_project_name
# ============================================================
# Structure queries (Q-001, Q-004, Q-005, Q-008)
# ============================================================
def system_map(project: str) -> dict:
"""Q-001 + Q-004: return the full subsystem/component tree for a project.
Shape:
{
"project": "p05-interferometer",
"subsystems": [
{
"id": ..., "name": ..., "description": ...,
"components": [{id, name, description, materials: [...]}],
},
...
],
"orphan_components": [...], # components with no PART_OF edge
}
"""
project = resolve_project_name(project) if project else ""
out: dict = {"project": project, "subsystems": [], "orphan_components": []}
with get_connection() as conn:
# All subsystems in project
subsys_rows = conn.execute(
"SELECT * FROM entities WHERE status = 'active' "
"AND project = ? AND entity_type = 'subsystem' "
"ORDER BY name",
(project,),
).fetchall()
# All components in project
comp_rows = conn.execute(
"SELECT * FROM entities WHERE status = 'active' "
"AND project = ? AND entity_type = 'component'",
(project,),
).fetchall()
# PART_OF edges: component → subsystem
part_of_rows = conn.execute(
"SELECT source_entity_id, target_entity_id FROM relationships "
"WHERE relationship_type = 'part_of'"
).fetchall()
part_of_map: dict[str, str] = {
r["source_entity_id"]: r["target_entity_id"] for r in part_of_rows
}
# uses_material edges for components
mat_rows = conn.execute(
"SELECT r.source_entity_id, e.name FROM relationships r "
"JOIN entities e ON e.id = r.target_entity_id "
"WHERE r.relationship_type = 'uses_material' AND e.status = 'active'"
).fetchall()
materials_by_comp: dict[str, list[str]] = {}
for r in mat_rows:
materials_by_comp.setdefault(r["source_entity_id"], []).append(r["name"])
# Build: subsystems → their components
subsys_comps: dict[str, list[dict]] = {s["id"]: [] for s in subsys_rows}
orphans: list[dict] = []
for c in comp_rows:
parent = part_of_map.get(c["id"])
comp_dict = {
"id": c["id"],
"name": c["name"],
"description": c["description"] or "",
"materials": materials_by_comp.get(c["id"], []),
}
if parent and parent in subsys_comps:
subsys_comps[parent].append(comp_dict)
else:
orphans.append(comp_dict)
out["subsystems"] = [
{
"id": s["id"],
"name": s["name"],
"description": s["description"] or "",
"components": subsys_comps.get(s["id"], []),
}
for s in subsys_rows
]
out["orphan_components"] = orphans
return out
def decisions_affecting(project: str, subsystem_id: str | None = None) -> dict:
"""Q-008: decisions that affect a subsystem (or whole project).
Walks AFFECTED_BY_DECISION edges. If subsystem_id is given, returns
decisions linked to that subsystem or any of its components. Otherwise,
all decisions in the project.
"""
project = resolve_project_name(project) if project else ""
target_ids: set[str] = set()
if subsystem_id:
target_ids.add(subsystem_id)
# Include components PART_OF the subsystem
with get_connection() as conn:
rows = conn.execute(
"SELECT source_entity_id FROM relationships "
"WHERE relationship_type = 'part_of' AND target_entity_id = ?",
(subsystem_id,),
).fetchall()
for r in rows:
target_ids.add(r["source_entity_id"])
with get_connection() as conn:
if target_ids:
placeholders = ",".join("?" * len(target_ids))
rows = conn.execute(
f"SELECT DISTINCT e.* FROM entities e "
f"JOIN relationships r ON r.source_entity_id = e.id "
f"WHERE e.status = 'active' AND e.entity_type = 'decision' "
f"AND e.project = ? AND r.relationship_type = 'affected_by_decision' "
f"AND r.target_entity_id IN ({placeholders}) "
f"ORDER BY e.updated_at DESC",
(project, *target_ids),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM entities WHERE status = 'active' "
"AND entity_type = 'decision' AND project = ? "
"ORDER BY updated_at DESC LIMIT 200",
(project,),
).fetchall()
decisions = [_entity_dict(_row_to_entity(r)) for r in rows]
return {
"project": project,
"subsystem_id": subsystem_id or "",
"decisions": decisions,
"count": len(decisions),
}
def requirements_for(component_id: str) -> dict:
"""Q-005: requirements that a component satisfies."""
with get_connection() as conn:
# Component → SATISFIES → Requirement
rows = conn.execute(
"SELECT e.* FROM entities e "
"JOIN relationships r ON r.target_entity_id = e.id "
"WHERE r.source_entity_id = ? AND r.relationship_type = 'satisfies' "
"AND e.entity_type = 'requirement' AND e.status = 'active' "
"ORDER BY e.name",
(component_id,),
).fetchall()
requirements = [_entity_dict(_row_to_entity(r)) for r in rows]
return {
"component_id": component_id,
"requirements": requirements,
"count": len(requirements),
}
def recent_changes(project: str, since: str | None = None, limit: int = 50) -> dict:
"""Q-013: what changed recently in the project (entity audit log).
Uses the shared memory_audit table filtered by entity_kind='entity' and
joins back to entities for the project scope.
"""
project = resolve_project_name(project) if project else ""
since = since or "2020-01-01"
with get_connection() as conn:
rows = conn.execute(
"SELECT a.id, a.memory_id AS entity_id, a.action, a.actor, "
"a.timestamp, a.note, e.entity_type, e.name, e.project "
"FROM memory_audit a "
"LEFT JOIN entities e ON e.id = a.memory_id "
"WHERE a.entity_kind = 'entity' AND a.timestamp >= ? "
"AND (e.project = ? OR e.project IS NULL) "
"ORDER BY a.timestamp DESC LIMIT ?",
(since, project, limit),
).fetchall()
changes = []
for r in rows:
changes.append({
"audit_id": r["id"],
"entity_id": r["entity_id"],
"entity_type": r["entity_type"] or "?",
"entity_name": r["name"] or "(deleted)",
"action": r["action"],
"actor": r["actor"] or "api",
"note": r["note"] or "",
"timestamp": r["timestamp"],
})
return {"project": project, "since": since, "changes": changes, "count": len(changes)}
# ============================================================
# Killer queries (Q-006, Q-009, Q-011) — the "what am I forgetting?" queries
# ============================================================
def orphan_requirements(project: str) -> dict:
"""Q-006: requirements in project with NO inbound SATISFIES edge.
These are "something we said must be true" with nothing actually
satisfying them. The single highest-value query for an engineering
director: shows what's unclaimed by design.
"""
project = resolve_project_name(project) if project else ""
with get_connection() as conn:
rows = conn.execute(
"SELECT * FROM entities WHERE status = 'active' "
"AND project = ? AND entity_type = 'requirement' "
"AND NOT EXISTS ("
" SELECT 1 FROM relationships r "
" WHERE r.relationship_type = 'satisfies' "
" AND r.target_entity_id = entities.id"
") "
"ORDER BY updated_at DESC",
(project,),
).fetchall()
orphans = [_entity_dict(_row_to_entity(r)) for r in rows]
return {
"project": project,
"query": "Q-006 orphan requirements",
"description": "Requirements with no SATISFIES relationship — nothing claims to meet them.",
"gaps": orphans,
"count": len(orphans),
}
def risky_decisions(project: str) -> dict:
"""Q-009: decisions linked to assumptions flagged as unresolved.
Walks BASED_ON_ASSUMPTION edges. An assumption is "flagged" if its
properties.flagged=True OR status='superseded' OR status='invalid'.
"""
project = resolve_project_name(project) if project else ""
with get_connection() as conn:
rows = conn.execute(
"SELECT DISTINCT d.*, a.name AS assumption_name, a.id AS assumption_id, "
"a.status AS assumption_status, a.properties AS assumption_props "
"FROM entities d "
"JOIN relationships r ON r.source_entity_id = d.id "
"JOIN entities a ON a.id = r.target_entity_id "
"WHERE d.status = 'active' AND d.entity_type = 'decision' "
"AND d.project = ? "
"AND r.relationship_type = 'based_on_assumption' "
"AND ("
" a.status IN ('superseded', 'invalid') OR "
" a.properties LIKE '%\"flagged\": true%' OR "
" a.properties LIKE '%\"flagged\":true%'"
") "
"ORDER BY d.updated_at DESC",
(project,),
).fetchall()
risky = []
for r in rows:
risky.append({
"decision_id": r["id"],
"decision_name": r["name"],
"decision_description": r["description"] or "",
"assumption_id": r["assumption_id"],
"assumption_name": r["assumption_name"],
"assumption_status": r["assumption_status"],
})
return {
"project": project,
"query": "Q-009 risky decisions",
"description": "Decisions based on assumptions that are flagged, superseded, or invalid.",
"gaps": risky,
"count": len(risky),
}
def unsupported_claims(project: str) -> dict:
"""Q-011: validation claims with NO inbound SUPPORTS edge.
These are asserted claims (e.g., "margin is adequate") with no
Result entity actually supporting them. High-risk: the engineer
believes it, but there's no evidence on file.
"""
project = resolve_project_name(project) if project else ""
with get_connection() as conn:
rows = conn.execute(
"SELECT * FROM entities WHERE status = 'active' "
"AND project = ? AND entity_type = 'validation_claim' "
"AND NOT EXISTS ("
" SELECT 1 FROM relationships r "
" WHERE r.relationship_type = 'supports' "
" AND r.target_entity_id = entities.id"
") "
"ORDER BY updated_at DESC",
(project,),
).fetchall()
claims = [_entity_dict(_row_to_entity(r)) for r in rows]
return {
"project": project,
"query": "Q-011 unsupported claims",
"description": "Validation claims with no supporting Result — asserted but not evidenced.",
"gaps": claims,
"count": len(claims),
}
def all_gaps(project: str) -> dict:
"""Combined: run Q-006, Q-009, Q-011 for a project in one go."""
return {
"project": resolve_project_name(project) if project else "",
"generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"orphan_requirements": orphan_requirements(project),
"risky_decisions": risky_decisions(project),
"unsupported_claims": unsupported_claims(project),
}
# ============================================================
# History + impact (Q-016, Q-017)
# ============================================================
def impact_analysis(entity_id: str, max_depth: int = 3) -> dict:
"""Q-016: transitive outbound reach of an entity.
Walks outbound edges breadth-first to max_depth. Answers "what would
be affected if I changed component X?" by finding everything downstream.
"""
visited: set[str] = {entity_id}
impacted: list[dict] = []
frontier = [(entity_id, 0)]
while frontier:
current_id, depth = frontier.pop(0)
if depth >= max_depth:
continue
with get_connection() as conn:
rows = conn.execute(
"SELECT r.relationship_type, r.target_entity_id, "
"e.entity_type, e.name, e.status "
"FROM relationships r "
"JOIN entities e ON e.id = r.target_entity_id "
"WHERE r.source_entity_id = ? AND e.status = 'active'",
(current_id,),
).fetchall()
for r in rows:
tid = r["target_entity_id"]
if tid in visited:
continue
visited.add(tid)
impacted.append({
"entity_id": tid,
"entity_type": r["entity_type"],
"name": r["name"],
"relationship": r["relationship_type"],
"depth": depth + 1,
})
frontier.append((tid, depth + 1))
root = get_entity(entity_id)
return {
"root": _entity_dict(root) if root else None,
"impacted_count": len(impacted),
"impacted": impacted,
"max_depth": max_depth,
}
def evidence_chain(entity_id: str) -> dict:
"""Q-017: what evidence supports this entity?
Walks inbound SUPPORTS / EVIDENCED_BY / DESCRIBED_BY edges to surface
the provenance chain: "this claim is supported by that result, which
was produced by that analysis model, which was described by that doc."
"""
provenance_edges = ("supports", "evidenced_by", "described_by",
"validated_by", "analyzed_by")
placeholders = ",".join("?" * len(provenance_edges))
with get_connection() as conn:
# Inbound edges of the provenance family
inbound_rows = conn.execute(
f"SELECT r.relationship_type, r.source_entity_id, "
f"e.entity_type, e.name, e.description, e.status "
f"FROM relationships r "
f"JOIN entities e ON e.id = r.source_entity_id "
f"WHERE r.target_entity_id = ? AND e.status = 'active' "
f"AND r.relationship_type IN ({placeholders})",
(entity_id, *provenance_edges),
).fetchall()
# Also look at source_refs on the entity itself
root = get_entity(entity_id)
chain = []
for r in inbound_rows:
chain.append({
"via": r["relationship_type"],
"source_id": r["source_entity_id"],
"source_type": r["entity_type"],
"source_name": r["name"],
"source_description": (r["description"] or "")[:200],
})
return {
"root": _entity_dict(root) if root else None,
"direct_source_refs": root.source_refs if root else [],
"evidence_chain": chain,
"count": len(chain),
}
# ============================================================
# Helpers
# ============================================================
def _entity_dict(e: Entity) -> dict:
"""Flatten an Entity to a public-API dict."""
return {
"id": e.id,
"entity_type": e.entity_type,
"name": e.name,
"project": e.project,
"description": e.description,
"properties": e.properties,
"status": e.status,
"confidence": e.confidence,
"source_refs": e.source_refs,
"updated_at": e.updated_at,
}

View File

@@ -293,16 +293,109 @@ _TRIAGE_CSS = """
"""
def _render_entity_card(entity) -> str:
"""Phase 5: entity candidate card with promote/reject."""
eid = _escape(entity.id)
name = _escape(entity.name)
etype = _escape(entity.entity_type)
project = _escape(entity.project or "(global)")
desc = _escape(entity.description or "")
conf = f"{entity.confidence:.2f}"
src_refs = entity.source_refs or []
source_display = _escape(", ".join(src_refs[:3])) if src_refs else "(no provenance)"
return f"""
<div class="cand cand-entity" id="ecand-{eid}" data-entity-id="{eid}">
<div class="cand-head">
<span class="cand-type entity-type">[entity · {etype}]</span>
<span class="cand-project">{project}</span>
<span class="cand-meta">conf {conf} · src: {source_display}</span>
</div>
<div class="cand-body">
<div class="entity-name">{name}</div>
<div class="entity-desc">{desc}</div>
</div>
<div class="cand-actions">
<button class="btn-entity-promote" data-entity-id="{eid}" title="Promote entity (Y)">✅ Promote Entity</button>
<button class="btn-entity-reject" data-entity-id="{eid}" title="Reject entity (N)">❌ Reject</button>
<a class="btn-link" href="/wiki/entities/{eid}">View in wiki →</a>
</div>
<div class="cand-status" id="estatus-{eid}"></div>
</div>
"""
_ENTITY_TRIAGE_SCRIPT = """
<script>
async function entityPromote(id) {
const st = document.getElementById('estatus-' + id);
st.textContent = 'Promoting…';
st.className = 'cand-status ok';
const r = await fetch('/entities/' + encodeURIComponent(id) + '/promote', {method:'POST'});
if (r.ok) {
st.textContent = '✅ Entity promoted';
setTimeout(() => {
const card = document.getElementById('ecand-' + id);
if (card) { card.style.opacity = '0'; setTimeout(() => card.remove(), 300); }
}, 400);
} else st.textContent = '' + r.status;
}
async function entityReject(id) {
const st = document.getElementById('estatus-' + id);
st.textContent = 'Rejecting…';
st.className = 'cand-status ok';
const r = await fetch('/entities/' + encodeURIComponent(id) + '/reject', {method:'POST'});
if (r.ok) {
st.textContent = '❌ Entity rejected';
setTimeout(() => {
const card = document.getElementById('ecand-' + id);
if (card) { card.style.opacity = '0'; setTimeout(() => card.remove(), 300); }
}, 400);
} else st.textContent = '' + r.status;
}
document.addEventListener('click', (e) => {
const eid = e.target.dataset?.entityId;
if (!eid) return;
if (e.target.classList.contains('btn-entity-promote')) entityPromote(eid);
else if (e.target.classList.contains('btn-entity-reject')) entityReject(eid);
});
</script>
"""
_ENTITY_TRIAGE_CSS = """
<style>
.cand-entity { border-left: 3px solid #059669; }
.entity-type { background: #059669; color: white; padding: 0.1rem 0.5rem; border-radius: 3px; font-size: 0.75rem; }
.entity-name { font-size: 1.15rem; font-weight: 600; margin-bottom: 0.3rem; }
.entity-desc { opacity: 0.85; font-size: 0.95rem; }
.btn-entity-promote { background: #059669; color: white; border-color: #059669; }
.btn-entity-reject:hover { background: #dc2626; color: white; border-color: #dc2626; }
.btn-link { padding: 0.4rem 0.9rem; text-decoration: none; color: var(--accent); border: 1px solid var(--border); border-radius: 4px; font-size: 0.88rem; }
.btn-link:hover { background: var(--hover); }
.section-break { border-top: 2px solid var(--border); margin: 2rem 0 1rem 0; padding-top: 1rem; }
</style>
"""
def render_triage_page(limit: int = 100) -> str:
"""Render the full triage page with all pending candidates."""
"""Render the full triage page with pending memory + entity candidates."""
from atocore.engineering.service import get_entities
try:
candidates = get_memories(status="candidate", limit=limit)
mem_candidates = get_memories(status="candidate", limit=limit)
except Exception as e:
body = f"<p>Error loading candidates: {_escape(str(e))}</p>"
body = f"<p>Error loading memory candidates: {_escape(str(e))}</p>"
return render_html("Triage — AtoCore", body, breadcrumbs=[("Wiki", "/wiki"), ("Triage", "")])
if not candidates:
body = _TRIAGE_CSS + """
try:
entity_candidates = get_entities(status="candidate", limit=limit)
except Exception as e:
entity_candidates = []
total = len(mem_candidates) + len(entity_candidates)
if total == 0:
body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + """
<div class="triage-header">
<h1>Triage Queue</h1>
</div>
@@ -313,15 +406,34 @@ def render_triage_page(limit: int = 100) -> str:
"""
return render_html("Triage — AtoCore", body, breadcrumbs=[("Wiki", "/wiki"), ("Triage", "")])
cards_html = "".join(_render_candidate_card(c) for c in candidates)
# Memory cards
mem_cards = "".join(_render_candidate_card(c) for c in mem_candidates)
body = _TRIAGE_CSS + f"""
# Entity cards
ent_cards_html = ""
if entity_candidates:
ent_cards = "".join(_render_entity_card(e) for e in entity_candidates)
ent_cards_html = f"""
<div class="section-break">
<h2>🔧 Entity Candidates ({len(entity_candidates)})</h2>
<p class="auto-triage-msg">
Typed graph entries awaiting review. Promoting an entity connects it to
the engineering knowledge graph (subsystems, requirements, decisions, etc.).
</p>
</div>
{ent_cards}
"""
body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + f"""
<div class="triage-header">
<h1>Triage Queue</h1>
<span class="count"><span id="cand-count">{len(candidates)}</span> pending</span>
<span class="count">
<span id="cand-count">{len(mem_candidates)}</span> memory ·
{len(entity_candidates)} entity
</span>
</div>
<div class="triage-help">
Review candidate memories the auto-triage wasn't sure about. Edit the content
Review candidates the auto-triage wasn't sure about. Edit the content
if needed, then promote or reject. Shortcuts: <kbd>Y</kbd> promote · <kbd>N</kbd>
reject · <kbd>E</kbd> edit · <kbd>S</kbd> scroll to next.
</div>
@@ -330,12 +442,14 @@ def render_triage_page(limit: int = 100) -> str:
🤖 Auto-process queue
</button>
<span id="auto-triage-status" class="auto-triage-msg">
Sends the full queue through LLM triage on the host. Promotes durable facts,
rejects noise, leaves only ambiguous items here for you.
Sends the full memory queue through LLM triage on the host. Entity candidates
stay for manual review (types + relationships matter too much to auto-decide).
</span>
</div>
{cards_html}
""" + _TRIAGE_SCRIPT
<h2>📝 Memory Candidates ({len(mem_candidates)})</h2>
{mem_cards}
{ent_cards_html}
""" + _TRIAGE_SCRIPT + _ENTITY_TRIAGE_SCRIPT
return render_html(
"Triage — AtoCore",

View File

@@ -0,0 +1,212 @@
"""Phase 5 tests — the 10 canonical engineering queries.
Test fixtures seed a small p-test graph and exercise each query. The 3 killer
queries (Q-006/009/011) get dedicated tests that verify they surface real gaps
and DON'T false-positive on well-formed data.
"""
from __future__ import annotations
import pytest
from atocore.engineering.queries import (
all_gaps,
decisions_affecting,
evidence_chain,
impact_analysis,
orphan_requirements,
recent_changes,
requirements_for,
risky_decisions,
system_map,
unsupported_claims,
)
from atocore.engineering.service import (
create_entity,
create_relationship,
init_engineering_schema,
)
from atocore.models.database import init_db
@pytest.fixture
def seeded_graph(tmp_data_dir):
"""Build a small engineering graph for query tests."""
init_db()
init_engineering_schema()
# Subsystem + components
ss = create_entity("subsystem", "Optics", project="p-test")
c1 = create_entity("component", "Primary Mirror", project="p-test")
c2 = create_entity("component", "Diverger Lens", project="p-test")
c_orphan = create_entity("component", "Unparented", project="p-test")
create_relationship(c1.id, ss.id, "part_of")
create_relationship(c2.id, ss.id, "part_of")
# Requirements — one satisfied, one orphan
r_ok = create_entity("requirement", "Surface figure < 25nm RMS", project="p-test")
r_orphan = create_entity("requirement", "Measurement lambda/20", project="p-test")
create_relationship(c1.id, r_ok.id, "satisfies")
# Decisions
d_ok = create_entity("decision", "Use Zerodur blank", project="p-test")
d_risky = create_entity("decision", "Use external CGH", project="p-test")
create_relationship(d_ok.id, ss.id, "affected_by_decision")
# Assumption (flagged) — d_risky depends on it
a_flagged = create_entity(
"parameter", "Vendor lead time 6 weeks",
project="p-test",
properties={"flagged": True},
)
create_relationship(d_risky.id, a_flagged.id, "based_on_assumption")
# Validation claim — one supported, one not
v_ok = create_entity("validation_claim", "Margin is adequate", project="p-test")
v_orphan = create_entity("validation_claim", "Thermal stability OK", project="p-test")
result = create_entity("result", "FEA thermal sweep 2026-03", project="p-test")
create_relationship(result.id, v_ok.id, "supports")
# Material
mat = create_entity("material", "Zerodur", project="p-test")
create_relationship(c1.id, mat.id, "uses_material")
return {
"subsystem": ss, "component_1": c1, "component_2": c2,
"orphan_component": c_orphan,
"req_ok": r_ok, "req_orphan": r_orphan,
"decision_ok": d_ok, "decision_risky": d_risky,
"assumption_flagged": a_flagged,
"claim_supported": v_ok, "claim_orphan": v_orphan,
"result": result, "material": mat,
}
# --- Structure queries ---
def test_system_map_returns_subsystem_with_components(seeded_graph):
result = system_map("p-test")
assert result["project"] == "p-test"
assert len(result["subsystems"]) == 1
optics = result["subsystems"][0]
assert optics["name"] == "Optics"
comp_names = {c["name"] for c in optics["components"]}
assert "Primary Mirror" in comp_names
assert "Diverger Lens" in comp_names
def test_system_map_reports_orphan_components(seeded_graph):
result = system_map("p-test")
names = {c["name"] for c in result["orphan_components"]}
assert "Unparented" in names
def test_system_map_includes_materials(seeded_graph):
result = system_map("p-test")
primary = next(
c for s in result["subsystems"] for c in s["components"] if c["name"] == "Primary Mirror"
)
assert "Zerodur" in primary["materials"]
def test_decisions_affecting_whole_project(seeded_graph):
result = decisions_affecting("p-test")
names = {d["name"] for d in result["decisions"]}
assert "Use Zerodur blank" in names
assert "Use external CGH" in names
def test_decisions_affecting_specific_subsystem(seeded_graph):
ss_id = seeded_graph["subsystem"].id
result = decisions_affecting("p-test", subsystem_id=ss_id)
names = {d["name"] for d in result["decisions"]}
# d_ok has edge to subsystem directly
assert "Use Zerodur blank" in names
def test_requirements_for_component(seeded_graph):
c_id = seeded_graph["component_1"].id
result = requirements_for(c_id)
assert result["count"] == 1
assert result["requirements"][0]["name"] == "Surface figure < 25nm RMS"
def test_recent_changes_includes_created_entities(seeded_graph):
result = recent_changes("p-test", limit=100)
actions = [c["action"] for c in result["changes"]]
assert "created" in actions
assert result["count"] > 0
# --- Killer queries ---
def test_orphan_requirements_finds_unsatisfied(seeded_graph):
result = orphan_requirements("p-test")
names = {r["name"] for r in result["gaps"]}
assert "Measurement lambda/20" in names # orphan
assert "Surface figure < 25nm RMS" not in names # has SATISFIES edge
def test_orphan_requirements_empty_when_all_satisfied(tmp_data_dir):
init_db()
init_engineering_schema()
c = create_entity("component", "C", project="p-clean")
r = create_entity("requirement", "R", project="p-clean")
create_relationship(c.id, r.id, "satisfies")
result = orphan_requirements("p-clean")
assert result["count"] == 0
def test_risky_decisions_finds_flagged_assumptions(seeded_graph):
result = risky_decisions("p-test")
names = {d["decision_name"] for d in result["gaps"]}
assert "Use external CGH" in names
assert "Use Zerodur blank" not in names # has no flagged assumption
def test_unsupported_claims_finds_orphan_claims(seeded_graph):
result = unsupported_claims("p-test")
names = {c["name"] for c in result["gaps"]}
assert "Thermal stability OK" in names
assert "Margin is adequate" not in names # has SUPPORTS edge
def test_all_gaps_combines_the_three_killers(seeded_graph):
result = all_gaps("p-test")
assert result["orphan_requirements"]["count"] == 1
assert result["risky_decisions"]["count"] == 1
assert result["unsupported_claims"]["count"] == 1
def test_all_gaps_clean_project_reports_zero(tmp_data_dir):
init_db()
init_engineering_schema()
create_entity("component", "alone", project="p-empty")
result = all_gaps("p-empty")
assert result["orphan_requirements"]["count"] == 0
assert result["risky_decisions"]["count"] == 0
assert result["unsupported_claims"]["count"] == 0
# --- Impact + evidence ---
def test_impact_analysis_walks_outbound_edges(seeded_graph):
c_id = seeded_graph["component_1"].id
result = impact_analysis(c_id, max_depth=2)
# Primary Mirror → SATISFIES → Requirement, → USES_MATERIAL → Material
rel_types = {i["relationship"] for i in result["impacted"]}
assert "satisfies" in rel_types
assert "uses_material" in rel_types
def test_evidence_chain_walks_inbound_provenance(seeded_graph):
v_ok_id = seeded_graph["claim_supported"].id
result = evidence_chain(v_ok_id)
# The Result entity supports the claim
via_types = {e["via"] for e in result["evidence_chain"]}
assert "supports" in via_types
source_names = {e["source_name"] for e in result["evidence_chain"]}
assert "FEA thermal sweep 2026-03" in source_names