fix(migration): preserve superseded/invalid shadow state during rekey

Codex caught a real data-loss bug in the legacy alias migration
shipped in 7e60f5a. plan_state_migration filtered state rows to
status='active' only, then apply_plan deleted the shadow projects
row at the end. Because project_state.project_id has
ON DELETE CASCADE, any superseded or invalid state rows still
attached to the shadow project got silently cascade-deleted —
exactly the audit loss a cleanup migration must not cause.

This commit fixes the bug and adds regression tests that lock in
the invariant "shadow state of every status is accounted for".

Root cause
----------
scripts/migrate_legacy_aliases.py::plan_state_migration was:

    "SELECT * FROM project_state WHERE project_id = ? AND status = 'active'"

which only found live rows. Any historical row (status in
'superseded' or 'invalid') was invisible to the plan, so the apply
step had nothing to rekey for it. Then the shadow project row was
deleted at the end, cascade-deleting every unplanned row.

The fix
-------
plan_state_migration now selects ALL state rows attached to the
shadow project regardless of status, and handles every row per a
per-status decision table:

| Shadow status | Canonical at same triple? | Values     | Action                         |
|---------------|---------------------------|------------|--------------------------------|
| any           | no                        | —          | clean rekey                    |
| any           | yes                       | same       | shadow superseded in place     |
| active        | yes, active               | different  | COLLISION, apply refuses       |
| active        | yes, inactive             | different  | shadow wins, canonical deleted |
| inactive      | yes, any                  | different  | historical drop (logged)       |

Four changes in the script:

1. SELECT drops the status filter so the plan walks every row.
2. New StateRekeyPlan.historical_drops list captures the shadow
   rows that lose to a canonical row at the same triple because the
   shadow is already inactive. These are the only unavoidable data
   losses, and they happen because the UNIQUE(project_id, category,
   key) constraint on project_state doesn't allow two rows per
   triple regardless of status.
3. New apply action 'replace_inactive_canonical' for the
   shadow-active-vs-canonical-inactive case. At apply time the
   canonical inactive row is DELETEd first (SQLite's default
   immediate constraint checking) and then the shadow is UPDATEd
   into its place in two separate statements. Adds a new
   state_rows_replaced_inactive_canonical counter.
4. New apply counter state_rows_historical_dropped for audit
   transparency. The rows themselves are still cascade-deleted
   when the shadow project row is dropped, but they're counted
   and reported.

Five places render_plan_text and plan_to_json_dict updated:

- counts() gains state_historical_drops
- render_plan_text prints a 'historical drops' section with each
  shadow-canonical pair and their statuses when there are any, so
  the operator sees the audit loss BEFORE running --apply
- The new section explicitly tells the operator: "if any of these
  values are worth keeping as separate audit records, manually copy
  them out before running --apply"
- plan_to_json_dict carries historical_drops into the JSON report
- The state counts table in the human report now shows both
  'state collisions (block)' and 'state historical drops' as
  separate lines so the operator can distinguish
  "apply will refuse" from "apply will drop historical rows"

Regression tests (3 new, all green)
-----------------------------------
tests/test_migrate_legacy_aliases.py:

- test_apply_preserves_superseded_shadow_state_when_no_collision:
  the direct regression for the codex finding. Seeds a shadow with
  a superseded state row on a triple the canonical doesn't have,
  runs the migration, verifies via raw SQL that the row is now
  attached to the canonical projects row and still has status
  'superseded'. This is the test that would have failed before
  the fix.
- test_apply_drops_shadow_inactive_row_when_canonical_holds_same_triple:
  covers the unavoidable data-loss case. Seeds shadow superseded
  + canonical active at the same triple with different values,
  verifies plan.counts() reports one historical_drop, runs apply,
  verifies the canonical value is preserved and the shadow value
  is gone.
