# Source: ATOCore/src/atocore/engineering/service.py
"""Engineering entity and relationship CRUD."""
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
from atocore.projects.registry import resolve_project_name
log = get_logger("engineering")

# Closed vocabulary of graph node types; create_entity() rejects anything else.
ENTITY_TYPES = [
    "project",
    "system",
    "subsystem",
    "component",
    "interface",
    "requirement",
    "constraint",
    "decision",
    "material",
    "parameter",
    "analysis_model",
    "result",
    "validation_claim",
    "vendor",
    "process",
    # Issue F (visual evidence): images, PDFs, CAD exports attached to
    # other entities via EVIDENCED_BY. properties carries kind +
    # asset_id + caption + capture_context.
    "artifact",
]

# Closed vocabulary of edge types; create_relationship() rejects anything else.
RELATIONSHIP_TYPES = [
    # Structural family
    "contains",
    "part_of",
    "interfaces_with",
    # Intent family
    "satisfies",
    "constrained_by",
    "affected_by_decision",
    "based_on_assumption",  # Phase 5 — Q-009 killer query
    "supersedes",
    # Validation family
    "analyzed_by",
    "validated_by",
    "supports",  # Phase 5 — Q-011 killer query
    "conflicts_with",  # Phase 5 — Q-012 future
    "depends_on",
    # Provenance family
    "described_by",
    "updated_by_session",  # Phase 5 — session→entity provenance
    "evidenced_by",  # Phase 5 — Q-017 evidence trace
    "summarized_in",  # Phase 5 — mirror caches
    # Domain-specific (pre-existing, retained)
    "uses_material",
]

# Entity lifecycle states; transitions go through the dedicated
# promote/reject/supersede/invalidate functions below, never direct writes.
ENTITY_STATUSES = ["candidate", "active", "superseded", "invalid"]

# V1-0: extractor version this module writes into new entity rows.
# Per promotion-rules.md:268, every candidate must record the version of
# the extractor that produced it so later re-evaluation is auditable.
# Bump this when extraction logic materially changes.
EXTRACTOR_VERSION = "v1.0.0"
@dataclass
class Entity:
    """One typed node of the engineering graph — mirrors a row of the
    ``entities`` table with the JSON columns already deserialized."""

    id: str  # UUID4 primary key
    entity_type: str  # one of ENTITY_TYPES
    name: str  # stored stripped of surrounding whitespace
    project: str  # canonical project name; "" means cross-project/global
    description: str = ""
    properties: dict = field(default_factory=dict)  # free-form typed payload
    status: str = "active"  # one of ENTITY_STATUSES
    confidence: float = 1.0  # update path enforces [0, 1]
    source_refs: list[str] = field(default_factory=list)  # provenance pointers
    created_at: str = ""  # "YYYY-MM-DD HH:MM:SS" in UTC
    updated_at: str = ""
    # V1-0 shared-header fields per engineering-v1-acceptance.md:45.
    extractor_version: str = ""  # extractor that produced this row
    canonical_home: str = "entity"
    hand_authored: bool = False  # True = explicit human write; exempt from source_refs rule
@dataclass
class Relationship:
    """One directed edge of the engineering graph — mirrors a row of the
    ``relationships`` table with ``source_refs`` already deserialized."""

    id: str  # UUID4 primary key
    source_entity_id: str  # edge tail (entities.id)
    target_entity_id: str  # edge head (entities.id)
    relationship_type: str  # one of RELATIONSHIP_TYPES
    confidence: float = 1.0
    source_refs: list[str] = field(default_factory=list)  # provenance pointers
    created_at: str = ""  # "YYYY-MM-DD HH:MM:SS" in UTC
