"""Phase 5G — Conflict detection on entity promote. When a candidate entity is promoted to active, we check whether another active entity is already claiming the "same slot" with an incompatible value. If so, we emit a conflicts row + conflict_members rows so the human can resolve. Slot keys are per-entity-type (from ``conflict-model.md``). V1 starts narrow with 3 slot kinds to avoid false positives: 1. **component.material** — a component should normally have ONE dominant material (via USES_MATERIAL edge). Two active USES_MATERIAL edges from the same component pointing at different materials = conflict. 2. **component.part_of** — a component should belong to AT MOST one subsystem (via PART_OF). Two active PART_OF edges = conflict. 3. **requirement.value** — two active Requirements with the same name in the same project but different descriptions = conflict. Rule: **flag, never block**. The promote succeeds; the conflict row is just a flag for the human. Users see conflicts in the dashboard and on wiki entity pages with a "⚠️ Disputed" badge. """ from __future__ import annotations import uuid from datetime import datetime, timezone from atocore.models.database import get_connection from atocore.observability.logger import get_logger log = get_logger("conflicts") def detect_conflicts_for_entity(entity_id: str) -> list[str]: """Run conflict detection for a newly-promoted active entity. Returns a list of conflict_ids created. Fail-open: any detection error is logged and returns an empty list; the promote itself is not affected. """ try: with get_connection() as conn: row = conn.execute( "SELECT * FROM entities WHERE id = ? AND status = 'active'", (entity_id,), ).fetchone() if row is None: return [] created: list[str] = [] etype = row["entity_type"] project = row["project"] or "" if etype == "component": created.extend(_check_component_conflicts(entity_id, project)) elif etype == "requirement": created.extend(_check_requirement_conflicts(entity_id, row["name"], project)) return created except Exception as e: log.warning("conflict_detection_failed", entity_id=entity_id, error=str(e)) return [] def _check_component_conflicts(component_id: str, project: str) -> list[str]: """Check material + part_of slot uniqueness for a component.""" created: list[str] = [] with get_connection() as conn: # component.material conflicts mat_edges = conn.execute( "SELECT r.id AS rel_id, r.target_entity_id, e.name " "FROM relationships r " "JOIN entities e ON e.id = r.target_entity_id " "WHERE r.source_entity_id = ? AND r.relationship_type = 'uses_material' " "AND e.status = 'active'", (component_id,), ).fetchall() if len(mat_edges) > 1: cid = _record_conflict( slot_kind="component.material", slot_key=component_id, project=project, note=f"component has {len(mat_edges)} active material edges", members=[ { "kind": "entity", "id": m["target_entity_id"], "snapshot": m["name"], } for m in mat_edges ], ) if cid: created.append(cid) # component.part_of conflicts pof_edges = conn.execute( "SELECT r.id AS rel_id, r.target_entity_id, e.name " "FROM relationships r " "JOIN entities e ON e.id = r.target_entity_id " "WHERE r.source_entity_id = ? AND r.relationship_type = 'part_of' " "AND e.status = 'active'", (component_id,), ).fetchall() if len(pof_edges) > 1: cid = _record_conflict( slot_kind="component.part_of", slot_key=component_id, project=project, note=f"component is part_of {len(pof_edges)} subsystems", members=[ { "kind": "entity", "id": p["target_entity_id"], "snapshot": p["name"], } for p in pof_edges ], ) if cid: created.append(cid) return created def _check_requirement_conflicts(requirement_id: str, name: str, project: str) -> list[str]: """Two active Requirements with the same name in the same project.""" with get_connection() as conn: peers = conn.execute( "SELECT id, description FROM entities " "WHERE entity_type = 'requirement' AND status = 'active' " "AND project = ? AND LOWER(name) = LOWER(?) AND id != ?", (project, name, requirement_id), ).fetchall() if not peers: return [] members = [{"kind": "entity", "id": requirement_id, "snapshot": name}] for p in peers: members.append({"kind": "entity", "id": p["id"], "snapshot": (p["description"] or "")[:200]}) cid = _record_conflict( slot_kind="requirement.name", slot_key=f"{project}|{name.lower()}", project=project, note=f"{len(peers)+1} active requirements share the name '{name}'", members=members, ) return [cid] if cid else [] def _record_conflict( slot_kind: str, slot_key: str, project: str, note: str, members: list[dict], ) -> str | None: """Persist a conflict + its members; skip if an open conflict already exists for the same (slot_kind, slot_key).""" try: with get_connection() as conn: existing = conn.execute( "SELECT id FROM conflicts WHERE slot_kind = ? AND slot_key = ? " "AND status = 'open'", (slot_kind, slot_key), ).fetchone() if existing: return None # don't dup conflict_id = str(uuid.uuid4()) conn.execute( "INSERT INTO conflicts (id, slot_kind, slot_key, project, " "status, note) VALUES (?, ?, ?, ?, 'open', ?)", (conflict_id, slot_kind, slot_key, project, note[:500]), ) for m in members: conn.execute( "INSERT INTO conflict_members (id, conflict_id, member_kind, " "member_id, value_snapshot) VALUES (?, ?, ?, ?, ?)", (str(uuid.uuid4()), conflict_id, m.get("kind", "entity"), m.get("id", ""), (m.get("snapshot") or "")[:500]), ) log.info("conflict_detected", conflict_id=conflict_id, slot_kind=slot_kind, project=project) # Emit a warning alert so the operator sees it try: from atocore.observability.alerts import emit_alert emit_alert( severity="warning", title=f"Entity conflict: {slot_kind}", message=note, context={"project": project, "slot_key": slot_key, "member_count": len(members)}, ) except Exception: pass return conflict_id except Exception as e: log.warning("conflict_record_failed", error=str(e)) return None def list_open_conflicts(project: str | None = None) -> list[dict]: """Return open conflicts with their members.""" with get_connection() as conn: query = "SELECT * FROM conflicts WHERE status = 'open'" params: list = [] if project: query += " AND project = ?" params.append(project) query += " ORDER BY detected_at DESC" rows = conn.execute(query, params).fetchall() conflicts = [] for r in rows: member_rows = conn.execute( "SELECT * FROM conflict_members WHERE conflict_id = ?", (r["id"],), ).fetchall() conflicts.append({ "id": r["id"], "slot_kind": r["slot_kind"], "slot_key": r["slot_key"], "project": r["project"] or "", "status": r["status"], "note": r["note"] or "", "detected_at": r["detected_at"], "members": [ { "id": m["id"], "member_kind": m["member_kind"], "member_id": m["member_id"], "snapshot": m["value_snapshot"] or "", } for m in member_rows ], }) return conflicts def resolve_conflict( conflict_id: str, action: str, # "dismiss", "supersede_others", "no_action" winner_id: str | None = None, actor: str = "api", ) -> bool: """Resolve a conflict. Optionally marks non-winner members as superseded.""" if action not in ("dismiss", "supersede_others", "no_action"): raise ValueError(f"Invalid action: {action}") now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") with get_connection() as conn: row = conn.execute( "SELECT * FROM conflicts WHERE id = ?", (conflict_id,) ).fetchone() if row is None or row["status"] != "open": return False if action == "supersede_others": if not winner_id: raise ValueError("winner_id required for supersede_others") # Mark non-winner member entities as superseded member_rows = conn.execute( "SELECT member_id FROM conflict_members WHERE conflict_id = ?", (conflict_id,), ).fetchall() for m in member_rows: if m["member_id"] != winner_id: conn.execute( "UPDATE entities SET status = 'superseded', updated_at = ? " "WHERE id = ? AND status = 'active'", (now, m["member_id"]), ) conn.execute( "UPDATE conflicts SET status = 'resolved', resolution = ?, " "resolved_at = ? WHERE id = ?", (action, now, conflict_id), ) log.info("conflict_resolved", conflict_id=conflict_id, action=action, actor=actor) return True