"""Engineering entity and relationship CRUD.""" from __future__ import annotations import json import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from atocore.models.database import get_connection from atocore.observability.logger import get_logger from atocore.projects.registry import resolve_project_name log = get_logger("engineering") ENTITY_TYPES = [ "project", "system", "subsystem", "component", "interface", "requirement", "constraint", "decision", "material", "parameter", "analysis_model", "result", "validation_claim", "vendor", "process", ] RELATIONSHIP_TYPES = [ # Structural family "contains", "part_of", "interfaces_with", # Intent family "satisfies", "constrained_by", "affected_by_decision", "based_on_assumption", # Phase 5 — Q-009 killer query "supersedes", # Validation family "analyzed_by", "validated_by", "supports", # Phase 5 — Q-011 killer query "conflicts_with", # Phase 5 — Q-012 future "depends_on", # Provenance family "described_by", "updated_by_session", # Phase 5 — session→entity provenance "evidenced_by", # Phase 5 — Q-017 evidence trace "summarized_in", # Phase 5 — mirror caches # Domain-specific (pre-existing, retained) "uses_material", ] ENTITY_STATUSES = ["candidate", "active", "superseded", "invalid"] @dataclass class Entity: id: str entity_type: str name: str project: str description: str = "" properties: dict = field(default_factory=dict) status: str = "active" confidence: float = 1.0 source_refs: list[str] = field(default_factory=list) created_at: str = "" updated_at: str = "" @dataclass class Relationship: id: str source_entity_id: str target_entity_id: str relationship_type: str confidence: float = 1.0 source_refs: list[str] = field(default_factory=list) created_at: str = "" def init_engineering_schema() -> None: with get_connection() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS entities ( id TEXT PRIMARY KEY, entity_type TEXT NOT NULL, name TEXT NOT NULL, project TEXT NOT NULL DEFAULT '', description TEXT NOT NULL DEFAULT '', properties TEXT NOT NULL DEFAULT '{}', status TEXT NOT NULL DEFAULT 'active', confidence REAL NOT NULL DEFAULT 1.0, source_refs TEXT NOT NULL DEFAULT '[]', created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS relationships ( id TEXT PRIMARY KEY, source_entity_id TEXT NOT NULL, target_entity_id TEXT NOT NULL, relationship_type TEXT NOT NULL, confidence REAL NOT NULL DEFAULT 1.0, source_refs TEXT NOT NULL DEFAULT '[]', created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (source_entity_id) REFERENCES entities(id), FOREIGN KEY (target_entity_id) REFERENCES entities(id) ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_entities_project ON entities(project) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(entity_type) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_relationships_source ON relationships(source_entity_id) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_relationships_target ON relationships(target_entity_id) """) log.info("engineering_schema_initialized") def create_entity( entity_type: str, name: str, project: str = "", description: str = "", properties: dict | None = None, status: str = "active", confidence: float = 1.0, source_refs: list[str] | None = None, actor: str = "api", ) -> Entity: if entity_type not in ENTITY_TYPES: raise ValueError(f"Invalid entity type: {entity_type}. Must be one of {ENTITY_TYPES}") if status not in ENTITY_STATUSES: raise ValueError(f"Invalid status: {status}. Must be one of {ENTITY_STATUSES}") if not name or not name.strip(): raise ValueError("Entity name must be non-empty") # Phase 5: enforce project canonicalization contract at the write seam. # Aliases like "p04" become "p04-gigabit" so downstream reads stay # consistent with the registry. project = resolve_project_name(project) if project else "" entity_id = str(uuid.uuid4()) now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") props = properties or {} refs = source_refs or [] with get_connection() as conn: conn.execute( """INSERT INTO entities (id, entity_type, name, project, description, properties, status, confidence, source_refs, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( entity_id, entity_type, name.strip(), project, description, json.dumps(props), status, confidence, json.dumps(refs), now, now, ), ) log.info("entity_created", entity_id=entity_id, entity_type=entity_type, name=name) # Phase 5: entity audit rows share the memory_audit table via # entity_kind="entity" discriminator. Same infrastructure, unified history. _audit_entity( entity_id=entity_id, action="created", actor=actor, after={ "entity_type": entity_type, "name": name.strip(), "project": project, "status": status, "confidence": confidence, }, ) return Entity( id=entity_id, entity_type=entity_type, name=name.strip(), project=project, description=description, properties=props, status=status, confidence=confidence, source_refs=refs, created_at=now, updated_at=now, ) def _audit_entity( entity_id: str, action: str, actor: str = "api", before: dict | None = None, after: dict | None = None, note: str = "", ) -> None: """Append an entity mutation row to the shared memory_audit table.""" try: with get_connection() as conn: conn.execute( "INSERT INTO memory_audit (id, memory_id, action, actor, " "before_json, after_json, note, entity_kind) " "VALUES (?, ?, ?, ?, ?, ?, ?, 'entity')", ( str(uuid.uuid4()), entity_id, action, actor or "api", json.dumps(before or {}), json.dumps(after or {}), (note or "")[:500], ), ) except Exception as e: log.warning("entity_audit_failed", entity_id=entity_id, action=action, error=str(e)) def create_relationship( source_entity_id: str, target_entity_id: str, relationship_type: str, confidence: float = 1.0, source_refs: list[str] | None = None, ) -> Relationship: if relationship_type not in RELATIONSHIP_TYPES: raise ValueError(f"Invalid relationship type: {relationship_type}") rel_id = str(uuid.uuid4()) now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") refs = source_refs or [] with get_connection() as conn: conn.execute( """INSERT INTO relationships (id, source_entity_id, target_entity_id, relationship_type, confidence, source_refs, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)""", (rel_id, source_entity_id, target_entity_id, relationship_type, confidence, json.dumps(refs), now), ) log.info( "relationship_created", rel_id=rel_id, source=source_entity_id, target=target_entity_id, rel_type=relationship_type, ) # Phase 5: relationship audit as an entity action on the source _audit_entity( entity_id=source_entity_id, action="relationship_added", actor="api", after={ "rel_id": rel_id, "rel_type": relationship_type, "target": target_entity_id, }, ) return Relationship( id=rel_id, source_entity_id=source_entity_id, target_entity_id=target_entity_id, relationship_type=relationship_type, confidence=confidence, source_refs=refs, created_at=now, ) # --- Phase 5: Entity promote/reject lifecycle --- def _set_entity_status( entity_id: str, new_status: str, actor: str = "api", note: str = "", ) -> bool: """Transition an entity's status with audit.""" if new_status not in ENTITY_STATUSES: raise ValueError(f"Invalid status: {new_status}") with get_connection() as conn: row = conn.execute( "SELECT status FROM entities WHERE id = ?", (entity_id,) ).fetchone() if row is None: return False old_status = row["status"] if old_status == new_status: return False now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") conn.execute( "UPDATE entities SET status = ?, updated_at = ? WHERE id = ?", (new_status, now, entity_id), ) # Action verb mirrors memory pattern if new_status == "active" and old_status == "candidate": action = "promoted" elif new_status == "invalid" and old_status == "candidate": action = "rejected" elif new_status == "invalid": action = "invalidated" elif new_status == "superseded": action = "superseded" else: action = "status_changed" _audit_entity( entity_id=entity_id, action=action, actor=actor, before={"status": old_status}, after={"status": new_status}, note=note, ) log.info("entity_status_changed", entity_id=entity_id, old=old_status, new=new_status, action=action) return True def promote_entity(entity_id: str, actor: str = "api", note: str = "") -> bool: """Promote a candidate entity to active. Phase 5F graduation hook: if this entity has source_refs pointing at memories (format "memory:"), mark those source memories as ``status=graduated`` and set their ``graduated_to_entity_id`` forward pointer. This preserves the memory as an immutable historical record while signalling that it's been absorbed into the typed graph. """ entity = get_entity(entity_id) if entity is None or entity.status != "candidate": return False ok = _set_entity_status(entity_id, "active", actor=actor, note=note) if not ok: return False # Phase 5F: mark source memories as graduated memory_ids = [ ref.split(":", 1)[1] for ref in (entity.source_refs or []) if isinstance(ref, str) and ref.startswith("memory:") ] if memory_ids: _graduate_source_memories(memory_ids, entity_id, actor=actor) # Phase 5G: sync conflict detection on promote. Fail-open — detection # errors log but never undo the successful promote. try: from atocore.engineering.conflicts import detect_conflicts_for_entity detect_conflicts_for_entity(entity_id) except Exception as e: log.warning("conflict_detection_failed", entity_id=entity_id, error=str(e)) return True def _graduate_source_memories(memory_ids: list[str], entity_id: str, actor: str) -> None: """Mark source memories as graduated and set forward pointer.""" if not memory_ids: return now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") with get_connection() as conn: for mid in memory_ids: try: row = conn.execute( "SELECT status FROM memories WHERE id = ?", (mid,) ).fetchone() if row is None: continue old_status = row["status"] if old_status == "graduated": continue # already graduated — maybe by a different entity conn.execute( "UPDATE memories SET status = 'graduated', " "graduated_to_entity_id = ?, updated_at = ? WHERE id = ?", (entity_id, now, mid), ) # Write a memory_audit row for the graduation conn.execute( "INSERT INTO memory_audit (id, memory_id, action, actor, " "before_json, after_json, note, entity_kind) " "VALUES (?, ?, 'graduated', ?, ?, ?, ?, 'memory')", ( str(uuid.uuid4()), mid, actor or "api", json.dumps({"status": old_status}), json.dumps({ "status": "graduated", "graduated_to_entity_id": entity_id, }), f"graduated to entity {entity_id[:8]}", ), ) log.info("memory_graduated", memory_id=mid, entity_id=entity_id, old_status=old_status) except Exception as e: log.warning("memory_graduation_failed", memory_id=mid, entity_id=entity_id, error=str(e)) def reject_entity_candidate(entity_id: str, actor: str = "api", note: str = "") -> bool: """Reject a candidate entity (status → invalid).""" with get_connection() as conn: row = conn.execute( "SELECT status FROM entities WHERE id = ?", (entity_id,) ).fetchone() if row is None or row["status"] != "candidate": return False return _set_entity_status(entity_id, "invalid", actor=actor, note=note) def supersede_entity(entity_id: str, actor: str = "api", note: str = "") -> bool: """Mark an active entity as superseded by a newer one.""" return _set_entity_status(entity_id, "superseded", actor=actor, note=note) def get_entity_audit(entity_id: str, limit: int = 100) -> list[dict]: """Fetch audit entries for an entity from the shared audit table.""" with get_connection() as conn: rows = conn.execute( "SELECT id, memory_id AS entity_id, action, actor, before_json, " "after_json, note, timestamp FROM memory_audit " "WHERE entity_kind = 'entity' AND memory_id = ? " "ORDER BY timestamp DESC LIMIT ?", (entity_id, limit), ).fetchall() out = [] for r in rows: try: before = json.loads(r["before_json"] or "{}") except Exception: before = {} try: after = json.loads(r["after_json"] or "{}") except Exception: after = {} out.append({ "id": r["id"], "entity_id": r["entity_id"], "action": r["action"], "actor": r["actor"] or "api", "before": before, "after": after, "note": r["note"] or "", "timestamp": r["timestamp"], }) return out def get_entities( entity_type: str | None = None, project: str | None = None, status: str = "active", name_contains: str | None = None, limit: int = 100, ) -> list[Entity]: query = "SELECT * FROM entities WHERE status = ?" params: list = [status] if entity_type: query += " AND entity_type = ?" params.append(entity_type) if project is not None: query += " AND project = ?" params.append(project) if name_contains: query += " AND name LIKE ?" params.append(f"%{name_contains}%") query += " ORDER BY entity_type, name LIMIT ?" params.append(min(limit, 500)) with get_connection() as conn: rows = conn.execute(query, params).fetchall() return [_row_to_entity(r) for r in rows] def get_entity(entity_id: str) -> Entity | None: with get_connection() as conn: row = conn.execute( "SELECT * FROM entities WHERE id = ?", (entity_id,) ).fetchone() if row is None: return None return _row_to_entity(row) def get_relationships( entity_id: str, direction: str = "both", ) -> list[Relationship]: results = [] with get_connection() as conn: if direction in ("outgoing", "both"): rows = conn.execute( "SELECT * FROM relationships WHERE source_entity_id = ?", (entity_id,), ).fetchall() results.extend(_row_to_relationship(r) for r in rows) if direction in ("incoming", "both"): rows = conn.execute( "SELECT * FROM relationships WHERE target_entity_id = ?", (entity_id,), ).fetchall() results.extend(_row_to_relationship(r) for r in rows) return results def get_entity_with_context(entity_id: str) -> dict | None: entity = get_entity(entity_id) if entity is None: return None relationships = get_relationships(entity_id) related_ids = set() for rel in relationships: related_ids.add(rel.source_entity_id) related_ids.add(rel.target_entity_id) related_ids.discard(entity_id) related_entities = {} for rid in related_ids: e = get_entity(rid) if e: related_entities[rid] = e return { "entity": entity, "relationships": relationships, "related_entities": related_entities, } def _row_to_entity(row) -> Entity: return Entity( id=row["id"], entity_type=row["entity_type"], name=row["name"], project=row["project"] or "", description=row["description"] or "", properties=json.loads(row["properties"] or "{}"), status=row["status"], confidence=row["confidence"], source_refs=json.loads(row["source_refs"] or "[]"), created_at=row["created_at"] or "", updated_at=row["updated_at"] or "", ) def _row_to_relationship(row) -> Relationship: return Relationship( id=row["id"], source_entity_id=row["source_entity_id"], target_entity_id=row["target_entity_id"], relationship_type=row["relationship_type"], confidence=row["confidence"], source_refs=json.loads(row["source_refs"] or "[]"), created_at=row["created_at"] or "", )