def init_engineering_schema() -> None:
    """Create the engineering tables and indexes (idempotent).

    Safe to call on every startup: all DDL uses ``IF NOT EXISTS`` and the
    migration ALTERs are guarded by column-existence checks.
    """
    with get_connection() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS entities (
                id TEXT PRIMARY KEY,
                entity_type TEXT NOT NULL,
                name TEXT NOT NULL,
                project TEXT NOT NULL DEFAULT '',
                description TEXT NOT NULL DEFAULT '',
                properties TEXT NOT NULL DEFAULT '{}',
                status TEXT NOT NULL DEFAULT 'active',
                confidence REAL NOT NULL DEFAULT 1.0,
                source_refs TEXT NOT NULL DEFAULT '[]',
                extractor_version TEXT NOT NULL DEFAULT '',
                canonical_home TEXT NOT NULL DEFAULT 'entity',
                hand_authored INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # V1-0 (Engineering V1 completion): the three shared-header fields
        # per engineering-v1-acceptance.md:45. Idempotent ALTERs for
        # databases created before V1-0 land these columns without a full
        # migration. Fresh DBs get them via the CREATE TABLE above; the
        # ALTERs below are a no-op there.
        from atocore.models.database import _column_exists  # late import; avoids cycle
        if not _column_exists(conn, "entities", "extractor_version"):
            conn.execute("ALTER TABLE entities ADD COLUMN extractor_version TEXT DEFAULT ''")
        if not _column_exists(conn, "entities", "canonical_home"):
            conn.execute("ALTER TABLE entities ADD COLUMN canonical_home TEXT DEFAULT 'entity'")
        if not _column_exists(conn, "entities", "hand_authored"):
            conn.execute("ALTER TABLE entities ADD COLUMN hand_authored INTEGER DEFAULT 0")
        # NOTE: relationships declares FKs to entities(id); actual enforcement
        # depends on the connection's foreign_keys pragma (set elsewhere).
        conn.execute("""
            CREATE TABLE IF NOT EXISTS relationships (
                id TEXT PRIMARY KEY,
                source_entity_id TEXT NOT NULL,
                target_entity_id TEXT NOT NULL,
                relationship_type TEXT NOT NULL,
                confidence REAL NOT NULL DEFAULT 1.0,
                source_refs TEXT NOT NULL DEFAULT '[]',
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (source_entity_id) REFERENCES entities(id),
                FOREIGN KEY (target_entity_id) REFERENCES entities(id)
            )
        """)
        # Indexes cover the hot read paths: project/type filters in
        # get_entities() and both directions of get_relationships().
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_entities_project
            ON entities(project)
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_entities_type
            ON entities(entity_type)
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_relationships_source
            ON relationships(source_entity_id)
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_relationships_target
            ON relationships(target_entity_id)
        """)
    log.info("engineering_schema_initialized")
def create_entity(
    entity_type: str,
    name: str,
    project: str = "",
    description: str = "",
    properties: dict | None = None,
    status: str = "active",
    confidence: float = 1.0,
    source_refs: list[str] | None = None,
    actor: str = "api",
    hand_authored: bool = False,
    extractor_version: str | None = None,
) -> Entity:
    """Insert a new entity row, audit it, and return the dataclass.

    Raises ValueError when ``entity_type``/``status`` fall outside their
    closed vocabularies, when ``name`` is blank, when ``confidence`` is
    outside [0, 1], or when provenance is missing (empty ``source_refs``
    without ``hand_authored=True``).
    """
    if entity_type not in ENTITY_TYPES:
        raise ValueError(f"Invalid entity type: {entity_type}. Must be one of {ENTITY_TYPES}")
    if status not in ENTITY_STATUSES:
        raise ValueError(f"Invalid status: {status}. Must be one of {ENTITY_STATUSES}")
    if not name or not name.strip():
        raise ValueError("Entity name must be non-empty")
    # Consistency fix: update_entity() already rejects out-of-range
    # confidence; enforce the same bound at the create seam instead of
    # silently persisting an out-of-range value.
    if confidence is not None and not (0.0 <= confidence <= 1.0):
        raise ValueError("confidence must be in [0, 1]")
    refs = list(source_refs) if source_refs else []
    # V1-0 (F-8 provenance enforcement, engineering-v1-acceptance.md:147):
    # every new entity row must carry non-empty source_refs OR be explicitly
    # flagged hand_authored. This is the non-negotiable invariant every
    # later V1 phase depends on — without it, active entities can escape
    # into the graph with no traceable origin. Raises at the write seam so
    # the bug is impossible to introduce silently.
    if not refs and not hand_authored:
        raise ValueError(
            "source_refs required: every entity must carry provenance "
            "(source_chunk_id / source_interaction_id / kb_cad_export_id / ...) "
            "or set hand_authored=True to explicitly flag a direct human write"
        )
    # Phase 5: enforce project canonicalization contract at the write seam.
    # Aliases like "p04" become "p04-gigabit" so downstream reads stay
    # consistent with the registry.
    project = resolve_project_name(project) if project else ""
    entity_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    # Bug fix: copy the caller's dict instead of aliasing it (the same
    # object is stored in the returned Entity), so later caller-side
    # mutation cannot diverge from what was persisted.
    props = dict(properties) if properties else {}
    ev = extractor_version if extractor_version is not None else EXTRACTOR_VERSION
    with get_connection() as conn:
        conn.execute(
            """INSERT INTO entities
               (id, entity_type, name, project, description, properties,
                status, confidence, source_refs,
                extractor_version, canonical_home, hand_authored,
                created_at, updated_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                entity_id, entity_type, name.strip(), project,
                description, json.dumps(props), status, confidence,
                json.dumps(refs),
                ev, "entity", 1 if hand_authored else 0,
                now, now,
            ),
        )
    log.info("entity_created", entity_id=entity_id, entity_type=entity_type, name=name)
    # Phase 5: entity audit rows share the memory_audit table via
    # entity_kind="entity" discriminator. Same infrastructure, unified history.
    _audit_entity(
        entity_id=entity_id,
        action="created",
        actor=actor,
        after={
            "entity_type": entity_type,
            "name": name.strip(),
            "project": project,
            "status": status,
            "confidence": confidence,
            "hand_authored": hand_authored,
            "extractor_version": ev,
        },
    )
    # V1-0 (F-5 hook, engineering-v1-acceptance.md:99): synchronous
    # conflict detection on any active-entity write. The promote path
    # already had this hook (see promote_entity below); V1-0 adds it to
    # direct-active creates so every active row — however it got that
    # way — is checked. Fail-open per "flag, never block" rule in
    # conflict-model.md:256: detector errors log but never fail the write.
    if status == "active":
        try:
            from atocore.engineering.conflicts import detect_conflicts_for_entity
            detect_conflicts_for_entity(entity_id)
        except Exception as e:
            log.warning("conflict_detection_failed", entity_id=entity_id, error=str(e))
    return Entity(
        id=entity_id, entity_type=entity_type, name=name.strip(),
        project=project, description=description, properties=props,
        status=status, confidence=confidence, source_refs=refs,
        created_at=now, updated_at=now,
        extractor_version=ev, canonical_home="entity",
        hand_authored=hand_authored,
    )
