feat: Engineering Knowledge Layer V1 — entities + relationships

Layer 2 of the AtoCore architecture. Adds typed engineering entities
with relationships on top of the flat memory/state/chunk substrate.

Schema:
- entities table: id, entity_type, name, project, description,
  properties (JSON), status, confidence, source_refs, timestamps
- relationships table: source_entity_id, target_entity_id,
  relationship_type, confidence, source_refs

15 entity types: project, system, subsystem, component, interface,
requirement, constraint, decision, material, parameter,
analysis_model, result, validation_claim, vendor, process

12 relationship types: contains, part_of, interfaces_with,
satisfies, constrained_by, affected_by_decision, analyzed_by,
validated_by, depends_on, uses_material, described_by, supersedes

Service layer: full CRUD + get_entity_with_context (returns an
entity with its relationships and all related entities in one call).

API endpoints:
- POST /entities — create entity
- GET /entities — list/filter by type, project, status, name
- GET /entities/{id} — entity + relationships + related entities
- POST /relationships — create relationship

Schema auto-initialized on app startup via init_engineering_schema().

7 tests covering entity CRUD, relationships, context traversal,
filtering, name search, and validation.

Test count: 290 -> 297.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-13 09:50:58 -04:00
parent b86181eb6c
commit dc20033a93
5 changed files with 601 additions and 0 deletions

View File

