Files
ATOCore/src/atocore/engineering/service.py
Anto01 dc20033a93 feat: Engineering Knowledge Layer V1 — entities + relationships
Layer 2 of the AtoCore architecture. Adds typed engineering entities
with relationships on top of the flat memory/state/chunk substrate.

Schema:
- entities table: id, entity_type, name, project, description,
  properties (JSON), status, confidence, source_refs, timestamps
- relationships table: source_entity_id, target_entity_id,
  relationship_type, confidence, source_refs

15 entity types: project, system, subsystem, component, interface,
requirement, constraint, decision, material, parameter,
analysis_model, result, validation_claim, vendor, process

12 relationship types: contains, part_of, interfaces_with,
satisfies, constrained_by, affected_by_decision, analyzed_by,
validated_by, depends_on, uses_material, described_by, supersedes

Service layer: full CRUD + get_entity_with_context (returns an
entity with its relationships and all related entities in one call).

API endpoints:
- POST /entities — create entity
- GET /entities — list/filter by type, project, status, name
- GET /entities/{id} — entity + relationships + related entities
- POST /relationships — create relationship

Schema auto-initialized on app startup via init_engineering_schema().

7 tests covering entity CRUD, relationships, context traversal,
filtering, name search, and validation.

Test count: 290 -> 297.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 09:50:58 -04:00

318 lines
9.5 KiB
Python