- test_apply_replaces_inactive_canonical_with_active_shadow:
  covers the cross-contamination case where shadow has live value
  and canonical has a stale invalid row. Shadow wins by deleting
  canonical and rekeying in its place. Verifies the counter and
  the final state.

Plus _seed_state_row now accepts a status kwarg so the seeding
helper can create superseded/invalid rows directly.

test_dry_run_on_empty_registry_reports_empty_plan was updated to
include the new state_historical_drops key in the expected counts
dict (all zero for an empty plan, so the test shape is the same).

Full suite: 197 passing (was 194), 1 warning. The +3 is the three
new regression tests.

What this commit does NOT do
----------------------------
- Does NOT try to preserve historical shadow rows that collide
  with a canonical row at the same triple. That would require a
  schema change (adding (id) to the UNIQUE key, or a separate
  history table) and isn't in scope for a cleanup migration.
  The operator sees these as explicit 'historical drops' in the
  plan output and can copy them out manually if any are worth
  preserving.
- Does NOT change any behavior for rows that were already
  reachable from the canonicalized read path. The fix only
  affects legacy rows whose project_id points at a shadow row.
- Does NOT re-verify the earlier happy-path tests beyond the full
  suite confirming them still green.
This commit is contained in:
2026-04-08 15:52:44 -04:00
parent 7e60f5a0e6
commit 261277fd51
2 changed files with 355 additions and 36 deletions

View File