@@ -30,6 +30,16 @@ from atocore.interactions.service import (
list_interactions,
record_interaction,
)
from atocore.engineering.service import (
ENTITY_TYPES,
RELATIONSHIP_TYPES,
create_entity,
create_relationship,
get_entities,
get_entity,
get_entity_with_context,
get_relationships,
)
from atocore.memory.extractor import (
EXTRACTOR_VERSION,
MemoryCandidate,
@@ -926,6 +936,144 @@ def api_dashboard() -> dict:
}
# --- Engineering Knowledge Layer (Layer 2) ---
class EntityCreateRequest(BaseModel):
entity_type: str
name: str
project: str = ""
description: str = ""
properties: dict | None = None
status: str = "active"
confidence: float = 1.0
source_refs: list[str] | None = None
class RelationshipCreateRequest(BaseModel):
source_entity_id: str
target_entity_id: str
relationship_type: str
confidence: float = 1.0
source_refs: list[str] | None = None
@router.post("/entities")
def api_create_entity(req: EntityCreateRequest) -> dict:
"""Create a new engineering entity."""
try:
entity = create_entity(
entity_type=req.entity_type,
name=req.name,
project=req.project,
description=req.description,
properties=req.properties,
status=req.status,
confidence=req.confidence,
source_refs=req.source_refs,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {"status": "ok", "id": entity.id, "entity_type": entity.entity_type, "name": entity.name}
@router.get("/entities")
def api_list_entities(
entity_type: str | None = None,
project: str | None = None,
status: str = "active",
name_contains: str | None = None,
limit: int = 100,
) -> dict:
"""List engineering entities with optional filters."""
entities = get_entities(
entity_type=entity_type,
project=project,
status=status,
name_contains=name_contains,
limit=limit,
)
return {
"entities": [
{
"id": e.id,
"entity_type": e.entity_type,
"name": e.name,
"project": e.project,
"description": e.description,
"properties": e.properties,
"status": e.status,
"confidence": e.confidence,
}
for e in entities
],
"count": len(entities),
}
@router.get("/entities/{entity_id}")
def api_get_entity(entity_id: str) -> dict:
"""Get an entity with its relationships and related entities."""
result = get_entity_with_context(entity_id)
if result is None:
raise HTTPException(status_code=404, detail=f"Entity not found: {entity_id}")
entity = result["entity"]
return {
"entity": {
"id": entity.id,
"entity_type": entity.entity_type,
"name": entity.name,
"project": entity.project,
"description": entity.description,
"properties": entity.properties,
"status": entity.status,
"confidence": entity.confidence,
"source_refs": entity.source_refs,
"created_at": entity.created_at,
"updated_at": entity.updated_at,
},
"relationships": [
{
"id": r.id,
"source_entity_id": r.source_entity_id,
"target_entity_id": r.target_entity_id,
"relationship_type": r.relationship_type,
"confidence": r.confidence,
}
for r in result["relationships"]
],
"related_entities": {
eid: {
"entity_type": e.entity_type,
"name": e.name,
"project": e.project,
"description": e.description[:200],
}
for eid, e in result["related_entities"].items()
},
}
@router.post("/relationships")
def api_create_relationship(req: RelationshipCreateRequest) -> dict:
"""Create a relationship between two entities."""
try:
rel = create_relationship(
source_entity_id=req.source_entity_id,
target_entity_id=req.target_entity_id,
relationship_type=req.relationship_type,
confidence=req.confidence,
source_refs=req.source_refs,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {
"status": "ok",
"id": rel.id,
"relationship_type": rel.relationship_type,
}
@router.get("/admin/backup/{stamp}/validate")
def api_validate_backup(stamp: str) -> dict:
"""Validate that a previously created backup is structurally usable."""

View File

@@ -0,0 +1,16 @@
"""Engineering Knowledge Layer — typed entities and relationships.
Layer 2 of the AtoCore architecture. Sits on top of the core machine
layer (memories, project state, retrieval) and adds structured
engineering objects with typed relationships so queries like "what
requirements does this component satisfy" can be answered directly
instead of relying on flat text search.
V1 entity types (from docs/architecture/engineering-ontology-v1.md):
Component, Subsystem, Requirement, Constraint, Decision, Material,
Parameter, Interface
V1 relationship types:
CONTAINS, PART_OF, INTERFACES_WITH, SATISFIES, CONSTRAINED_BY,
AFFECTED_BY_DECISION, ANALYZED_BY, VALIDATED_BY, DEPENDS_ON
"""

View File

@@ -0,0 +1,317 @@
"""Engineering entity and relationship CRUD."""
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
log = get_logger("engineering")
ENTITY_TYPES = [
"project",
"system",
"subsystem",
"component",
"interface",
"requirement",
"constraint",
"decision",
"material",
"parameter",
"analysis_model",
"result",
"validation_claim",
"vendor",
"process",
]
RELATIONSHIP_TYPES = [
"contains",
"part_of",
"interfaces_with",
"satisfies",
"constrained_by",
"affected_by_decision",
"analyzed_by",
"validated_by",
"depends_on",
"uses_material",
"described_by",
"supersedes",
]
ENTITY_STATUSES = ["candidate", "active", "superseded", "invalid"]
@dataclass
class Entity:
id: str
entity_type: str
name: str
project: str
description: str = ""
properties: dict = field(default_factory=dict)
status: str = "active"
confidence: float = 1.0
source_refs: list[str] = field(default_factory=list)
created_at: str = ""
updated_at: str = ""
@dataclass
class Relationship:
id: str
source_entity_id: str
target_entity_id: str
relationship_type: str
confidence: float = 1.0
source_refs: list[str] = field(default_factory=list)
created_at: str = ""
def init_engineering_schema() -> None:
with get_connection() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS entities (
id TEXT PRIMARY KEY,
entity_type TEXT NOT NULL,
name TEXT NOT NULL,
project TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
properties TEXT NOT NULL DEFAULT '{}',
status TEXT NOT NULL DEFAULT 'active',
confidence REAL NOT NULL DEFAULT 1.0,
source_refs TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS relationships (
id TEXT PRIMARY KEY,
source_entity_id TEXT NOT NULL,
target_entity_id TEXT NOT NULL,
relationship_type TEXT NOT NULL,
confidence REAL NOT NULL DEFAULT 1.0,
source_refs TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (source_entity_id) REFERENCES entities(id),
FOREIGN KEY (target_entity_id) REFERENCES entities(id)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_entities_project
ON entities(project)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_entities_type
ON entities(entity_type)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_relationships_source
ON relationships(source_entity_id)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_relationships_target
ON relationships(target_entity_id)
""")
log.info("engineering_schema_initialized")
def create_entity(
entity_type: str,
name: str,
project: str = "",
description: str = "",
properties: dict | None = None,
status: str = "active",
confidence: float = 1.0,
source_refs: list[str] | None = None,
) -> Entity:
if entity_type not in ENTITY_TYPES:
raise ValueError(f"Invalid entity type: {entity_type}. Must be one of {ENTITY_TYPES}")
if status not in ENTITY_STATUSES:
raise ValueError(f"Invalid status: {status}. Must be one of {ENTITY_STATUSES}")
if not name or not name.strip():
raise ValueError("Entity name must be non-empty")
entity_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
props = properties or {}
refs = source_refs or []
with get_connection() as conn:
conn.execute(
"""INSERT INTO entities
(id, entity_type, name, project, description, properties,
status, confidence, source_refs, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
entity_id, entity_type, name.strip(), project,
description, json.dumps(props), status, confidence,
json.dumps(refs), now, now,
),
)
log.info("entity_created", entity_id=entity_id, entity_type=entity_type, name=name)
return Entity(
id=entity_id, entity_type=entity_type, name=name.strip(),
project=project, description=description, properties=props,
status=status, confidence=confidence, source_refs=refs,
created_at=now, updated_at=now,
)
def create_relationship(
source_entity_id: str,
target_entity_id: str,
relationship_type: str,
confidence: float = 1.0,
source_refs: list[str] | None = None,
) -> Relationship:
if relationship_type not in RELATIONSHIP_TYPES:
raise ValueError(f"Invalid relationship type: {relationship_type}")
rel_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
refs = source_refs or []
with get_connection() as conn:
conn.execute(
"""INSERT INTO relationships
(id, source_entity_id, target_entity_id, relationship_type,
confidence, source_refs, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(rel_id, source_entity_id, target_entity_id,
relationship_type, confidence, json.dumps(refs), now),
)
log.info(
"relationship_created",
rel_id=rel_id,
source=source_entity_id,
target=target_entity_id,
rel_type=relationship_type,
)
return Relationship(
id=rel_id, source_entity_id=source_entity_id,
target_entity_id=target_entity_id,
relationship_type=relationship_type,
confidence=confidence, source_refs=refs, created_at=now,
)
def get_entities(
entity_type: str | None = None,
project: str | None = None,
status: str = "active",
name_contains: str | None = None,
limit: int = 100,
) -> list[Entity]:
query = "SELECT * FROM entities WHERE status = ?"
params: list = [status]
if entity_type:
query += " AND entity_type = ?"
params.append(entity_type)
if project is not None:
query += " AND project = ?"
params.append(project)
if name_contains:
query += " AND name LIKE ?"
params.append(f"%{name_contains}%")
query += " ORDER BY entity_type, name LIMIT ?"
params.append(min(limit, 500))
with get_connection() as conn:
rows = conn.execute(query, params).fetchall()
return [_row_to_entity(r) for r in rows]
def get_entity(entity_id: str) -> Entity | None:
with get_connection() as conn:
row = conn.execute(
"SELECT * FROM entities WHERE id = ?", (entity_id,)
).fetchone()
if row is None:
return None
return _row_to_entity(row)
def get_relationships(
entity_id: str,
direction: str = "both",
) -> list[Relationship]:
results = []
with get_connection() as conn:
if direction in ("outgoing", "both"):
rows = conn.execute(
"SELECT * FROM relationships WHERE source_entity_id = ?",
(entity_id,),
).fetchall()
results.extend(_row_to_relationship(r) for r in rows)
if direction in ("incoming", "both"):
rows = conn.execute(
"SELECT * FROM relationships WHERE target_entity_id = ?",
(entity_id,),
).fetchall()
results.extend(_row_to_relationship(r) for r in rows)
return results
def get_entity_with_context(entity_id: str) -> dict | None:
entity = get_entity(entity_id)
if entity is None:
return None
relationships = get_relationships(entity_id)
related_ids = set()
for rel in relationships:
related_ids.add(rel.source_entity_id)
related_ids.add(rel.target_entity_id)
related_ids.discard(entity_id)
related_entities = {}
for rid in related_ids:
e = get_entity(rid)
if e:
related_entities[rid] = e
return {
"entity": entity,
"relationships": relationships,
"related_entities": related_entities,
}
def _row_to_entity(row) -> Entity:
return Entity(
id=row["id"],
entity_type=row["entity_type"],
name=row["name"],
project=row["project"] or "",
description=row["description"] or "",
properties=json.loads(row["properties"] or "{}"),
status=row["status"],
confidence=row["confidence"],
source_refs=json.loads(row["source_refs"] or "[]"),
created_at=row["created_at"] or "",
updated_at=row["updated_at"] or "",
)
def _row_to_relationship(row) -> Relationship:
return Relationship(
id=row["id"],
source_entity_id=row["source_entity_id"],
target_entity_id=row["target_entity_id"],
relationship_type=row["relationship_type"],
confidence=row["confidence"],
source_refs=json.loads(row["source_refs"] or "[]"),
created_at=row["created_at"] or "",
)

View File

@@ -8,6 +8,7 @@ from atocore import __version__
from atocore.api.routes import router
import atocore.config as _config
from atocore.context.project_state import init_project_state_schema
from atocore.engineering.service import init_engineering_schema
from atocore.ingestion.pipeline import get_source_status
from atocore.models.database import init_db
from atocore.observability.logger import get_logger, setup_logging
@@ -29,6 +30,7 @@ async def lifespan(app: FastAPI):
_config.ensure_runtime_dirs()
init_db()
init_project_state_schema()
init_engineering_schema()
log.info(
"startup_ready",
env=_config.settings.env,

118
tests/test_engineering.py Normal file
View File

@@ -0,0 +1,118 @@
"""Tests for the Engineering Knowledge Layer."""
from atocore.engineering.service import (
ENTITY_TYPES,
RELATIONSHIP_TYPES,
create_entity,
create_relationship,
get_entities,
get_entity,
get_entity_with_context,
get_relationships,
init_engineering_schema,
)
from atocore.models.database import init_db
import pytest
def test_create_and_get_entity(tmp_data_dir):
init_db()
init_engineering_schema()
e = create_entity(
entity_type="component",
name="Pivot Pin",
project="p04-gigabit",
description="Lateral support pivot pin for M1 assembly",
properties={"material": "GF-PTFE", "diameter_mm": 12},
)
assert e.entity_type == "component"
assert e.name == "Pivot Pin"
assert e.properties["material"] == "GF-PTFE"
fetched = get_entity(e.id)
assert fetched is not None
assert fetched.name == "Pivot Pin"
def test_create_relationship(tmp_data_dir):
init_db()
init_engineering_schema()
subsystem = create_entity("subsystem", "Lateral Support", project="p04-gigabit")
component = create_entity("component", "Pivot Pin", project="p04-gigabit")
rel = create_relationship(
source_entity_id=subsystem.id,
target_entity_id=component.id,
relationship_type="contains",
)
assert rel.relationship_type == "contains"
rels = get_relationships(subsystem.id, direction="outgoing")
assert len(rels) == 1
assert rels[0].target_entity_id == component.id
def test_entity_with_context(tmp_data_dir):
init_db()
init_engineering_schema()
subsystem = create_entity("subsystem", "Lateral Support", project="p04-gigabit")
pin = create_entity("component", "Pivot Pin", project="p04-gigabit")
pad = create_entity("component", "PTFE Pad", project="p04-gigabit")
material = create_entity("material", "GF-PTFE", project="p04-gigabit",
description="Glass-filled PTFE for thermal stability")
create_relationship(subsystem.id, pin.id, "contains")
create_relationship(subsystem.id, pad.id, "contains")
create_relationship(pad.id, material.id, "uses_material")
ctx = get_entity_with_context(subsystem.id)
assert ctx is not None
assert len(ctx["relationships"]) == 2
assert pin.id in ctx["related_entities"]
assert pad.id in ctx["related_entities"]
def test_filter_entities_by_type_and_project(tmp_data_dir):
init_db()
init_engineering_schema()
create_entity("component", "Pin A", project="p04-gigabit")
create_entity("component", "Pin B", project="p04-gigabit")
create_entity("material", "Steel", project="p04-gigabit")
create_entity("component", "Actuator", project="p06-polisher")
components = get_entities(entity_type="component", project="p04-gigabit")
assert len(components) == 2
all_p04 = get_entities(project="p04-gigabit")
assert len(all_p04) == 3
polisher = get_entities(project="p06-polisher")
assert len(polisher) == 1
def test_invalid_entity_type_raises(tmp_data_dir):
init_db()
init_engineering_schema()
with pytest.raises(ValueError, match="Invalid entity type"):
create_entity("spaceship", "Enterprise")
def test_invalid_relationship_type_raises(tmp_data_dir):
init_db()
init_engineering_schema()
a = create_entity("component", "A")
b = create_entity("component", "B")
with pytest.raises(ValueError, match="Invalid relationship type"):
create_relationship(a.id, b.id, "loves")
def test_entity_name_search(tmp_data_dir):
init_db()
init_engineering_schema()
create_entity("component", "Vertical Support Pad")
create_entity("component", "Lateral Support Bracket")
create_entity("component", "Reference Frame")
results = get_entities(name_contains="Support")
assert len(results) == 2