"""Engineering entity and relationship CRUD."""
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
log = get_logger("engineering")
ENTITY_TYPES = [
"project",
"system",
"subsystem",
"component",
"interface",
"requirement",
"constraint",
"decision",
"material",
"parameter",
"analysis_model",
"result",
"validation_claim",
"vendor",
"process",
]
RELATIONSHIP_TYPES = [
"contains",
"part_of",
"interfaces_with",
"satisfies",
"constrained_by",
"affected_by_decision",
"analyzed_by",
"validated_by",
"depends_on",
"uses_material",
"described_by",
"supersedes",
]
ENTITY_STATUSES = ["candidate", "active", "superseded", "invalid"]
@dataclass
class Entity:
id: str
entity_type: str
name: str
project: str
description: str = ""
properties: dict = field(default_factory=dict)
status: str = "active"
confidence: float = 1.0
source_refs: list[str] = field(default_factory=list)
created_at: str = ""
updated_at: str = ""
@dataclass
class Relationship:
id: str
source_entity_id: str
target_entity_id: str
relationship_type: str
confidence: float = 1.0
source_refs: list[str] = field(default_factory=list)
created_at: str = ""
def init_engineering_schema() -> None:
with get_connection() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS entities (
id TEXT PRIMARY KEY,
entity_type TEXT NOT NULL,
name TEXT NOT NULL,
project TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
properties TEXT NOT NULL DEFAULT '{}',
status TEXT NOT NULL DEFAULT 'active',
confidence REAL NOT NULL DEFAULT 1.0,
source_refs TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS relationships (
id TEXT PRIMARY KEY,
source_entity_id TEXT NOT NULL,
target_entity_id TEXT NOT NULL,
relationship_type TEXT NOT NULL,
confidence REAL NOT NULL DEFAULT 1.0,
source_refs TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (source_entity_id) REFERENCES entities(id),
FOREIGN KEY (target_entity_id) REFERENCES entities(id)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_entities_project
ON entities(project)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_entities_type
ON entities(entity_type)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_relationships_source
ON relationships(source_entity_id)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_relationships_target
ON relationships(target_entity_id)
""")
log.info("engineering_schema_initialized")
def create_entity(
entity_type: str,
name: str,
project: str = "",
description: str = "",
properties: dict | None = None,
status: str = "active",
confidence: float = 1.0,
source_refs: list[str] | None = None,
) -> Entity:
if entity_type not in ENTITY_TYPES:
raise ValueError(f"Invalid entity type: {entity_type}. Must be one of {ENTITY_TYPES}")
if status not in ENTITY_STATUSES:
raise ValueError(f"Invalid status: {status}. Must be one of {ENTITY_STATUSES}")
if not name or not name.strip():
raise ValueError("Entity name must be non-empty")
entity_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
props = properties or {}
refs = source_refs or []
with get_connection() as conn:
conn.execute(
"""INSERT INTO entities
(id, entity_type, name, project, description, properties,
status, confidence, source_refs, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
entity_id, entity_type, name.strip(), project,
description, json.dumps(props), status, confidence,
json.dumps(refs), now, now,
),
)
log.info("entity_created", entity_id=entity_id, entity_type=entity_type, name=name)
return Entity(
id=entity_id, entity_type=entity_type, name=name.strip(),
project=project, description=description, properties=props,
status=status, confidence=confidence, source_refs=refs,
created_at=now, updated_at=now,
)
def create_relationship(
source_entity_id: str,
target_entity_id: str,
relationship_type: str,
confidence: float = 1.0,
source_refs: list[str] | None = None,
) -> Relationship:
if relationship_type not in RELATIONSHIP_TYPES:
raise ValueError(f"Invalid relationship type: {relationship_type}")
rel_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
refs = source_refs or []
with get_connection() as conn:
conn.execute(
"""INSERT INTO relationships
(id, source_entity_id, target_entity_id, relationship_type,
confidence, source_refs, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(rel_id, source_entity_id, target_entity_id,
relationship_type, confidence, json.dumps(refs), now),
)
log.info(
"relationship_created",
rel_id=rel_id,
source=source_entity_id,
target=target_entity_id,
rel_type=relationship_type,
)
return Relationship(
id=rel_id, source_entity_id=source_entity_id,
target_entity_id=target_entity_id,
relationship_type=relationship_type,
confidence=confidence, source_refs=refs, created_at=now,
)
def get_entities(
entity_type: str | None = None,
project: str | None = None,
status: str = "active",
name_contains: str | None = None,
limit: int = 100,
) -> list[Entity]:
query = "SELECT * FROM entities WHERE status = ?"
params: list = [status]
if entity_type:
query += " AND entity_type = ?"
params.append(entity_type)
if project is not None:
query += " AND project = ?"
params.append(project)
if name_contains:
query += " AND name LIKE ?"
params.append(f"%{name_contains}%")
query += " ORDER BY entity_type, name LIMIT ?"
params.append(min(limit, 500))
with get_connection() as conn:
rows = conn.execute(query, params).fetchall()
return [_row_to_entity(r) for r in rows]
def get_entity(entity_id: str) -> Entity | None:
with get_connection() as conn:
row = conn.execute(
"SELECT * FROM entities WHERE id = ?", (entity_id,)
).fetchone()
if row is None:
return None
return _row_to_entity(row)
def get_relationships(
entity_id: str,
direction: str = "both",
) -> list[Relationship]:
results = []
with get_connection() as conn:
if direction in ("outgoing", "both"):
rows = conn.execute(
"SELECT * FROM relationships WHERE source_entity_id = ?",
(entity_id,),
).fetchall()
results.extend(_row_to_relationship(r) for r in rows)
if direction in ("incoming", "both"):
rows = conn.execute(
"SELECT * FROM relationships WHERE target_entity_id = ?",
(entity_id,),
).fetchall()
results.extend(_row_to_relationship(r) for r in rows)
return results
def get_entity_with_context(entity_id: str) -> dict | None:
entity = get_entity(entity_id)
if entity is None:
return None
relationships = get_relationships(entity_id)
related_ids = set()
for rel in relationships:
related_ids.add(rel.source_entity_id)
related_ids.add(rel.target_entity_id)
related_ids.discard(entity_id)
related_entities = {}
for rid in related_ids:
e = get_entity(rid)
if e:
related_entities[rid] = e
return {
"entity": entity,
"relationships": relationships,
"related_entities": related_entities,
}
def _row_to_entity(row) -> Entity:
return Entity(
id=row["id"],
entity_type=row["entity_type"],
name=row["name"],
project=row["project"] or "",
description=row["description"] or "",
properties=json.loads(row["properties"] or "{}"),
status=row["status"],
confidence=row["confidence"],
source_refs=json.loads(row["source_refs"] or "[]"),
created_at=row["created_at"] or "",
updated_at=row["updated_at"] or "",
)
def _row_to_relationship(row) -> Relationship:
return Relationship(
id=row["id"],
source_entity_id=row["source_entity_id"],
target_entity_id=row["target_entity_id"],
relationship_type=row["relationship_type"],
confidence=row["confidence"],
source_refs=json.loads(row["source_refs"] or "[]"),
created_at=row["created_at"] or "",
)