@@ -111,6 +111,7 @@ class StateRekeyPlan:
canonical_project_id: str
rows_to_rekey: list[dict] = field(default_factory=list)
collisions: list[dict] = field(default_factory=list)
historical_drops: list[dict] = field(default_factory=list)
@dataclass
@@ -154,6 +155,9 @@ class MigrationPlan:
"shadow_projects": len(self.shadow_projects),
"state_rekey_rows": sum(len(p.rows_to_rekey) for p in self.state_plans),
"state_collisions": sum(len(p.collisions) for p in self.state_plans),
"state_historical_drops": sum(
len(p.historical_drops) for p in self.state_plans
),
"memory_rekey_rows": sum(len(p.rows_to_rekey) for p in self.memory_plans),
"memory_supersede_rows": sum(len(p.to_supersede) for p in self.memory_plans),
"interaction_rekey_rows": sum(len(p.rows_to_rekey) for p in self.interaction_plans),
@@ -265,6 +269,34 @@ def plan_state_migration(
conn: sqlite3.Connection,
shadows: list[ShadowProject],
) -> list[StateRekeyPlan]:
"""Plan the project_state migration for every shadow project.
Selects ALL shadow-attached state rows regardless of status, so
superseded and invalid rows are preserved during the migration
instead of cascade-deleted when the shadow project row is dropped.
Per-row decision rules (see the table in the migration module
docstring and in the commit that added this logic):
- canonical triple is empty → clean rekey, any shadow status
- canonical triple has same value → shadow is a duplicate, mark
it superseded in place; its content lives on the canonical row
- canonical triple has different value, both active → collision,
apply refuses until a human resolves via POST /project/state
- canonical triple has different value, shadow active and
canonical inactive → shadow wins, canonical inactive row is
deleted and shadow is rekeyed into its place
- canonical triple has different value, shadow inactive → shadow
is historically out-of-date, canonical wins by definition, the
shadow row is recorded in historical_drops so the operator
sees the audit loss explicitly before running --apply
The last case is the only unavoidable data loss, and it is
intrinsic to the UPSERT shape of the project_state table (at
most one row per (project_id, category, key)). The report
surfaces every drop with both shadow and canonical values so
nothing disappears silently.
"""
plans: list[StateRekeyPlan] = []
for shadow in shadows:
canonical_row_id = _find_or_plan_canonical_row(
@@ -280,26 +312,29 @@ def plan_state_migration(
canonical_project_id=shadow.canonical_project_id,
)
# All state rows currently attached to the shadow
# ALL state rows attached to the shadow, regardless of status.
# Dropping the status filter is the codex-flagged fix: without
# it, cascade delete of the shadow project silently destroys
# superseded/invalid historical state, which is exactly the
# audit loss a cleanup migration must not cause.
state_rows = conn.execute(
"SELECT * FROM project_state WHERE project_id = ? AND status = 'active'",
"SELECT * FROM project_state WHERE project_id = ? ORDER BY updated_at DESC",
(shadow.shadow_row_id,),
).fetchall()
for raw in state_rows:
state = _dict_row(raw)
if canonical_row_id is None:
# No canonical row exists yet; nothing to collide with.
# The rekey is clean.
# No canonical row exists yet; any status rekeys cleanly.
plan.rows_to_rekey.append(state)
continue
# Check for a collision: an active state row on the
# canonical project with the same (category, key) whose
# value differs from the shadow's.
# Look for ANY row on the canonical project at the same
# (category, key) — status-agnostic, because the UNIQUE
# constraint is status-agnostic too.
collision_row = conn.execute(
"SELECT * FROM project_state "
"WHERE project_id = ? AND category = ? AND key = ? AND status = 'active'",
"WHERE project_id = ? AND category = ? AND key = ?",
(canonical_row_id, state["category"], state["key"]),
).fetchone()
@@ -307,21 +342,43 @@ def plan_state_migration(
plan.rows_to_rekey.append(state)
continue
if collision_row["value"] == state["value"]:
# Same value — trivially deduplicable. Plan to drop
# the shadow row (by marking it superseded via the
# rekey pathway: rekey would hit the UNIQUE
# constraint so we can't just UPDATE; instead we'll
# mark the shadow superseded at apply time).
state["_duplicate_of"] = _dict_row(collision_row)
canonical_state = _dict_row(collision_row)
if canonical_state["value"] == state["value"]:
# Same value on both sides — the shadow is a pure
# duplicate. Mark it superseded in place; the content
# is already preserved on the canonical row.
state["_action"] = "supersede_shadow_duplicate"
plan.rows_to_rekey.append(state)
continue
plan.collisions.append(
{
"shadow": state,
"canonical": _dict_row(collision_row),
}
shadow_active = state["status"] == "active"
canonical_active = canonical_state["status"] == "active"
if shadow_active and canonical_active:
# Two live values disagree — apply refuses until
# the human picks a winner.
plan.collisions.append(
{"shadow": state, "canonical": canonical_state}
)
continue
if shadow_active and not canonical_active:
# Shadow has the live value; canonical is stale.
# Shadow wins by deleting the canonical inactive row
# and rekeying the shadow into its place.
state["_action"] = "replace_inactive_canonical"
state["_canonical_to_delete"] = canonical_state
plan.rows_to_rekey.append(state)
continue
# Shadow is inactive (superseded or invalid) and collides
# with a canonical row at the same triple. The canonical
# wins by definition of the UPSERT schema. Record this
# as a historical drop so the operator sees it in the
# plan output BEFORE running --apply.
plan.historical_drops.append(
{"shadow": state, "canonical": canonical_state}
)
plans.append(plan)
@@ -468,6 +525,8 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict:
"shadow_projects_deleted": 0,
"state_rows_rekeyed": 0,
"state_rows_merged_as_duplicate": 0,
"state_rows_replaced_inactive_canonical": 0,
"state_rows_historical_dropped": 0,
"memory_rows_rekeyed": 0,
"memory_rows_superseded": 0,
"interaction_rows_rekeyed": 0,
@@ -495,10 +554,16 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict:
summary["canonical_rows_created"] += 1
for state in sp_plan.rows_to_rekey:
if "_duplicate_of" in state:
# Same (category, key, value) already on canonical;
# mark the shadow state row as superseded rather
# than hitting the UNIQUE constraint.
action = state.get("_action")
if action == "supersede_shadow_duplicate":
# Same (category, key, value) already on canonical.
# Mark the shadow state row superseded in place;
# the content is preserved on the canonical row.
# The shadow row will cascade-delete when its
# parent projects row is dropped at the end of
# this loop, which is fine because its value is
# preserved on the canonical side.
conn.execute(
"UPDATE project_state SET status = 'superseded', "
"updated_at = CURRENT_TIMESTAMP WHERE id = ?",
@@ -506,6 +571,30 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict:
)
summary["state_rows_merged_as_duplicate"] += 1
continue
if action == "replace_inactive_canonical":
# Shadow is active, canonical is stale (superseded
# or invalid). Delete the canonical inactive row
# and rekey the shadow in its place. SQLite's
# default immediate constraint checking means the
# DELETE must complete before the UPDATE for the
# UNIQUE(project_id, category, key) constraint to
# be happy.
canonical_to_delete = state["_canonical_to_delete"]
conn.execute(
"DELETE FROM project_state WHERE id = ?",
(canonical_to_delete["id"],),
)
conn.execute(
"UPDATE project_state SET project_id = ?, "
"updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(canonical_row_id, state["id"]),
)
summary["state_rows_rekeyed"] += 1
summary["state_rows_replaced_inactive_canonical"] += 1
continue
# Clean rekey (no collision at the triple)
conn.execute(
"UPDATE project_state SET project_id = ?, "
"updated_at = CURRENT_TIMESTAMP WHERE id = ?",
@@ -513,10 +602,19 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict:
)
summary["state_rows_rekeyed"] += 1
# Historical drops are shadow rows that collide with a
# canonical row at the same triple where the shadow is
# inactive. No DB operation is needed — they'll be
# cascade-deleted when the shadow project row is
# dropped below. But we count them so the operator
# sees the audit loss in the summary.
summary["state_rows_historical_dropped"] += len(sp_plan.historical_drops)
# Delete shadow projects rows after their state dependents
# have moved. We can do this unconditionally because project_state
# has ON DELETE CASCADE on project_id, and all our dependents
# have already been rekeyed or superseded above.
# have moved. At this point, the only state rows still
# attached to any shadow project are the historical_drops
# entries, which are being deliberately discarded because
# the canonical row at the same triple already wins.
for sp_plan in plan.state_plans:
conn.execute(
"DELETE FROM projects WHERE id = ?", (sp_plan.shadow_row_id,)
@@ -603,12 +701,13 @@ def render_plan_text(plan: MigrationPlan) -> str:
lines.append("")
counts = plan.counts()
lines.append("Counts:")
lines.append(f" shadow projects : {counts['shadow_projects']}")
lines.append(f" state rekey rows : {counts['state_rekey_rows']}")
lines.append(f" state collisions : {counts['state_collisions']}")
lines.append(f" memory rekey rows : {counts['memory_rekey_rows']}")
lines.append(f" memory supersede : {counts['memory_supersede_rows']}")
lines.append(f" interaction rekey : {counts['interaction_rekey_rows']}")
lines.append(f" shadow projects : {counts['shadow_projects']}")
lines.append(f" state rekey rows : {counts['state_rekey_rows']}")
lines.append(f" state collisions (block) : {counts['state_collisions']}")
lines.append(f" state historical drops : {counts['state_historical_drops']}")
lines.append(f" memory rekey rows : {counts['memory_rekey_rows']}")
lines.append(f" memory supersede : {counts['memory_supersede_rows']}")
lines.append(f" interaction rekey : {counts['interaction_rekey_rows']}")
if plan.is_empty:
lines.append("")
@@ -643,6 +742,38 @@ def render_plan_text(plan: MigrationPlan) -> str:
)
return "\n".join(lines)
# Historical drops are explicit, non-blocking audit-loss events.
# They're surfaced so the operator sees them BEFORE running --apply
# and can decide to manually preserve the shadow values if any of
# them are worth keeping as separate records.
historical_drops = [
drop
for sp_plan in plan.state_plans
for drop in sp_plan.historical_drops
]
if historical_drops:
lines.append("")
lines.append(
f"*** {len(historical_drops)} HISTORICAL DROPS — shadow inactive "
"state rows that collide with a canonical row: ***"
)
for drop in historical_drops:
shadow = drop["shadow"]
canonical = drop["canonical"]
lines.append(
f" - [{shadow['category']}/{shadow['key']}] "
f"shadow({shadow['status']})={shadow['value']!r} -> "
f"dropped in favor of canonical({canonical['status']})={canonical['value']!r}"
)
lines.append("")
lines.append(
"These rows are superseded/invalid on the shadow side. The "
"UNIQUE(project_id, category, key) constraint prevents "
"preserving them alongside the canonical row. If any of these "
"values are worth keeping as separate audit records, manually "
"copy them out before running --apply."
)
lines.append("")
lines.append("Apply plan (if --apply is given):")
for sp_plan in plan.state_plans:
@@ -687,6 +818,7 @@ def plan_to_json_dict(plan: MigrationPlan) -> dict:
"canonical_project_id": sp.canonical_project_id,
"rows_to_rekey": sp.rows_to_rekey,
"collisions": sp.collisions,
"historical_drops": sp.historical_drops,
}
for sp in plan.state_plans
],

View File

@@ -73,13 +73,14 @@ def _seed_state_row(
category: str,
key: str,
value: str,
status: str = "active",
) -> str:
row_id = str(uuid.uuid4())
conn.execute(
"INSERT INTO project_state "
"(id, project_id, category, key, value, source, confidence) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(row_id, project_id, category, key, value, "legacy-test", 1.0),
"(id, project_id, category, key, value, source, confidence, status) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(row_id, project_id, category, key, value, "legacy-test", 1.0, status),
)
conn.commit()
return row_id
@@ -147,6 +148,7 @@ def test_dry_run_on_empty_registry_reports_empty_plan(tmp_data_dir):
"shadow_projects": 0,
"state_rekey_rows": 0,
"state_collisions": 0,
"state_historical_drops": 0,
"memory_rekey_rows": 0,
"memory_supersede_rows": 0,
"interaction_rekey_rows": 0,
@@ -368,6 +370,191 @@ def test_apply_drops_shadow_state_duplicate_without_collision(project_registry):
assert via_canonical[0].value == "Wave 1 ingestion"
def test_apply_preserves_superseded_shadow_state_when_no_collision(project_registry):
"""Regression test for the codex-flagged data-loss bug.
Before the fix, plan_state_migration only selected status='active'
rows. Any superseded or invalid row on the shadow project was
invisible to the plan and got silently cascade-deleted when the
shadow projects row was dropped at the end of apply. That's
exactly the kind of audit loss a cleanup migration must not cause.
This test seeds a shadow project with a superseded state row on
a triple the canonical project doesn't have, runs the migration,
and verifies the row survived and is now attached to the
canonical project (still with status='superseded').
"""
registry_path = project_registry(
("p05-interferometer", ["p05", "interferometer"])
)
conn = _open_db_connection()
try:
shadow_id = _seed_shadow_project(conn, "p05")
# Superseded row on a triple the canonical won't have
_seed_state_row(
conn,
shadow_id,
"status",
"historical_phase",
"Phase 0 legacy",
status="superseded",
)
plan = mig.build_plan(conn, registry_path)
assert not plan.has_collisions
summary = mig.apply_plan(conn, plan)
finally:
conn.close()
# The superseded row should have been rekeyed, not dropped
assert summary["state_rows_rekeyed"] == 1
assert summary["state_rows_historical_dropped"] == 0
# Verify via raw SQL that the row is now attached to the canonical
# projects row and still has status='superseded'
conn = _open_db_connection()
try:
row = conn.execute(
"SELECT ps.status, ps.value, p.name "
"FROM project_state ps JOIN projects p ON ps.project_id = p.id "
"WHERE ps.category = ? AND ps.key = ?",
("status", "historical_phase"),
).fetchone()
finally:
conn.close()
assert row is not None, "superseded shadow row was lost during migration"
assert row["status"] == "superseded"
assert row["value"] == "Phase 0 legacy"
assert row["name"] == "p05-interferometer"
def test_apply_drops_shadow_inactive_row_when_canonical_holds_same_triple(project_registry):
"""Shadow is inactive (superseded) and collides with an active canonical row.
The canonical wins by definition of the UPSERT schema. The shadow
row is recorded as a historical_drop in the plan so the operator
sees the audit loss, and the apply cascade-deletes it via the
shadow projects row. This is the unavoidable data-loss case
documented in the migration module docstring.
"""
registry_path = project_registry(
("p05-interferometer", ["p05", "interferometer"])
)
conn = _open_db_connection()
try:
shadow_id = _seed_shadow_project(conn, "p05")
canonical_id = _seed_shadow_project(conn, "p05-interferometer")
# Shadow has a superseded value on a triple where the canonical
# has a different active value. Can't preserve both: UNIQUE
# allows only one row per triple.
_seed_state_row(
conn,
shadow_id,
"status",
"next_focus",
"Old wave 1",
status="superseded",
)
_seed_state_row(
conn,
canonical_id,
"status",
"next_focus",
"Wave 2 trusted-operational",
status="active",
)
plan = mig.build_plan(conn, registry_path)
assert not plan.has_collisions # not an active-vs-active collision
assert plan.counts()["state_historical_drops"] == 1
summary = mig.apply_plan(conn, plan)
finally:
conn.close()
assert summary["state_rows_historical_dropped"] == 1
# The canonical's active row survives unchanged
via_canonical = get_state("p05-interferometer")
active_next_focus = [
e
for e in via_canonical
if e.category == "status" and e.key == "next_focus"
]
assert len(active_next_focus) == 1
assert active_next_focus[0].value == "Wave 2 trusted-operational"
def test_apply_replaces_inactive_canonical_with_active_shadow(project_registry):
"""Shadow is active, canonical has an inactive row at the same triple.
The shadow wins: canonical inactive row is deleted, shadow is
rekeyed into canonical's project_id. This covers the
cross-contamination case where the old alias path was used for
the live value while the canonical path had a stale row.
"""
registry_path = project_registry(
("p06-polisher", ["p06", "polisher"])
)
conn = _open_db_connection()
try:
shadow_id = _seed_shadow_project(conn, "p06")
canonical_id = _seed_shadow_project(conn, "p06-polisher")
# Canonical has a stale invalid row; shadow has the live value.
_seed_state_row(
conn,
canonical_id,
"decision",
"frame",
"Old frame (no longer current)",
status="invalid",
)
_seed_state_row(
conn,
shadow_id,
"decision",
"frame",
"kinematic mount frame",
status="active",
)
plan = mig.build_plan(conn, registry_path)
assert not plan.has_collisions
assert plan.counts()["state_historical_drops"] == 0
summary = mig.apply_plan(conn, plan)
finally:
conn.close()
assert summary["state_rows_replaced_inactive_canonical"] == 1
# The active shadow value now lives on the canonical row
via_canonical = get_state("p06-polisher")
frame_entries = [
e for e in via_canonical if e.category == "decision" and e.key == "frame"
]
assert len(frame_entries) == 1
assert frame_entries[0].value == "kinematic mount frame"
# Confirm via raw SQL that the previously-inactive canonical row
# no longer exists
conn = _open_db_connection()
try:
stale = conn.execute(
"SELECT COUNT(*) AS c FROM project_state WHERE value = ?",
("Old frame (no longer current)",),
).fetchone()
finally:
conn.close()
assert stale["c"] == 0
def test_apply_migrates_memories(project_registry):
registry_path = project_registry(
("p04-gigabit", ["p04", "gigabit"])