def _audit_entity(
    entity_id: str,
    action: str,
    actor: str = "api",
    before: dict | None = None,
    after: dict | None = None,
    note: str = "",
) -> None:
    """Best-effort append of one entity mutation row to the shared
    memory_audit table; failures are logged, never raised."""
    try:
        with get_connection() as conn:
            audit_row = (
                str(uuid.uuid4()),
                entity_id,
                action,
                actor or "api",
                json.dumps(before or {}),
                json.dumps(after or {}),
                (note or "")[:500],  # cap note length at the write seam
            )
            conn.execute(
                "INSERT INTO memory_audit (id, memory_id, action, actor, "
                "before_json, after_json, note, entity_kind) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, 'entity')",
                audit_row,
            )
    except Exception as e:
        log.warning("entity_audit_failed", entity_id=entity_id, action=action, error=str(e))
def create_relationship(
    source_entity_id: str,
    target_entity_id: str,
    relationship_type: str,
    confidence: float = 1.0,
    source_refs: list[str] | None = None,
) -> Relationship:
    """Insert a directed edge, audit it on the source entity, return it.

    Raises ValueError for an unknown ``relationship_type`` or a
    ``confidence`` outside [0, 1]. Entity existence is not re-checked
    here; the schema declares FKs but enforcement depends on the
    connection's foreign_keys pragma.
    """
    if relationship_type not in RELATIONSHIP_TYPES:
        raise ValueError(f"Invalid relationship type: {relationship_type}")
    # Consistency fix: bound confidence at the write seam, matching the
    # entity write paths (see update_entity's validation).
    if confidence is not None and not (0.0 <= confidence <= 1.0):
        raise ValueError("confidence must be in [0, 1]")
    rel_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    # Bug fix: copy rather than alias the caller's list (matches the
    # `refs` handling in create_entity) so the refs stored in the
    # returned Relationship can't be mutated from outside.
    refs = list(source_refs) if source_refs else []
    with get_connection() as conn:
        conn.execute(
            """INSERT INTO relationships
               (id, source_entity_id, target_entity_id, relationship_type,
                confidence, source_refs, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (rel_id, source_entity_id, target_entity_id,
             relationship_type, confidence, json.dumps(refs), now),
        )
    log.info(
        "relationship_created",
        rel_id=rel_id,
        source=source_entity_id,
        target=target_entity_id,
        rel_type=relationship_type,
    )
    # Phase 5: relationship audit as an entity action on the source
    _audit_entity(
        entity_id=source_entity_id,
        action="relationship_added",
        actor="api",
        after={
            "rel_id": rel_id,
            "rel_type": relationship_type,
            "target": target_entity_id,
        },
    )
    return Relationship(
        id=rel_id, source_entity_id=source_entity_id,
        target_entity_id=target_entity_id,
        relationship_type=relationship_type,
        confidence=confidence, source_refs=refs, created_at=now,
    )
# --- Phase 5: Entity promote/reject lifecycle ---
def _set_entity_status(
    entity_id: str,
    new_status: str,
    actor: str = "api",
    note: str = "",
) -> bool:
    """Transition an entity's status with audit.

    Returns False when the entity is missing or already in ``new_status``;
    raises ValueError for a status outside ENTITY_STATUSES.
    """
    if new_status not in ENTITY_STATUSES:
        raise ValueError(f"Invalid status: {new_status}")
    with get_connection() as conn:
        current = conn.execute(
            "SELECT status FROM entities WHERE id = ?", (entity_id,)
        ).fetchone()
        if current is None:
            return False
        old_status = current["status"]
        if old_status == new_status:
            return False  # idempotent no-op: nothing to write or audit
        now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        conn.execute(
            "UPDATE entities SET status = ?, updated_at = ? WHERE id = ?",
            (new_status, now, entity_id),
        )
        # Action verb mirrors the memory pattern: exact transitions first,
        # then target-status fallbacks, then the generic verb.
        exact = {
            ("candidate", "active"): "promoted",
            ("candidate", "invalid"): "rejected",
        }
        fallback = {"invalid": "invalidated", "superseded": "superseded"}
        action = exact.get(
            (old_status, new_status),
            fallback.get(new_status, "status_changed"),
        )
        _audit_entity(
            entity_id=entity_id,
            action=action,
            actor=actor,
            before={"status": old_status},
            after={"status": new_status},
            note=note,
        )
        log.info("entity_status_changed", entity_id=entity_id,
                 old=old_status, new=new_status, action=action)
        return True
def promote_entity(
    entity_id: str,
    actor: str = "api",
    note: str = "",
    target_project: str | None = None,
) -> bool:
    """Promote a candidate entity to active.

    Returns False when the entity is missing or not in status
    ``candidate``; raises ValueError when the provenance invariant fails.

    When ``target_project`` is provided (Issue C), also retarget the
    entity's project before flipping the status. Use this to graduate an
    inbox/global lead into a real project (e.g. when a vendor quote
    becomes a contract). ``target_project`` is canonicalized through the
    registry; reserved ids (``inbox``) and ``""`` are accepted verbatim.

    Phase 5F graduation hook: if this entity has source_refs pointing at
    memories (format "memory:<uuid>"), mark those source memories as
    ``status=graduated`` and set their ``graduated_to_entity_id`` forward
    pointer. This preserves the memory as an immutable historical record
    while signalling that it's been absorbed into the typed graph.
    """
    entity = get_entity(entity_id)
    if entity is None or entity.status != "candidate":
        return False
    # V1-0 (F-8 provenance re-check at promote). The invariant must hold at
    # BOTH create_entity AND promote_entity per the plan, because candidate
    # rows can exist in the DB from before V1-0 (no enforcement at their
    # create time) or can be inserted by code paths that bypass the service
    # layer. Block any candidate with empty source_refs that is NOT flagged
    # hand_authored from ever becoming active. Same error shape as the
    # create-side check for symmetry.
    if not (entity.source_refs or []) and not entity.hand_authored:
        raise ValueError(
            "source_refs required: cannot promote a candidate with no "
            "provenance. Attach source_refs via PATCH /entities/{id}, "
            "or flag hand_authored=true before promoting."
        )
    if target_project is not None:
        new_project = (
            resolve_project_name(target_project) if target_project else ""
        )
        # Only write + audit when the project actually changes.
        if new_project != entity.project:
            now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
            with get_connection() as conn:
                conn.execute(
                    "UPDATE entities SET project = ?, updated_at = ? "
                    "WHERE id = ?",
                    (new_project, now, entity_id),
                )
            _audit_entity(
                entity_id=entity_id,
                action="retargeted",
                actor=actor,
                before={"project": entity.project},
                after={"project": new_project},
                note=note,
            )
    ok = _set_entity_status(entity_id, "active", actor=actor, note=note)
    if not ok:
        return False
    # Phase 5F: mark source memories as graduated. Refs look like
    # "memory:<uuid>"; split off the prefix to get the raw memory id.
    memory_ids = [
        ref.split(":", 1)[1]
        for ref in (entity.source_refs or [])
        if isinstance(ref, str) and ref.startswith("memory:")
    ]
    if memory_ids:
        _graduate_source_memories(memory_ids, entity_id, actor=actor)
    # Phase 5G: sync conflict detection on promote. Fail-open — detection
    # errors log but never undo the successful promote.
    try:
        from atocore.engineering.conflicts import detect_conflicts_for_entity
        detect_conflicts_for_entity(entity_id)
    except Exception as e:
        log.warning("conflict_detection_failed", entity_id=entity_id, error=str(e))
    return True
def _graduate_source_memories(memory_ids: list[str], entity_id: str, actor: str) -> None:
    """Mark source memories as graduated and set forward pointer.

    Best-effort per memory: a failure on one id is logged and skipped so
    the remaining ids still graduate (the promote that triggered this has
    already succeeded).
    """
    if not memory_ids:
        return
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    with get_connection() as conn:
        for mid in memory_ids:
            try:
                row = conn.execute(
                    "SELECT status FROM memories WHERE id = ?", (mid,)
                ).fetchone()
                if row is None:
                    continue  # dangling ref — memory no longer exists
                old_status = row["status"]
                if old_status == "graduated":
                    continue  # already graduated — maybe by a different entity
                conn.execute(
                    "UPDATE memories SET status = 'graduated', "
                    "graduated_to_entity_id = ?, updated_at = ? WHERE id = ?",
                    (entity_id, now, mid),
                )
                # Write a memory_audit row for the graduation
                conn.execute(
                    "INSERT INTO memory_audit (id, memory_id, action, actor, "
                    "before_json, after_json, note, entity_kind) "
                    "VALUES (?, ?, 'graduated', ?, ?, ?, ?, 'memory')",
                    (
                        str(uuid.uuid4()),
                        mid,
                        actor or "api",
                        json.dumps({"status": old_status}),
                        json.dumps({
                            "status": "graduated",
                            "graduated_to_entity_id": entity_id,
                        }),
                        f"graduated to entity {entity_id[:8]}",
                    ),
                )
                log.info("memory_graduated", memory_id=mid,
                         entity_id=entity_id, old_status=old_status)
            except Exception as e:
                log.warning("memory_graduation_failed",
                            memory_id=mid, entity_id=entity_id, error=str(e))
def reject_entity_candidate(entity_id: str, actor: str = "api", note: str = "") -> bool:
    """Reject a candidate entity (status → invalid).

    Returns False when the entity is missing or not a candidate.
    """
    with get_connection() as conn:
        current = conn.execute(
            "SELECT status FROM entities WHERE id = ?", (entity_id,)
        ).fetchone()
    if current is None or current["status"] != "candidate":
        return False
    return _set_entity_status(entity_id, "invalid", actor=actor, note=note)
def supersede_entity(
    entity_id: str,
    actor: str = "api",
    note: str = "",
    superseded_by: str | None = None,
) -> bool:
    """Mark an active entity as superseded.

    Returns False when the status transition does not apply; raises
    ValueError for an unknown or self-referential ``superseded_by``.

    When ``superseded_by`` names a real entity, also create a
    ``supersedes`` relationship from the new entity to the old one
    (semantics: ``new SUPERSEDES old``). This keeps the graph
    navigable without the caller remembering to make that edge.
    """
    # Validate superseded_by up front so a bad id fails BEFORE the
    # status flip below mutates anything.
    if superseded_by:
        new_entity = get_entity(superseded_by)
        if new_entity is None:
            raise ValueError(
                f"superseded_by entity not found: {superseded_by}"
            )
        if new_entity.id == entity_id:
            raise ValueError("entity cannot supersede itself")
    ok = _set_entity_status(entity_id, "superseded", actor=actor, note=note)
    if not ok:
        return False
    if superseded_by:
        # Edge creation is fail-open: the supersede already succeeded,
        # so a failed edge is logged rather than raised.
        try:
            create_relationship(
                source_entity_id=superseded_by,
                target_entity_id=entity_id,
                relationship_type="supersedes",
                source_refs=[f"supersede-api:{actor}"],
            )
        except Exception as e:
            log.warning(
                "supersede_relationship_create_failed",
                entity_id=entity_id,
                superseded_by=superseded_by,
                error=str(e),
            )
        # V1-0 (F-5 hook on supersede, per plan's "every active-entity
        # write path"). Supersede demotes `entity_id` AND adds a
        # `supersedes` relationship rooted at the already-active
        # `superseded_by`. That new edge can create a conflict the
        # detector should catch synchronously. Fail-open per
        # conflict-model.md:256.
        try:
            from atocore.engineering.conflicts import detect_conflicts_for_entity
            detect_conflicts_for_entity(superseded_by)
        except Exception as e:
            log.warning(
                "conflict_detection_failed",
                entity_id=superseded_by,
                error=str(e),
            )
    return True
def invalidate_active_entity(
    entity_id: str,
    actor: str = "api",
    reason: str = "",
) -> tuple[bool, str]:
    """Mark an active entity as invalid (Issue E — retraction path).

    Returns (success, status_code) where status_code is one of:
    - "invalidated" — happy path
    - "not_found" — no such entity
    - "already_invalid" — already invalid (idempotent)
    - "not_active" — entity is candidate/superseded; use the
      appropriate other endpoint

    This is the public retraction API distinct from
    ``reject_entity_candidate`` (which only handles candidate→invalid).
    """
    entity = get_entity(entity_id)
    # Guard clauses: resolve every non-active case before touching status.
    if entity is None:
        return False, "not_found"
    if entity.status == "invalid":
        return True, "already_invalid"
    if entity.status != "active":
        return False, "not_active"
    if _set_entity_status(entity_id, "invalid", actor=actor, note=reason):
        return True, "invalidated"
    return False, "not_active"
def update_entity(
    entity_id: str,
    *,
    description: str | None = None,
    properties_patch: dict | None = None,
    confidence: float | None = None,
    append_source_refs: list[str] | None = None,
    actor: str = "api",
    note: str = "",
) -> Entity | None:
    """Update mutable fields on an existing entity (Issue E follow-up).

    Field rules (kept narrow on purpose):
    - ``description``: replaces the current value when provided.
    - ``properties_patch``: shallow-merged into ``properties``; a value of
      ``None`` deletes the key, anything else overwrites it.
    - ``confidence``: replaces when provided. Must be in [0, 1].
    - ``append_source_refs``: appended to the existing list with
      duplicates filtered out, original order preserved.

    Not changeable here: ``entity_type``, ``name`` (supersede+create),
    ``project`` (promote with ``target_project``), and ``status``
    (dedicated promote/reject/invalidate/supersede endpoints).

    Returns the updated entity re-read from the DB, or None if no such
    entity exists.
    """
    current = get_entity(entity_id)
    if current is None:
        return None
    if confidence is not None and not (0.0 <= confidence <= 1.0):
        raise ValueError("confidence must be in [0, 1]")
    before = {
        "description": current.description,
        "properties": dict(current.properties or {}),
        "confidence": current.confidence,
        "source_refs": list(current.source_refs or []),
    }
    next_description = description if description is not None else current.description
    next_confidence = confidence if confidence is not None else current.confidence
    # Shallow merge; None values are tombstones that delete keys.
    next_properties = dict(current.properties or {})
    for key, value in (properties_patch or {}).items():
        if value is None:
            next_properties.pop(key, None)
        else:
            next_properties[key] = value
    # Append refs, de-duplicated against everything already present.
    next_refs = list(current.source_refs or [])
    seen = set(next_refs)
    for ref in (append_source_refs or []):
        if ref and ref not in seen:
            next_refs.append(ref)
            seen.add(ref)
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    with get_connection() as conn:
        conn.execute(
            """UPDATE entities
               SET description = ?, properties = ?, confidence = ?,
                   source_refs = ?, updated_at = ?
               WHERE id = ?""",
            (
                next_description,
                json.dumps(next_properties),
                next_confidence,
                json.dumps(next_refs),
                now,
                entity_id,
            ),
        )
    _audit_entity(
        entity_id=entity_id,
        action="updated",
        actor=actor,
        before=before,
        after={
            "description": next_description,
            "properties": next_properties,
            "confidence": next_confidence,
            "source_refs": next_refs,
        },
        note=note,
    )
    log.info("entity_updated", entity_id=entity_id, actor=actor)
    return get_entity(entity_id)
def get_entity_audit(entity_id: str, limit: int = 100) -> list[dict]:
    """Fetch audit entries for an entity from the shared audit table,
    newest first, with before/after JSON already parsed."""

    def _parse(blob) -> dict:
        # Corrupt or legacy JSON degrades to {} rather than failing the read.
        try:
            return json.loads(blob or "{}")
        except Exception:
            return {}

    with get_connection() as conn:
        rows = conn.execute(
            "SELECT id, memory_id AS entity_id, action, actor, before_json, "
            "after_json, note, timestamp FROM memory_audit "
            "WHERE entity_kind = 'entity' AND memory_id = ? "
            "ORDER BY timestamp DESC LIMIT ?",
            (entity_id, limit),
        ).fetchall()
    return [
        {
            "id": r["id"],
            "entity_id": r["entity_id"],
            "action": r["action"],
            "actor": r["actor"] or "api",
            "before": _parse(r["before_json"]),
            "after": _parse(r["after_json"]),
            "note": r["note"] or "",
            "timestamp": r["timestamp"],
        }
        for r in rows
    ]
def get_entities(
    entity_type: str | None = None,
    project: str | None = None,
    status: str = "active",
    name_contains: str | None = None,
    limit: int = 100,
    scope_only: bool = False,
) -> list[Entity]:
    """List entities with optional filters.

    Project scoping rules (Issue C — inbox + cross-project):
    - ``project=None``: no project filter, return everything matching status.
    - ``project=""``: return only cross-project (global) entities.
    - ``project="inbox"``: return only inbox entities.
    - ``project="<real>"`` and ``scope_only=False`` (default): return entities
      scoped to that project PLUS cross-project (``project=""``) entities.
    - ``project="<real>"`` and ``scope_only=True``: return only that project,
      without the cross-project bleed.
    """
    from atocore.projects.registry import (
        INBOX_PROJECT, GLOBAL_PROJECT, is_reserved_project,
    )
    clauses = ["status = ?"]
    params: list = [status]
    if entity_type:
        clauses.append("entity_type = ?")
        params.append(entity_type)
    if project is not None:
        p = (project or "").strip()
        if p == GLOBAL_PROJECT or is_reserved_project(p) or scope_only:
            clauses.append("project = ?")
            params.append(p)
        else:
            # Real project — include cross-project entities by default.
            clauses.append("(project = ? OR project = ?)")
            params.extend([p, GLOBAL_PROJECT])
    if name_contains:
        clauses.append("name LIKE ?")
        params.append(f"%{name_contains}%")
    params.append(min(limit, 500))  # hard cap regardless of caller
    sql = (
        "SELECT * FROM entities WHERE "
        + " AND ".join(clauses)
        + " ORDER BY entity_type, name LIMIT ?"
    )
    with get_connection() as conn:
        rows = conn.execute(sql, params).fetchall()
    return [_row_to_entity(r) for r in rows]
def get_entity(entity_id: str) -> Entity | None:
    """Fetch a single entity by id, or None when no row matches."""
    with get_connection() as conn:
        row = conn.execute(
            "SELECT * FROM entities WHERE id = ?", (entity_id,)
        ).fetchone()
    return None if row is None else _row_to_entity(row)
def get_relationships(
    entity_id: str,
    direction: str = "both",
) -> list[Relationship]:
    """Fetch edges touching *entity_id*.

    ``direction`` selects which endpoint to match: "outgoing" (entity is
    the source), "incoming" (entity is the target), or "both" (outgoing
    results first). Any other value yields an empty list.
    """
    columns = {
        "outgoing": ("source_entity_id",),
        "incoming": ("target_entity_id",),
        "both": ("source_entity_id", "target_entity_id"),
    }.get(direction, ())
    results: list[Relationship] = []
    with get_connection() as conn:
        for column in columns:
            rows = conn.execute(
                f"SELECT * FROM relationships WHERE {column} = ?",
                (entity_id,),
            ).fetchall()
            results.extend(_row_to_relationship(r) for r in rows)
    return results
def get_entity_with_context(entity_id: str) -> dict | None:
    """Return the entity plus its edges and one-hop neighbor entities,
    or None when the entity does not exist."""
    entity = get_entity(entity_id)
    if entity is None:
        return None
    relationships = get_relationships(entity_id)
    # Every id on either end of an edge, minus the entity itself.
    neighbor_ids = {
        eid
        for rel in relationships
        for eid in (rel.source_entity_id, rel.target_entity_id)
        if eid != entity_id
    }
    related_entities = {}
    for rid in neighbor_ids:
        neighbor = get_entity(rid)
        if neighbor:
            related_entities[rid] = neighbor
    return {
        "entity": entity,
        "relationships": relationships,
        "related_entities": related_entities,
    }
def _row_to_entity(row) -> Entity:
    """Hydrate an Entity dataclass from a DB row.

    V1-0 shared-header fields are optional on read: rows from databases
    that predate the V1-0 migration may lack the columns entirely or hold
    NULL, so each falls back to its dataclass default. ``row.keys()``
    gives the available columns on sqlite3.Row (and on plain dicts).
    """
    available = set(row.keys())

    def _header(col: str, default):
        # Missing column or falsy stored value → the V1-0 default.
        if col not in available:
            return default
        return row[col] or default

    return Entity(
        id=row["id"],
        entity_type=row["entity_type"],
        name=row["name"],
        project=row["project"] or "",
        description=row["description"] or "",
        properties=json.loads(row["properties"] or "{}"),
        status=row["status"],
        confidence=row["confidence"],
        source_refs=json.loads(row["source_refs"] or "[]"),
        created_at=row["created_at"] or "",
        updated_at=row["updated_at"] or "",
        extractor_version=_header("extractor_version", ""),
        canonical_home=_header("canonical_home", "entity"),
        hand_authored=bool(_header("hand_authored", 0)),
    )
def _row_to_relationship(row) -> Relationship:
    """Hydrate a Relationship dataclass from a DB row."""
    # Straight-copy columns first, then the two that need normalization.
    kwargs = {
        col: row[col]
        for col in (
            "id", "source_entity_id", "target_entity_id",
            "relationship_type", "confidence",
        )
    }
    kwargs["source_refs"] = json.loads(row["source_refs"] or "[]")
    kwargs["created_at"] = row["created_at"] or ""
    return Relationship(**kwargs)