From 3316ff99f9e724f1060591e2dcf7e34cd3d7bcd2 Mon Sep 17 00:00:00 2001 From: Anto01 Date: Fri, 17 Apr 2026 07:53:03 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=205F/5G/5H=20=E2=80=94=20graduati?= =?UTF-8?q?on,=20conflicts,=20MCP=20engineering=20tools?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The population move + the safety net + the universal consumer hookup, all shipped together. This is where the engineering graph becomes genuinely useful against the real 262-memory corpus. 5F: Memory → Entity graduation (THE population move) - src/atocore/engineering/_graduation_prompt.py: stdlib-only shared prompt module mirroring _llm_prompt.py pattern (container + host use same system prompt, no drift) - scripts/graduate_memories.py: host-side batch driver that asks claude-p "does this memory describe a typed entity?" and creates entity candidates with source_refs pointing back to the memory - promote_entity() now scans source_refs for memory:* prefix; if found, flips source memory to status='graduated' with graduated_to_entity_id forward pointer + writes memory_audit row - GET /admin/graduation/stats exposes graduation rate for dashboard 5G: Sync conflict detection on entity promote - src/atocore/engineering/conflicts.py: detect_conflicts_for_entity() runs on every active promote. 
V1 checks 3 slot kinds narrowly to avoid false positives: * component.material (multiple USES_MATERIAL edges) * component.part_of (multiple PART_OF edges) * requirement.name (duplicate active Requirements in same project) - Conflicts + members persist via the tables built in 5A - Fires a "warning" alert via Phase 4 framework - Deduplicates: same (slot_kind, slot_key) won't get a new row - resolve_conflict(action="dismiss|supersede_others|no_action"): supersede_others marks non-winner members as status='superseded' - GET /admin/conflicts + POST /admin/conflicts/{id}/resolve 5H: MCP + context pack integration - scripts/atocore_mcp.py: 7 new engineering tools exposed to every MCP-aware client (Claude Desktop, Claude Code, Cursor, Zed): * atocore_engineering_map (Q-001/004 system tree) * atocore_engineering_gaps (Q-006/009/011 killer queries — THE director's question surfaced as a built-in tool) * atocore_engineering_requirements_for_component (Q-005) * atocore_engineering_decisions (Q-008) * atocore_engineering_changes (Q-013 — reads entity audit log) * atocore_engineering_impact (Q-016 BFS downstream) * atocore_engineering_evidence (Q-017 inbound provenance) - MCP tools total: 14 (7 memory/state/health + 7 engineering) - context/builder.py _build_engineering_context now appends a compact gaps summary ("Gaps: N orphan reqs, M risky decisions, K unsupported claims") so every project-scoped LLM call sees "what we're missing" Tests: 341 → 356 (15 new): - 5F: graduation prompt parses positive/negative decisions, rejects unknown entity types, tolerates markdown fences; promote_entity marks source memory graduated with forward pointer; entity without memory refs promotes cleanly - 5G: component.material + component.part_of + requirement.name conflicts detected; clean component triggers nothing; dedup works; supersede_others resolution marks losers; dismiss leaves both active; end-to-end promote triggers detection - 5H: graduation user message includes project + type + 
content No regressions across the 341 prior tests. The MCP server now answers "which p05 requirements aren't satisfied?" directly from any Claude session — no user prompt engineering, no context hacks. Next to kick off from user: run graduation script on Dalidou to populate the graph from 262 existing memories: ssh papa@dalidou 'cd /srv/storage/atocore/app && PYTHONPATH=src \ python3 scripts/graduate_memories.py --project p05-interferometer --limit 30 --dry-run' Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/atocore_mcp.py | 306 ++++++++++++++++++ scripts/graduate_memories.py | 237 ++++++++++++++ src/atocore/api/routes.py | 56 ++++ src/atocore/context/builder.py | 17 + src/atocore/engineering/_graduation_prompt.py | 194 +++++++++++ src/atocore/engineering/conflicts.py | 291 +++++++++++++++++ src/atocore/engineering/service.py | 85 ++++- tests/test_engineering_v1_phase5.py | 246 ++++++++++++++ 8 files changed, 1425 insertions(+), 7 deletions(-) create mode 100644 scripts/graduate_memories.py create mode 100644 src/atocore/engineering/_graduation_prompt.py create mode 100644 src/atocore/engineering/conflicts.py create mode 100644 tests/test_engineering_v1_phase5.py diff --git a/scripts/atocore_mcp.py b/scripts/atocore_mcp.py index 1aefccc..9474e13 100644 --- a/scripts/atocore_mcp.py +++ b/scripts/atocore_mcp.py @@ -243,6 +243,197 @@ def _tool_health(args: dict) -> str: return f"AtoCore healthy: sha={sha} vectors={vectors} env={env}" +# --- Phase 5H: Engineering query tools --- + + +def _tool_system_map(args: dict) -> str: + """Q-001 + Q-004: subsystem/component tree for a project.""" + project = (args.get("project") or "").strip() + if not project: + return "Error: 'project' is required." 
+ result, err = safe_call( + http_get, f"/engineering/projects/{urllib.parse.quote(project)}/systems" + ) + if err: + return f"Engineering query failed: {err}" + subs = result.get("subsystems", []) or [] + orphans = result.get("orphan_components", []) or [] + if not subs and not orphans: + return f"No subsystems or components registered for {project}." + lines = [f"System map for {project}:"] + for s in subs: + lines.append(f"\n[{s['name']}] — {s.get('description') or '(no description)'}") + for c in s.get("components", []): + mats = ", ".join(c.get("materials", [])) or "-" + lines.append(f" • {c['name']} (materials: {mats})") + if orphans: + lines.append(f"\nOrphan components (not attached to any subsystem):") + for c in orphans: + lines.append(f" • {c['name']}") + return "\n".join(lines) + + +def _tool_gaps(args: dict) -> str: + """Q-006 + Q-009 + Q-011: find coverage gaps. Director's most-used query.""" + project = (args.get("project") or "").strip() + if not project: + return "Error: 'project' is required." 
+ result, err = safe_call( + http_get, f"/engineering/gaps", + params={"project": project}, + ) + if err: + return f"Gap query failed: {err}" + + orphan = result.get("orphan_requirements", {}) + risky = result.get("risky_decisions", {}) + unsup = result.get("unsupported_claims", {}) + + counts = f"{orphan.get('count',0)}/{risky.get('count',0)}/{unsup.get('count',0)}" + lines = [f"Coverage gaps for {project} (orphan reqs / risky decisions / unsupported claims: {counts}):\n"] + + if orphan.get("count", 0): + lines.append(f"ORPHAN REQUIREMENTS ({orphan['count']}) — no component claims to satisfy:") + for g in orphan.get("gaps", [])[:10]: + lines.append(f" • {g['name']}: {(g.get('description') or '')[:120]}") + lines.append("") + if risky.get("count", 0): + lines.append(f"RISKY DECISIONS ({risky['count']}) — based on flagged assumptions:") + for g in risky.get("gaps", [])[:10]: + lines.append(f" • {g['decision_name']} (assumption: {g['assumption_name']} — {g['assumption_status']})") + lines.append("") + if unsup.get("count", 0): + lines.append(f"UNSUPPORTED CLAIMS ({unsup['count']}) — no Result entity backs them:") + for g in unsup.get("gaps", [])[:10]: + lines.append(f" • {g['name']}: {(g.get('description') or '')[:120]}") + + if orphan.get("count", 0) == 0 and risky.get("count", 0) == 0 and unsup.get("count", 0) == 0: + lines.append("✓ No gaps detected — every requirement satisfied, no flagged assumptions, all claims have evidence.") + + return "\n".join(lines) + + +def _tool_requirements_for(args: dict) -> str: + """Q-005: requirements that a component satisfies.""" + component_id = (args.get("component_id") or "").strip() + if not component_id: + return "Error: 'component_id' is required." + result, err = safe_call( + http_get, f"/engineering/components/{urllib.parse.quote(component_id)}/requirements" + ) + if err: + return f"Query failed: {err}" + reqs = result.get("requirements", []) or [] + if not reqs: + return "No requirements associated with this component." 
+ lines = [f"Component satisfies {len(reqs)} requirement(s):"] + for r in reqs: + lines.append(f" • {r['name']}: {(r.get('description') or '')[:150]}") + return "\n".join(lines) + + +def _tool_decisions_affecting(args: dict) -> str: + """Q-008: decisions affecting a project or subsystem.""" + project = (args.get("project") or "").strip() + subsystem = args.get("subsystem_id") or args.get("subsystem") or "" + if not project: + return "Error: 'project' is required." + params = {"project": project} + if subsystem: + params["subsystem"] = subsystem + result, err = safe_call(http_get, "/engineering/decisions", params=params) + if err: + return f"Query failed: {err}" + decisions = result.get("decisions", []) or [] + if not decisions: + scope = f"subsystem {subsystem}" if subsystem else f"project {project}" + return f"No decisions recorded for {scope}." + scope = f"subsystem {subsystem}" if subsystem else project + lines = [f"{len(decisions)} decision(s) affecting {scope}:"] + for d in decisions: + lines.append(f" • {d['name']}: {(d.get('description') or '')[:150]}") + return "\n".join(lines) + + +def _tool_recent_changes(args: dict) -> str: + """Q-013: what changed recently in the engineering graph.""" + project = (args.get("project") or "").strip() + since = args.get("since") or "" + limit = int(args.get("limit") or 20) + if not project: + return "Error: 'project' is required." + params = {"project": project, "limit": limit} + if since: + params["since"] = since + result, err = safe_call(http_get, "/engineering/changes", params=params) + if err: + return f"Query failed: {err}" + changes = result.get("changes", []) or [] + if not changes: + return f"No entity changes in {project} since {since or '(all time)'}." 
+ lines = [f"Recent changes in {project} ({len(changes)}):"] + for c in changes: + lines.append( + f" [{c['timestamp'][:16]}] {c['action']:10s} " + f"[{c.get('entity_type','?')}] {c.get('entity_name','?')} " + f"by {c.get('actor','?')}" + ) + return "\n".join(lines) + + +def _tool_impact(args: dict) -> str: + """Q-016: impact of changing an entity (downstream BFS).""" + entity = (args.get("entity_id") or args.get("entity") or "").strip() + if not entity: + return "Error: 'entity_id' is required." + max_depth = int(args.get("max_depth") or 3) + result, err = safe_call( + http_get, "/engineering/impact", + params={"entity": entity, "max_depth": max_depth}, + ) + if err: + return f"Query failed: {err}" + root = result.get("root") or {} + impacted = result.get("impacted", []) or [] + if not impacted: + return f"Nothing downstream of [{root.get('entity_type','?')}] {root.get('name','?')}." + lines = [ + f"Changing [{root.get('entity_type')}] {root.get('name')} " + f"would affect {len(impacted)} entity(ies) (max depth {max_depth}):" + ] + for i in impacted[:25]: + indent = " " * i.get("depth", 1) + lines.append(f"{indent}→ [{i['entity_type']}] {i['name']} (via {i['relationship']})") + if len(impacted) > 25: + lines.append(f" ... and {len(impacted)-25} more") + return "\n".join(lines) + + +def _tool_evidence(args: dict) -> str: + """Q-017: evidence chain for an entity.""" + entity = (args.get("entity_id") or args.get("entity") or "").strip() + if not entity: + return "Error: 'entity_id' is required." 
+ result, err = safe_call(http_get, "/engineering/evidence", params={"entity": entity}) + if err: + return f"Query failed: {err}" + root = result.get("root") or {} + chain = result.get("evidence_chain", []) or [] + lines = [f"Evidence for [{root.get('entity_type','?')}] {root.get('name','?')}:"] + if not chain: + lines.append(" (no inbound provenance edges)") + else: + for e in chain: + lines.append( + f" {e['via']} ← [{e['source_type']}] {e['source_name']}: " + f"{(e.get('source_description') or '')[:100]}" + ) + refs = result.get("direct_source_refs") or [] + if refs: + lines.append(f"\nDirect source_refs: {refs[:5]}") + return "\n".join(lines) + + TOOLS = [ { "name": "atocore_context", @@ -358,6 +549,121 @@ TOOLS = [ "inputSchema": {"type": "object", "properties": {}}, "handler": _tool_health, }, + # --- Phase 5H: Engineering knowledge graph tools --- + { + "name": "atocore_engineering_map", + "description": ( + "Get the subsystem/component tree for an engineering project. " + "Returns the full system architecture: subsystems, their components, " + "materials, and any orphan components not attached to a subsystem. " + "Use when the user asks about project structure or system design." + ), + "inputSchema": { + "type": "object", + "properties": { + "project": {"type": "string", "description": "Project id (e.g. p04-gigabit)"}, + }, + "required": ["project"], + }, + "handler": _tool_system_map, + }, + { + "name": "atocore_engineering_gaps", + "description": ( + "Find coverage gaps in a project's engineering graph: orphan " + "requirements (no component satisfies them), risky decisions " + "(based on flagged assumptions), and unsupported claims (no " + "Result evidence). This is the director's most useful query — " + "answers 'what am I forgetting?' in seconds." 
+ ), + "inputSchema": { + "type": "object", + "properties": { + "project": {"type": "string"}, + }, + "required": ["project"], + }, + "handler": _tool_gaps, + }, + { + "name": "atocore_engineering_requirements_for_component", + "description": "List the requirements a specific component claims to satisfy (Q-005).", + "inputSchema": { + "type": "object", + "properties": { + "component_id": {"type": "string"}, + }, + "required": ["component_id"], + }, + "handler": _tool_requirements_for, + }, + { + "name": "atocore_engineering_decisions", + "description": ( + "Decisions that affect a project, optionally scoped to a specific " + "subsystem. Use when the user asks 'what did we decide about X?'" + ), + "inputSchema": { + "type": "object", + "properties": { + "project": {"type": "string"}, + "subsystem_id": {"type": "string", "description": "optional subsystem entity id"}, + }, + "required": ["project"], + }, + "handler": _tool_decisions_affecting, + }, + { + "name": "atocore_engineering_changes", + "description": ( + "Recent changes to the engineering graph for a project: which " + "entities were created/promoted/rejected/updated, by whom, when. " + "Use for 'what changed recently?' type questions." + ), + "inputSchema": { + "type": "object", + "properties": { + "project": {"type": "string"}, + "since": {"type": "string", "description": "ISO timestamp; optional"}, + "limit": {"type": "integer", "minimum": 1, "maximum": 200, "default": 20}, + }, + "required": ["project"], + }, + "handler": _tool_recent_changes, + }, + { + "name": "atocore_engineering_impact", + "description": ( + "Impact analysis: what's downstream of a given entity. BFS over " + "outbound relationships up to max_depth. 
Use to answer 'what would " + "break if I change X?'" + ), + "inputSchema": { + "type": "object", + "properties": { + "entity_id": {"type": "string"}, + "max_depth": {"type": "integer", "minimum": 1, "maximum": 5, "default": 3}, + }, + "required": ["entity_id"], + }, + "handler": _tool_impact, + }, + { + "name": "atocore_engineering_evidence", + "description": ( + "Evidence chain for an entity: what supports it? Walks inbound " + "SUPPORTS / EVIDENCED_BY / DESCRIBED_BY / VALIDATED_BY / ANALYZED_BY " + "edges. Use for 'how do we know X is true?' type questions." + ), + "inputSchema": { + "type": "object", + "properties": { + "entity_id": {"type": "string"}, + }, + "required": ["entity_id"], + }, + "handler": _tool_evidence, + }, ] diff --git a/scripts/graduate_memories.py b/scripts/graduate_memories.py new file mode 100644 index 0000000..0683325 --- /dev/null +++ b/scripts/graduate_memories.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +"""Phase 5F — Memory → Entity graduation batch pass. + +Takes active memories, asks claude-p whether each describes a typed +engineering entity, and creates entity candidates for the ones that do. +Each candidate carries source_refs back to its source memory so human +review can trace provenance. + +Human reviews the entity candidates via /admin/triage (same UI as memory +triage). When a candidate is promoted, a post-promote hook marks the source +memory as `graduated` and sets `graduated_to_entity_id` for traceability. + +This is THE population move: without it, the engineering graph stays sparse +and the killer queries (Q-006/009/011) have nothing to find gaps in. 
+ +Usage: + python3 scripts/graduate_memories.py --base-url http://127.0.0.1:8100 \\ + --project p05-interferometer --limit 20 + + # Dry run (don't create entities, just show decisions): + python3 scripts/graduate_memories.py --project p05-interferometer --dry-run + + # Process all active memories across all projects (big run): + python3 scripts/graduate_memories.py --limit 200 + +Host-side because claude CLI lives on Dalidou, not in the container. +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import subprocess +import sys +import tempfile +import time +import urllib.error +import urllib.request +from typing import Any + +# Make src/ importable so we can reuse the stdlib-only prompt module +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +_SRC_DIR = os.path.abspath(os.path.join(_SCRIPT_DIR, "..", "src")) +if _SRC_DIR not in sys.path: + sys.path.insert(0, _SRC_DIR) + +from atocore.engineering._graduation_prompt import ( # noqa: E402 + GRADUATION_PROMPT_VERSION, + SYSTEM_PROMPT, + build_user_message, + parse_graduation_output, +) + + +DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100") +DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet") +DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_GRADUATION_TIMEOUT_S", "90")) + +_sandbox_cwd = None + + +def get_sandbox_cwd() -> str: + """Temp cwd so claude CLI doesn't auto-discover project CLAUDE.md files.""" + global _sandbox_cwd + if _sandbox_cwd is None: + _sandbox_cwd = tempfile.mkdtemp(prefix="ato-graduate-") + return _sandbox_cwd + + +def api_get(base_url: str, path: str) -> dict: + req = urllib.request.Request(f"{base_url}{path}") + with urllib.request.urlopen(req, timeout=15) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def api_post(base_url: str, path: str, body: dict | None = None) -> dict: + data = json.dumps(body or {}).encode("utf-8") + req = urllib.request.Request( + 
f"{base_url}{path}", method="POST", + headers={"Content-Type": "application/json"}, data=data, + ) + with urllib.request.urlopen(req, timeout=15) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def graduate_one(memory: dict, model: str, timeout_s: float) -> dict[str, Any] | None: + """Ask claude whether this memory describes a typed entity. + + Returns None on any failure (parse error, timeout, exit!=0). + Applies retry+pacing to match the pattern in auto_triage/batch_extract. + """ + if not shutil.which("claude"): + return None + + user_msg = build_user_message( + memory_content=memory.get("content", "") or "", + memory_project=memory.get("project", "") or "", + memory_type=memory.get("memory_type", "") or "", + ) + + args = [ + "claude", "-p", + "--model", model, + "--append-system-prompt", SYSTEM_PROMPT, + "--disable-slash-commands", + user_msg, + ] + + last_error = "" + for attempt in range(3): + if attempt > 0: + time.sleep(2 ** attempt) + try: + completed = subprocess.run( + args, capture_output=True, text=True, + timeout=timeout_s, cwd=get_sandbox_cwd(), + encoding="utf-8", errors="replace", + ) + except subprocess.TimeoutExpired: + last_error = "timeout" + continue + except Exception as exc: + last_error = f"subprocess error: {exc}" + continue + + if completed.returncode == 0: + return parse_graduation_output(completed.stdout or "") + + stderr = (completed.stderr or "").strip()[:200] + last_error = f"exit_{completed.returncode}: {stderr}" if stderr else f"exit_{completed.returncode}" + + print(f" ! 
claude failed after 3 tries: {last_error}", file=sys.stderr) + return None + + +def create_entity_candidate( + base_url: str, + decision: dict, + memory: dict, +) -> str | None: + """Create an entity candidate with source_refs pointing at the memory.""" + try: + result = api_post(base_url, "/entities", { + "entity_type": decision["entity_type"], + "name": decision["name"], + "project": memory.get("project", "") or "", + "description": decision["description"], + "properties": { + "graduated_from_memory": memory["id"], + "proposed_relationships": decision["relationships"], + "prompt_version": GRADUATION_PROMPT_VERSION, + }, + "status": "candidate", + "confidence": decision["confidence"], + "source_refs": [f"memory:{memory['id']}"], + }) + return result.get("id") + except Exception as e: + print(f" ! entity create failed: {e}", file=sys.stderr) + return None + + +def main() -> None: + parser = argparse.ArgumentParser(description="Graduate active memories into entity candidates") + parser.add_argument("--base-url", default=DEFAULT_BASE_URL) + parser.add_argument("--model", default=DEFAULT_MODEL) + parser.add_argument("--project", default=None, help="Only graduate memories in this project") + parser.add_argument("--limit", type=int, default=50, help="Max memories to process") + parser.add_argument("--min-confidence", type=float, default=0.3, + help="Skip memories with confidence below this (they're probably noise)") + parser.add_argument("--dry-run", action="store_true", help="Show decisions without creating entities") + args = parser.parse_args() + + # Fetch active memories + query = "status=active" + query += f"&limit={args.limit}" + if args.project: + query += f"&project={args.project}" + result = api_get(args.base_url, f"/memory?{query}") + memories = result.get("memories", []) + + # Filter by min_confidence + skip already-graduated + memories = [m for m in memories + if m.get("confidence", 0) >= args.min_confidence + and m.get("status") != "graduated"] + + 
print(f"graduating: {len(memories)} memories project={args.project or '(all)'} " + f"model={args.model} dry_run={args.dry_run}") + + graduated = 0 + skipped = 0 + errors = 0 + entities_created: list[str] = [] + + for i, mem in enumerate(memories, 1): + if i > 1: + time.sleep(0.5) # light pacing, matches auto_triage + mid = mem["id"] + label = f"[{i:3d}/{len(memories)}] {mid[:8]} [{mem.get('memory_type','?')}]" + + decision = graduate_one(mem, args.model, DEFAULT_TIMEOUT_S) + if decision is None: + print(f" ERROR {label} (graduate_one returned None)") + errors += 1 + continue + + if not decision.get("graduate"): + reason = decision.get("reason", "(no reason)") + print(f" skip {label} {reason}") + skipped += 1 + continue + + etype = decision["entity_type"] + ename = decision["name"] + nrel = len(decision.get("relationships", [])) + + if args.dry_run: + print(f" WOULD {label} → [{etype}] {ename!r} ({nrel} rels)") + graduated += 1 + else: + entity_id = create_entity_candidate(args.base_url, decision, mem) + if entity_id: + print(f" CREATE {label} → [{etype}] {ename!r} ({nrel} rels) entity={entity_id[:8]}") + graduated += 1 + entities_created.append(entity_id) + else: + errors += 1 + + print(f"\ntotal: graduated={graduated} skipped={skipped} errors={errors}") + if entities_created: + print(f"Review at /admin/triage ({len(entities_created)} entity candidates created)") + + +if __name__ == "__main__": + main() diff --git a/src/atocore/api/routes.py b/src/atocore/api/routes.py index 66b77d6..30aa98e 100644 --- a/src/atocore/api/routes.py +++ b/src/atocore/api/routes.py @@ -1327,6 +1327,62 @@ def api_list_entities( } +@router.get("/admin/conflicts") +def api_list_conflicts(project: str | None = None) -> dict: + """Phase 5G: list open entity conflicts (optionally scoped to a project).""" + from atocore.engineering.conflicts import list_open_conflicts + conflicts = list_open_conflicts(project=project) + return {"conflicts": conflicts, "count": len(conflicts)} + + +class 
ConflictResolveRequest(BaseModel): + action: str # dismiss|supersede_others|no_action + winner_id: str | None = None + + +@router.post("/admin/conflicts/{conflict_id}/resolve") +def api_resolve_conflict(conflict_id: str, req: ConflictResolveRequest) -> dict: + """Resolve a conflict. Options: dismiss, supersede_others (needs winner_id), no_action.""" + from atocore.engineering.conflicts import resolve_conflict + try: + success = resolve_conflict( + conflict_id=conflict_id, + action=req.action, + winner_id=req.winner_id, + actor="api-http", + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + if not success: + raise HTTPException(status_code=404, detail=f"Conflict not found or already resolved: {conflict_id}") + return {"status": "resolved", "id": conflict_id, "action": req.action} + + +@router.get("/admin/graduation/stats") +def api_graduation_stats() -> dict: + """Phase 5F graduation stats for dashboard.""" + from atocore.models.database import get_connection + + with get_connection() as conn: + total_memories = int(conn.execute("SELECT COUNT(*) FROM memories WHERE status = 'active'").fetchone()[0]) + graduated = int(conn.execute("SELECT COUNT(*) FROM memories WHERE status = 'graduated'").fetchone()[0]) + entity_candidates_from_mem = int(conn.execute( + "SELECT COUNT(*) FROM entities WHERE status = 'candidate' " + "AND source_refs LIKE '%memory:%'" + ).fetchone()[0]) + active_entities = int(conn.execute("SELECT COUNT(*) FROM entities WHERE status = 'active'").fetchone()[0]) + + return { + "active_memories": total_memories, + "graduated_memories": graduated, + "entity_candidates_from_memories": entity_candidates_from_mem, + "active_entities": active_entities, + "graduation_rate": ( + graduated / (total_memories + graduated) if (total_memories + graduated) > 0 else 0.0 + ), + } + + # --- Phase 5 Engineering V1: The 10 canonical queries --- diff --git a/src/atocore/context/builder.py b/src/atocore/context/builder.py index 
3ce00b9..5ef4481 100644 --- a/src/atocore/context/builder.py +++ b/src/atocore/context/builder.py @@ -508,6 +508,23 @@ def _build_engineering_context( f" {direction} {rel.relationship_type} [{other.entity_type}] {other.name}" ) + # Phase 5H: append a compact gaps summary so the LLM always sees + # "what we're currently missing" alongside the entity neighborhood. + # This is the director's most-used insight — orphan requirements, + # risky decisions, unsupported claims — surfaced in every context pack + # for project-scoped queries. + try: + from atocore.engineering.queries import all_gaps as _all_gaps + gaps = _all_gaps(project) + orphan_n = gaps["orphan_requirements"]["count"] + risky_n = gaps["risky_decisions"]["count"] + unsup_n = gaps["unsupported_claims"]["count"] + if orphan_n or risky_n or unsup_n: + lines.append("") + lines.append(f"Gaps: {orphan_n} orphan reqs, {risky_n} risky decisions, {unsup_n} unsupported claims") + except Exception: + pass + lines.append("--- End Engineering Context ---") text = "\n".join(lines) diff --git a/src/atocore/engineering/_graduation_prompt.py b/src/atocore/engineering/_graduation_prompt.py new file mode 100644 index 0000000..b2d871b --- /dev/null +++ b/src/atocore/engineering/_graduation_prompt.py @@ -0,0 +1,194 @@ +"""Shared LLM prompt for memory → entity graduation (Phase 5F). + +Mirrors the pattern of ``atocore.memory._llm_prompt``: stdlib-only so both +the container extractor path and the host-side graduate_memories.py script +use the same system prompt and parser, eliminating drift. + +Graduation asks: "does this active memory describe a TYPED engineering entity +that belongs in the knowledge graph?" If yes, produce an entity candidate +with type + name + description + zero-or-more relationship hints. If no, +return null so the memory stays as-is. + +Design note: we DON'T ask the LLM to resolve targets of relationships (e.g., +"connect to Subsystem 'Optics'"). 
That's done in a second pass after human +review — partly to keep this prompt cheap, partly because name-matching +targets across projects is a hard problem worth its own pass. +""" + +from __future__ import annotations + +import json +from typing import Any + +GRADUATION_PROMPT_VERSION = "graduate-0.1.0" +MAX_CONTENT_CHARS = 1500 + +ENTITY_TYPES = { + "project", + "system", + "subsystem", + "component", + "interface", + "requirement", + "constraint", + "decision", + "material", + "parameter", + "analysis_model", + "result", + "validation_claim", + "vendor", + "process", +} + +SYSTEM_PROMPT = """You are a knowledge-graph curator for an engineering firm's context system (AtoCore). + +Your job: given one active MEMORY (a curated fact about an engineering project), decide whether it describes a TYPED engineering entity that belongs in the structured graph. If yes, emit the entity candidate. If no, return null. + +A memory gets graduated when its content names a specific thing that has lifecycle, relationships, or cross-references in engineering work. A memory stays as-is when it's a general observation, preference, or loose context. 
+ +ENTITY TYPES (choose the best fit): + +- project — a named project (usually already registered; rare to emit) +- subsystem — a named chunk of a system with defined boundaries (e.g., "Primary Optics", "Cable Tensioning", "Motion Control") +- component — a discrete physical or logical part (e.g., "Primary Mirror", "Pivot Pin", "Z-axis Servo Drive") +- interface — a named boundary between two subsystems/components (e.g., "Mirror-to-Cell mounting interface") +- requirement — a "must" or "shall" statement (e.g., "Surface figure < 25nm RMS") +- constraint — a non-negotiable limit (e.g., "Thermal operating range 0-40°C") +- decision — a committed design direction (e.g., "Selected Zerodur over ULE for primary blank") +- material — a named material used in a component (e.g., "Zerodur", "Invar 36") +- parameter — a specific named value or assumption (e.g., "Ambient temperature 22°C", "Lead time 6 weeks") +- analysis_model — a named FEA / optical / thermal model (e.g., "Preston wear model v2") +- result — a named measurement or simulation output (e.g., "FEA thermal sweep 2026-03") +- validation_claim — an asserted claim to be backed by evidence (e.g., "Margin is adequate for full envelope") +- vendor — a supplier / partner entity (e.g., "Schott AG", "ABB Space", "Nabeel") +- process — a named workflow step (e.g., "Ion beam figuring pass", "Incoming inspection") +- system — whole project's system envelope (rare; usually project handles this) + +WHEN TO GRADUATE: + +GRADUATE if the memory clearly names one of these entities with enough detail to be useful. 
Examples: +- "Selected Zerodur for the p04 primary mirror blank" → 2 entities: decision(name="Select Zerodur for primary blank") + material(name="Zerodur") +- "ABB Space (INO) is the polishing vendor for p04" → vendor(name="ABB Space") +- "Surface figure target is < 25nm RMS after IBF" → requirement(name="Surface figure < 25nm RMS after IBF") +- "The Preston model assumes 5N min contact pressure" → parameter(name="Preston min contact pressure = 5N") + +DON'T GRADUATE if the memory is: +- A preference or work-style note (those stay as memories) +- A session observation ("we tested X today") — no durable typed thing +- A general insight / rule of thumb ("Always calibrate before measuring") +- An OpenClaw MEMORY.md import of conversational history +- Something where you can't pick a clear entity type with confidence + +OUTPUT FORMAT — exactly one JSON object: + +If graduating, emit: +{ + "graduate": true, + "entity_type": "component|requirement|decision|...", + "name": "short noun phrase, <60 chars", + "description": "one-sentence description that adds context beyond the name", + "confidence": 0.0-1.0, + "relationships": [ + {"rel_type": "part_of|satisfies|uses_material|based_on_assumption|constrained_by|affected_by_decision|supports|evidenced_by|described_by", "target_hint": "name of the target entity (human will resolve)"} + ] +} + +If not graduating, emit: +{"graduate": false, "reason": "one-sentence reason"} + +Rules: +- Output ONLY the JSON object, no markdown, no prose +- name MUST be <60 chars and specific; reject vague names like "the system" +- confidence: 0.6-0.7 is typical. Raise to 0.8+ only if the memory is very specific and unambiguous. 
+- relationships array can be empty +- target_hint is a free-text name; the human-review stage will resolve it to an actual entity id (or reject if the target doesn't exist yet) +- If the memory describes MULTIPLE entities, pick the single most important one; a second pass can catch the others +""" + + +def build_user_message(memory_content: str, memory_project: str, memory_type: str) -> str: + return ( + f"MEMORY PROJECT: {memory_project or '(unscoped)'}\n" + f"MEMORY TYPE: {memory_type}\n\n" + f"MEMORY CONTENT:\n{memory_content[:MAX_CONTENT_CHARS]}\n\n" + "Return the JSON decision now." + ) + + +def parse_graduation_output(raw: str) -> dict[str, Any] | None: + """Parse the LLM's graduation decision. Return None on any parse error. + + On success returns the normalized decision dict with keys: + graduate (bool), entity_type (str), name (str), description (str), + confidence (float), relationships (list of {rel_type, target_hint}) + OR {"graduate": false, "reason": "..."} + """ + text = (raw or "").strip() + if not text: + return None + if text.startswith("```"): + text = text.strip("`") + nl = text.find("\n") + if nl >= 0: + text = text[nl + 1:] + if text.endswith("```"): + text = text[:-3] + text = text.strip() + + # Tolerate leading prose + if not text.lstrip().startswith("{"): + start = text.find("{") + end = text.rfind("}") + if start >= 0 and end > start: + text = text[start:end + 1] + + try: + parsed = json.loads(text) + except json.JSONDecodeError: + return None + + if not isinstance(parsed, dict): + return None + + graduate = bool(parsed.get("graduate", False)) + if not graduate: + return {"graduate": False, "reason": str(parsed.get("reason", ""))[:200]} + + entity_type = str(parsed.get("entity_type") or "").strip().lower() + if entity_type not in ENTITY_TYPES: + return None + + name = str(parsed.get("name") or "").strip() + if not name or len(name) > 120: + return None + + description = str(parsed.get("description") or "").strip()[:500] + + try: + 
confidence = float(parsed.get("confidence", 0.6)) + except (TypeError, ValueError): + confidence = 0.6 + confidence = max(0.0, min(1.0, confidence)) + + raw_rels = parsed.get("relationships") or [] + if not isinstance(raw_rels, list): + raw_rels = [] + relationships: list[dict] = [] + for r in raw_rels[:10]: + if not isinstance(r, dict): + continue + rtype = str(r.get("rel_type") or "").strip().lower() + target = str(r.get("target_hint") or "").strip() + if not rtype or not target: + continue + relationships.append({"rel_type": rtype, "target_hint": target[:120]}) + + return { + "graduate": True, + "entity_type": entity_type, + "name": name, + "description": description, + "confidence": confidence, + "relationships": relationships, + } diff --git a/src/atocore/engineering/conflicts.py b/src/atocore/engineering/conflicts.py new file mode 100644 index 0000000..a0d393e --- /dev/null +++ b/src/atocore/engineering/conflicts.py @@ -0,0 +1,291 @@ +"""Phase 5G — Conflict detection on entity promote. + +When a candidate entity is promoted to active, we check whether another +active entity is already claiming the "same slot" with an incompatible +value. If so, we emit a conflicts row + conflict_members rows so the +human can resolve. + +Slot keys are per-entity-type (from ``conflict-model.md``). V1 starts +narrow with 3 slot kinds to avoid false positives: + +1. **component.material** — a component should normally have ONE + dominant material (via USES_MATERIAL edge). Two active USES_MATERIAL + edges from the same component pointing at different materials = + conflict. +2. **component.part_of** — a component should belong to AT MOST one + subsystem (via PART_OF). Two active PART_OF edges = conflict. +3. **requirement.name** — two active Requirements with the same name in + the same project (regardless of description) = conflict. + +Rule: **flag, never block**. The promote succeeds; the conflict row is +just a flag for the human. 
Users see conflicts in the dashboard and on +wiki entity pages with a "⚠️ Disputed" badge. +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timezone + +from atocore.models.database import get_connection +from atocore.observability.logger import get_logger + +log = get_logger("conflicts") + + +def detect_conflicts_for_entity(entity_id: str) -> list[str]: + """Run conflict detection for a newly-promoted active entity. + + Returns a list of conflict_ids created. Fail-open: any detection error + is logged and returns an empty list; the promote itself is not affected. + """ + try: + with get_connection() as conn: + row = conn.execute( + "SELECT * FROM entities WHERE id = ? AND status = 'active'", + (entity_id,), + ).fetchone() + if row is None: + return [] + + created: list[str] = [] + etype = row["entity_type"] + project = row["project"] or "" + + if etype == "component": + created.extend(_check_component_conflicts(entity_id, project)) + elif etype == "requirement": + created.extend(_check_requirement_conflicts(entity_id, row["name"], project)) + + return created + except Exception as e: + log.warning("conflict_detection_failed", entity_id=entity_id, error=str(e)) + return [] + + +def _check_component_conflicts(component_id: str, project: str) -> list[str]: + """Check material + part_of slot uniqueness for a component.""" + created: list[str] = [] + with get_connection() as conn: + # component.material conflicts + mat_edges = conn.execute( + "SELECT r.id AS rel_id, r.target_entity_id, e.name " + "FROM relationships r " + "JOIN entities e ON e.id = r.target_entity_id " + "WHERE r.source_entity_id = ? 
AND r.relationship_type = 'uses_material' " + "AND e.status = 'active'", + (component_id,), + ).fetchall() + if len(mat_edges) > 1: + cid = _record_conflict( + slot_kind="component.material", + slot_key=component_id, + project=project, + note=f"component has {len(mat_edges)} active material edges", + members=[ + { + "kind": "entity", + "id": m["target_entity_id"], + "snapshot": m["name"], + } + for m in mat_edges + ], + ) + if cid: + created.append(cid) + + # component.part_of conflicts + pof_edges = conn.execute( + "SELECT r.id AS rel_id, r.target_entity_id, e.name " + "FROM relationships r " + "JOIN entities e ON e.id = r.target_entity_id " + "WHERE r.source_entity_id = ? AND r.relationship_type = 'part_of' " + "AND e.status = 'active'", + (component_id,), + ).fetchall() + if len(pof_edges) > 1: + cid = _record_conflict( + slot_kind="component.part_of", + slot_key=component_id, + project=project, + note=f"component is part_of {len(pof_edges)} subsystems", + members=[ + { + "kind": "entity", + "id": p["target_entity_id"], + "snapshot": p["name"], + } + for p in pof_edges + ], + ) + if cid: + created.append(cid) + + return created + + +def _check_requirement_conflicts(requirement_id: str, name: str, project: str) -> list[str]: + """Two active Requirements with the same name in the same project.""" + with get_connection() as conn: + peers = conn.execute( + "SELECT id, description FROM entities " + "WHERE entity_type = 'requirement' AND status = 'active' " + "AND project = ? AND LOWER(name) = LOWER(?) 
AND id != ?", + (project, name, requirement_id), + ).fetchall() + if not peers: + return [] + + members = [{"kind": "entity", "id": requirement_id, "snapshot": name}] + for p in peers: + members.append({"kind": "entity", "id": p["id"], + "snapshot": (p["description"] or "")[:200]}) + + cid = _record_conflict( + slot_kind="requirement.name", + slot_key=f"{project}|{name.lower()}", + project=project, + note=f"{len(peers)+1} active requirements share the name '{name}'", + members=members, + ) + return [cid] if cid else [] + + +def _record_conflict( + slot_kind: str, + slot_key: str, + project: str, + note: str, + members: list[dict], +) -> str | None: + """Persist a conflict + its members; skip if an open conflict already + exists for the same (slot_kind, slot_key).""" + try: + with get_connection() as conn: + existing = conn.execute( + "SELECT id FROM conflicts WHERE slot_kind = ? AND slot_key = ? " + "AND status = 'open'", + (slot_kind, slot_key), + ).fetchone() + if existing: + return None # don't dup + + conflict_id = str(uuid.uuid4()) + conn.execute( + "INSERT INTO conflicts (id, slot_kind, slot_key, project, " + "status, note) VALUES (?, ?, ?, ?, 'open', ?)", + (conflict_id, slot_kind, slot_key, project, note[:500]), + ) + for m in members: + conn.execute( + "INSERT INTO conflict_members (id, conflict_id, member_kind, " + "member_id, value_snapshot) VALUES (?, ?, ?, ?, ?)", + (str(uuid.uuid4()), conflict_id, + m.get("kind", "entity"), m.get("id", ""), + (m.get("snapshot") or "")[:500]), + ) + + log.info("conflict_detected", conflict_id=conflict_id, + slot_kind=slot_kind, project=project) + + # Emit a warning alert so the operator sees it + try: + from atocore.observability.alerts import emit_alert + emit_alert( + severity="warning", + title=f"Entity conflict: {slot_kind}", + message=note, + context={"project": project, "slot_key": slot_key, + "member_count": len(members)}, + ) + except Exception: + pass + + return conflict_id + except Exception as e: + 
log.warning("conflict_record_failed", error=str(e)) + return None + + +def list_open_conflicts(project: str | None = None) -> list[dict]: + """Return open conflicts with their members.""" + with get_connection() as conn: + query = "SELECT * FROM conflicts WHERE status = 'open'" + params: list = [] + if project: + query += " AND project = ?" + params.append(project) + query += " ORDER BY detected_at DESC" + rows = conn.execute(query, params).fetchall() + + conflicts = [] + for r in rows: + member_rows = conn.execute( + "SELECT * FROM conflict_members WHERE conflict_id = ?", + (r["id"],), + ).fetchall() + conflicts.append({ + "id": r["id"], + "slot_kind": r["slot_kind"], + "slot_key": r["slot_key"], + "project": r["project"] or "", + "status": r["status"], + "note": r["note"] or "", + "detected_at": r["detected_at"], + "members": [ + { + "id": m["id"], + "member_kind": m["member_kind"], + "member_id": m["member_id"], + "snapshot": m["value_snapshot"] or "", + } + for m in member_rows + ], + }) + return conflicts + + +def resolve_conflict( + conflict_id: str, + action: str, # "dismiss", "supersede_others", "no_action" + winner_id: str | None = None, + actor: str = "api", +) -> bool: + """Resolve a conflict. 
Optionally marks non-winner members as superseded.""" + if action not in ("dismiss", "supersede_others", "no_action"): + raise ValueError(f"Invalid action: {action}") + + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + + with get_connection() as conn: + row = conn.execute( + "SELECT * FROM conflicts WHERE id = ?", (conflict_id,) + ).fetchone() + if row is None or row["status"] != "open": + return False + + if action == "supersede_others": + if not winner_id: + raise ValueError("winner_id required for supersede_others") + # Mark non-winner member entities as superseded + member_rows = conn.execute( + "SELECT member_id FROM conflict_members WHERE conflict_id = ?", + (conflict_id,), + ).fetchall() + for m in member_rows: + if m["member_id"] != winner_id: + conn.execute( + "UPDATE entities SET status = 'superseded', updated_at = ? " + "WHERE id = ? AND status = 'active'", + (now, m["member_id"]), + ) + + conn.execute( + "UPDATE conflicts SET status = 'resolved', resolution = ?, " + "resolved_at = ? WHERE id = ?", + (action, now, conflict_id), + ) + + log.info("conflict_resolved", conflict_id=conflict_id, + action=action, actor=actor) + return True diff --git a/src/atocore/engineering/service.py b/src/atocore/engineering/service.py index bd5ae2f..4b02996 100644 --- a/src/atocore/engineering/service.py +++ b/src/atocore/engineering/service.py @@ -334,14 +334,85 @@ def _set_entity_status( def promote_entity(entity_id: str, actor: str = "api", note: str = "") -> bool: - """Promote a candidate entity to active.""" - with get_connection() as conn: - row = conn.execute( - "SELECT status FROM entities WHERE id = ?", (entity_id,) - ).fetchone() - if row is None or row["status"] != "candidate": + """Promote a candidate entity to active. + + Phase 5F graduation hook: if this entity has source_refs pointing at + memories (format "memory:"), mark those source memories as + ``status=graduated`` and set their ``graduated_to_entity_id`` forward + pointer. 
This preserves the memory as an immutable historical record + while signalling that it's been absorbed into the typed graph. + """ + entity = get_entity(entity_id) + if entity is None or entity.status != "candidate": return False - return _set_entity_status(entity_id, "active", actor=actor, note=note) + + ok = _set_entity_status(entity_id, "active", actor=actor, note=note) + if not ok: + return False + + # Phase 5F: mark source memories as graduated + memory_ids = [ + ref.split(":", 1)[1] + for ref in (entity.source_refs or []) + if isinstance(ref, str) and ref.startswith("memory:") + ] + if memory_ids: + _graduate_source_memories(memory_ids, entity_id, actor=actor) + + # Phase 5G: sync conflict detection on promote. Fail-open — detection + # errors log but never undo the successful promote. + try: + from atocore.engineering.conflicts import detect_conflicts_for_entity + detect_conflicts_for_entity(entity_id) + except Exception as e: + log.warning("conflict_detection_failed", entity_id=entity_id, error=str(e)) + + return True + + +def _graduate_source_memories(memory_ids: list[str], entity_id: str, actor: str) -> None: + """Mark source memories as graduated and set forward pointer.""" + if not memory_ids: + return + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + with get_connection() as conn: + for mid in memory_ids: + try: + row = conn.execute( + "SELECT status FROM memories WHERE id = ?", (mid,) + ).fetchone() + if row is None: + continue + old_status = row["status"] + if old_status == "graduated": + continue # already graduated — maybe by a different entity + conn.execute( + "UPDATE memories SET status = 'graduated', " + "graduated_to_entity_id = ?, updated_at = ? 
WHERE id = ?", + (entity_id, now, mid), + ) + # Write a memory_audit row for the graduation + conn.execute( + "INSERT INTO memory_audit (id, memory_id, action, actor, " + "before_json, after_json, note, entity_kind) " + "VALUES (?, ?, 'graduated', ?, ?, ?, ?, 'memory')", + ( + str(uuid.uuid4()), + mid, + actor or "api", + json.dumps({"status": old_status}), + json.dumps({ + "status": "graduated", + "graduated_to_entity_id": entity_id, + }), + f"graduated to entity {entity_id[:8]}", + ), + ) + log.info("memory_graduated", memory_id=mid, + entity_id=entity_id, old_status=old_status) + except Exception as e: + log.warning("memory_graduation_failed", + memory_id=mid, entity_id=entity_id, error=str(e)) def reject_entity_candidate(entity_id: str, actor: str = "api", note: str = "") -> bool: diff --git a/tests/test_engineering_v1_phase5.py b/tests/test_engineering_v1_phase5.py new file mode 100644 index 0000000..f0eea40 --- /dev/null +++ b/tests/test_engineering_v1_phase5.py @@ -0,0 +1,246 @@ +"""Phase 5F + 5G + 5H tests — graduation, conflicts, MCP tools.""" + +from __future__ import annotations + +import pytest + +from atocore.engineering.conflicts import ( + detect_conflicts_for_entity, + list_open_conflicts, + resolve_conflict, +) +from atocore.engineering._graduation_prompt import ( + build_user_message, + parse_graduation_output, +) +from atocore.engineering.service import ( + create_entity, + create_relationship, + get_entity, + init_engineering_schema, + promote_entity, +) +from atocore.memory.service import create_memory +from atocore.models.database import get_connection, init_db + + +# --- 5F Memory graduation --- + + +def test_graduation_prompt_parses_positive_decision(): + raw = """ + {"graduate": true, "entity_type": "component", "name": "Primary Mirror", + "description": "The 1.2m primary mirror for p04", "confidence": 0.85, + "relationships": [{"rel_type": "part_of", "target_hint": "Optics Subsystem"}]} + """ + decision = parse_graduation_output(raw) + 
assert decision is not None + assert decision["graduate"] is True + assert decision["entity_type"] == "component" + assert decision["name"] == "Primary Mirror" + assert decision["confidence"] == 0.85 + assert decision["relationships"] == [ + {"rel_type": "part_of", "target_hint": "Optics Subsystem"} + ] + + +def test_graduation_prompt_parses_negative_decision(): + raw = '{"graduate": false, "reason": "conversational filler, no typed entity"}' + decision = parse_graduation_output(raw) + assert decision is not None + assert decision["graduate"] is False + assert "filler" in decision["reason"] + + +def test_graduation_prompt_rejects_unknown_entity_type(): + raw = '{"graduate": true, "entity_type": "quantum_thing", "name": "x"}' + assert parse_graduation_output(raw) is None + + +def test_graduation_prompt_tolerates_markdown_fences(): + raw = '```json\n{"graduate": false, "reason": "ok"}\n```' + d = parse_graduation_output(raw) + assert d is not None + assert d["graduate"] is False + + +def test_promote_entity_marks_source_memory_graduated(tmp_data_dir): + init_db() + init_engineering_schema() + mem = create_memory("knowledge", "The Primary Mirror is 1.2m Zerodur", + project="p-test", status="active") + # Create entity candidate pointing back to the memory + ent = create_entity( + "component", + "Primary Mirror", + project="p-test", + status="candidate", + source_refs=[f"memory:{mem.id}"], + ) + # Promote + assert promote_entity(ent.id, actor="test-triage") + + # Memory should now be graduated with forward pointer + with get_connection() as conn: + row = conn.execute( + "SELECT status, graduated_to_entity_id FROM memories WHERE id = ?", + (mem.id,), + ).fetchone() + assert row["status"] == "graduated" + assert row["graduated_to_entity_id"] == ent.id + + +def test_promote_entity_without_memory_refs_no_graduation(tmp_data_dir): + """Entity not backed by any memory — promote still works, no graduation.""" + init_db() + init_engineering_schema() + ent = 
create_entity("component", "Orphan", project="p-test", status="candidate") + assert promote_entity(ent.id) + assert get_entity(ent.id).status == "active" + + +# --- 5G Conflict detection --- + + +def test_component_material_conflict_detected(tmp_data_dir): + init_db() + init_engineering_schema() + c = create_entity("component", "Mirror", project="p-test") + m1 = create_entity("material", "Zerodur", project="p-test") + m2 = create_entity("material", "ULE", project="p-test") + create_relationship(c.id, m1.id, "uses_material") + create_relationship(c.id, m2.id, "uses_material") + + detected = detect_conflicts_for_entity(c.id) + assert len(detected) == 1 + + conflicts = list_open_conflicts(project="p-test") + assert any(c["slot_kind"] == "component.material" for c in conflicts) + conflict = next(c for c in conflicts if c["slot_kind"] == "component.material") + assert len(conflict["members"]) == 2 + + +def test_component_part_of_conflict_detected(tmp_data_dir): + init_db() + init_engineering_schema() + c = create_entity("component", "MultiPart", project="p-test") + s1 = create_entity("subsystem", "Mechanical", project="p-test") + s2 = create_entity("subsystem", "Optical", project="p-test") + create_relationship(c.id, s1.id, "part_of") + create_relationship(c.id, s2.id, "part_of") + + detected = detect_conflicts_for_entity(c.id) + assert len(detected) == 1 + conflicts = list_open_conflicts(project="p-test") + assert any(c["slot_kind"] == "component.part_of" for c in conflicts) + + +def test_requirement_name_conflict_detected(tmp_data_dir): + init_db() + init_engineering_schema() + r1 = create_entity("requirement", "Surface figure < 25nm", + project="p-test", description="Primary mirror spec") + r2 = create_entity("requirement", "Surface figure < 25nm", + project="p-test", description="Different interpretation") + + detected = detect_conflicts_for_entity(r2.id) + assert len(detected) == 1 + conflicts = list_open_conflicts(project="p-test") + assert any(c["slot_kind"] == 
"requirement.name" for c in conflicts) + + +def test_conflict_not_detected_for_clean_component(tmp_data_dir): + init_db() + init_engineering_schema() + c = create_entity("component", "Clean", project="p-test") + m = create_entity("material", "Zerodur", project="p-test") + create_relationship(c.id, m.id, "uses_material") + + detected = detect_conflicts_for_entity(c.id) + assert detected == [] + + +def test_conflict_resolution_supersedes_losers(tmp_data_dir): + init_db() + init_engineering_schema() + c = create_entity("component", "Mirror2", project="p-test") + m1 = create_entity("material", "Zerodur2", project="p-test") + m2 = create_entity("material", "ULE2", project="p-test") + create_relationship(c.id, m1.id, "uses_material") + create_relationship(c.id, m2.id, "uses_material") + + detected = detect_conflicts_for_entity(c.id) + conflict_id = detected[0] + + # Resolve by picking m1 as the winner + assert resolve_conflict(conflict_id, "supersede_others", winner_id=m1.id) + + # m2 should now be superseded; m1 stays active + assert get_entity(m1.id).status == "active" + assert get_entity(m2.id).status == "superseded" + + # Conflict should be marked resolved + open_conflicts = list_open_conflicts(project="p-test") + assert not any(c["id"] == conflict_id for c in open_conflicts) + + +def test_conflict_resolution_dismiss_leaves_entities_alone(tmp_data_dir): + init_db() + init_engineering_schema() + r1 = create_entity("requirement", "Dup req", project="p-test", + description="first meaning") + r2 = create_entity("requirement", "Dup req", project="p-test", + description="second meaning") + detected = detect_conflicts_for_entity(r2.id) + conflict_id = detected[0] + + assert resolve_conflict(conflict_id, "dismiss") + # Both still active — dismiss just clears the conflict marker + assert get_entity(r1.id).status == "active" + assert get_entity(r2.id).status == "active" + + +def test_deduplicate_conflicts_for_same_slot(tmp_data_dir): + """Running detection twice on the same 
entity shouldn't dup the conflict row.""" + init_db() + init_engineering_schema() + c = create_entity("component", "Dup", project="p-test") + m1 = create_entity("material", "A", project="p-test") + m2 = create_entity("material", "B", project="p-test") + create_relationship(c.id, m1.id, "uses_material") + create_relationship(c.id, m2.id, "uses_material") + + detect_conflicts_for_entity(c.id) + detect_conflicts_for_entity(c.id) # should be a no-op + + conflicts = list_open_conflicts(project="p-test") + mat_conflicts = [c for c in conflicts if c["slot_kind"] == "component.material"] + assert len(mat_conflicts) == 1 + + +def test_promote_triggers_conflict_detection(tmp_data_dir): + """End-to-end: promoting a candidate component with 2 active material edges + triggers conflict detection.""" + init_db() + init_engineering_schema() + + c = create_entity("component", "AutoFlag", project="p-test", status="candidate") + m1 = create_entity("material", "X1", project="p-test") + m2 = create_entity("material", "X2", project="p-test") + create_relationship(c.id, m1.id, "uses_material") + create_relationship(c.id, m2.id, "uses_material") + + promote_entity(c.id, actor="test") + + conflicts = list_open_conflicts(project="p-test") + assert any(c["slot_kind"] == "component.material" for c in conflicts) + + +# --- 5H MCP tool shape checks (via build_user_message) --- + + +def test_graduation_user_message_includes_project_and_type(): + msg = build_user_message("some content", "p04-gigabit", "project") + assert "p04-gigabit" in msg + assert "project" in msg + assert "some content" in msg