feat(engineering): enforce V1-0 write invariants

This commit is contained in:
2026-04-22 14:59:17 -04:00
parent 9ab5b3c9d8
commit 2712c5d2d0
11 changed files with 734 additions and 8 deletions

View File

@@ -63,6 +63,12 @@ RELATIONSHIP_TYPES = [
ENTITY_STATUSES = ["candidate", "active", "superseded", "invalid"]
# V1-0: extractor version this module writes into new entity rows.
# Per promotion-rules.md:268, every candidate must record the version of
# the extractor that produced it so later re-evaluation is auditable.
# Bump this when extraction logic materially changes.
EXTRACTOR_VERSION = "v1.0.0"
@dataclass
class Entity:
@@ -77,6 +83,10 @@ class Entity:
source_refs: list[str] = field(default_factory=list)
created_at: str = ""
updated_at: str = ""
# V1-0 shared-header fields per engineering-v1-acceptance.md:45.
extractor_version: str = ""
canonical_home: str = "entity"
hand_authored: bool = False
@dataclass
@@ -103,10 +113,25 @@ def init_engineering_schema() -> None:
status TEXT NOT NULL DEFAULT 'active',
confidence REAL NOT NULL DEFAULT 1.0,
source_refs TEXT NOT NULL DEFAULT '[]',
extractor_version TEXT NOT NULL DEFAULT '',
canonical_home TEXT NOT NULL DEFAULT 'entity',
hand_authored INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)
""")
# V1-0 (Engineering V1 completion): the three shared-header fields
# per engineering-v1-acceptance.md:45. Idempotent ALTERs for
# databases created before V1-0 land these columns without a full
# migration. Fresh DBs get them via the CREATE TABLE above; the
# ALTERs below are a no-op there.
from atocore.models.database import _column_exists # late import; avoids cycle
if not _column_exists(conn, "entities", "extractor_version"):
conn.execute("ALTER TABLE entities ADD COLUMN extractor_version TEXT DEFAULT ''")
if not _column_exists(conn, "entities", "canonical_home"):
conn.execute("ALTER TABLE entities ADD COLUMN canonical_home TEXT DEFAULT 'entity'")
if not _column_exists(conn, "entities", "hand_authored"):
conn.execute("ALTER TABLE entities ADD COLUMN hand_authored INTEGER DEFAULT 0")
conn.execute("""
CREATE TABLE IF NOT EXISTS relationships (
id TEXT PRIMARY KEY,
@@ -149,6 +174,8 @@ def create_entity(
confidence: float = 1.0,
source_refs: list[str] | None = None,
actor: str = "api",
hand_authored: bool = False,
extractor_version: str | None = None,
) -> Entity:
if entity_type not in ENTITY_TYPES:
raise ValueError(f"Invalid entity type: {entity_type}. Must be one of {ENTITY_TYPES}")
@@ -157,6 +184,21 @@ def create_entity(
if not name or not name.strip():
raise ValueError("Entity name must be non-empty")
refs = list(source_refs) if source_refs else []
# V1-0 (F-8 provenance enforcement, engineering-v1-acceptance.md:147):
# every new entity row must carry non-empty source_refs OR be explicitly
# flagged hand_authored. This is the non-negotiable invariant every
# later V1 phase depends on — without it, active entities can escape
# into the graph with no traceable origin. Raises at the write seam so
# the bug is impossible to introduce silently.
if not refs and not hand_authored:
raise ValueError(
"source_refs required: every entity must carry provenance "
"(source_chunk_id / source_interaction_id / kb_cad_export_id / ...) "
"or set hand_authored=True to explicitly flag a direct human write"
)
# Phase 5: enforce project canonicalization contract at the write seam.
# Aliases like "p04" become "p04-gigabit" so downstream reads stay
# consistent with the registry.
@@ -165,18 +207,22 @@ def create_entity(
entity_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
props = properties or {}
refs = source_refs or []
ev = extractor_version if extractor_version is not None else EXTRACTOR_VERSION
with get_connection() as conn:
conn.execute(
"""INSERT INTO entities
(id, entity_type, name, project, description, properties,
status, confidence, source_refs, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
status, confidence, source_refs,
extractor_version, canonical_home, hand_authored,
created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
entity_id, entity_type, name.strip(), project,
description, json.dumps(props), status, confidence,
json.dumps(refs), now, now,
json.dumps(refs),
ev, "entity", 1 if hand_authored else 0,
now, now,
),
)
@@ -194,14 +240,31 @@ def create_entity(
"project": project,
"status": status,
"confidence": confidence,
"hand_authored": hand_authored,
"extractor_version": ev,
},
)
# V1-0 (F-5 hook, engineering-v1-acceptance.md:99): synchronous
# conflict detection on any active-entity write. The promote path
# already had this hook (see promote_entity below); V1-0 adds it to
# direct-active creates so every active row — however it got that
# way — is checked. Fail-open per "flag, never block" rule in
# conflict-model.md:256: detector errors log but never fail the write.
if status == "active":
try:
from atocore.engineering.conflicts import detect_conflicts_for_entity
detect_conflicts_for_entity(entity_id)
except Exception as e:
log.warning("conflict_detection_failed", entity_id=entity_id, error=str(e))
return Entity(
id=entity_id, entity_type=entity_type, name=name.strip(),
project=project, description=description, properties=props,
status=status, confidence=confidence, source_refs=refs,
created_at=now, updated_at=now,
extractor_version=ev, canonical_home="entity",
hand_authored=hand_authored,
)
@@ -361,6 +424,20 @@ def promote_entity(
if entity is None or entity.status != "candidate":
return False
# V1-0 (F-8 provenance re-check at promote). The invariant must hold at
# BOTH create_entity AND promote_entity per the plan, because candidate
# rows can exist in the DB from before V1-0 (no enforcement at their
# create time) or can be inserted by code paths that bypass the service
# layer. Block any candidate with empty source_refs that is NOT flagged
# hand_authored from ever becoming active. Same error shape as the
# create-side check for symmetry.
if not (entity.source_refs or []) and not entity.hand_authored:
raise ValueError(
"source_refs required: cannot promote a candidate with no "
"provenance. Attach source_refs via PATCH /entities/{id}, "
"or flag hand_authored=true before promoting."
)
if target_project is not None:
new_project = (
resolve_project_name(target_project) if target_project else ""
@@ -503,6 +580,22 @@ def supersede_entity(
superseded_by=superseded_by,
error=str(e),
)
# V1-0 (F-5 hook on supersede, per plan's "every active-entity
# write path"). Supersede demotes `entity_id` AND adds a
# `supersedes` relationship rooted at the already-active
# `superseded_by`. That new edge can create a conflict the
# detector should catch synchronously. Fail-open per
# conflict-model.md:256.
try:
from atocore.engineering.conflicts import detect_conflicts_for_entity
detect_conflicts_for_entity(superseded_by)
except Exception as e:
log.warning(
"conflict_detection_failed",
entity_id=superseded_by,
error=str(e),
)
return True
@@ -774,6 +867,15 @@ def get_entity_with_context(entity_id: str) -> dict | None:
def _row_to_entity(row) -> Entity:
# V1-0 shared-header fields are optional on read — rows that predate
# V1-0 migration have NULL / missing values, so defaults kick in and
# older tests that build Entity() without the new fields keep passing.
# `row.keys()` lets us tolerate SQLite rows that lack the columns
# entirely (pre-migration sqlite3.Row).
keys = set(row.keys())
extractor_version = (row["extractor_version"] or "") if "extractor_version" in keys else ""
canonical_home = (row["canonical_home"] or "entity") if "canonical_home" in keys else "entity"
hand_authored = bool(row["hand_authored"]) if "hand_authored" in keys and row["hand_authored"] is not None else False
return Entity(
id=row["id"],
entity_type=row["entity_type"],
@@ -786,6 +888,9 @@ def _row_to_entity(row) -> Entity:
source_refs=json.loads(row["source_refs"] or "[]"),
created_at=row["created_at"] or "",
updated_at=row["updated_at"] or "",
extractor_version=extractor_version,
canonical_home=canonical_home,
hand_authored=hand_authored,
)