From 261277fd516488a1586329f68c89c4491906a758 Mon Sep 17 00:00:00 2001
From: Anto01
Date: Wed, 8 Apr 2026 15:52:44 -0400
Subject: [PATCH] fix(migration): preserve superseded/invalid shadow state
 during rekey
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Codex caught a real data-loss bug in the legacy alias migration shipped
in 7e60f5a. plan_state_migration filtered state rows to status='active'
only, then apply_plan deleted the shadow projects row at the end.
Because project_state.project_id has ON DELETE CASCADE, any superseded
or invalid state rows still attached to the shadow project were
silently cascade-deleted — exactly the audit loss a cleanup migration
must not cause.

This commit fixes the bug and adds regression tests that lock in the
invariant "shadow state of every status is accounted for".

Root cause
----------

scripts/migrate_legacy_aliases.py::plan_state_migration was:

    "SELECT * FROM project_state WHERE project_id = ? AND status = 'active'"

which only found live rows. Any historical row (status 'superseded' or
'invalid') was invisible to the plan, so the apply step had nothing to
rekey for it. Then the shadow project row was deleted at the end,
cascade-deleting every unplanned row.

The fix
-------

plan_state_migration now selects ALL state rows attached to the shadow
project regardless of status, and handles every row according to a
per-status decision table:

| Shadow status | Canonical at same triple? | Values    | Action                         |
|---------------|---------------------------|-----------|--------------------------------|
| any           | no                        | —         | clean rekey                    |
| any           | yes                       | same      | shadow superseded in place     |
| active        | yes, active               | different | COLLISION, apply refuses       |
| active        | yes, inactive             | different | shadow wins, canonical deleted |
| inactive      | yes, any                  | different | historical drop (logged)       |

Four changes in the script:

1. SELECT drops the status filter so the plan walks every row.

2.
New StateRekeyPlan.historical_drops list captures the shadow rows
   that lose to a canonical row at the same triple because the shadow
   is already inactive. These are the only unavoidable data losses,
   and they happen because the UNIQUE(project_id, category, key)
   constraint on project_state doesn't allow two rows per triple
   regardless of status.

3. New apply action 'replace_inactive_canonical' for the
   shadow-active-vs-canonical-inactive case. At apply time the
   canonical inactive row is DELETEd first and then the shadow is
   UPDATEd into its place in two separate statements (SQLite checks
   the UNIQUE constraint immediately, so the DELETE must land before
   the UPDATE). Adds a new state_rows_replaced_inactive_canonical
   counter.

4. New apply counter state_rows_historical_dropped for audit
   transparency. The rows themselves are still cascade-deleted when
   the shadow project row is dropped, but they're counted and
   reported.

Five updates to render_plan_text and plan_to_json_dict:

- counts() gains state_historical_drops
- render_plan_text prints a 'historical drops' section with each
  shadow-canonical pair and their statuses when there are any, so the
  operator sees the audit loss BEFORE running --apply
- The new section explicitly tells the operator: "if any of these
  values are worth keeping as separate audit records, manually copy
  them out before running --apply"
- plan_to_json_dict carries historical_drops into the JSON report
- The state counts table in the human report now shows both 'state
  collisions (block)' and 'state historical drops' as separate lines
  so the operator can distinguish "apply will refuse" from "apply
  will drop historical rows"

Regression tests (3 new, all green)
-----------------------------------

tests/test_migrate_legacy_aliases.py:

- test_apply_preserves_superseded_shadow_state_when_no_collision:
  the direct regression for the codex finding.
  Seeds a shadow with a superseded state row on a triple the
  canonical doesn't have, runs the migration, and verifies via raw
  SQL that the row is now attached to the canonical projects row and
  still has status 'superseded'. This is the test that would have
  failed before the fix.

- test_apply_drops_shadow_inactive_row_when_canonical_holds_same_triple:
  covers the unavoidable data-loss case. Seeds shadow superseded +
  canonical active at the same triple with different values, verifies
  plan.counts() reports one historical drop, runs apply, and verifies
  the canonical value is preserved and the shadow value is gone.

- test_apply_replaces_inactive_canonical_with_active_shadow:
  covers the cross-contamination case where the shadow has the live
  value and the canonical has a stale invalid row. The shadow wins by
  deleting the canonical row and rekeying into its place. Verifies
  the counter and the final state.

Plus _seed_state_row now accepts a status kwarg so the seeding helper
can create superseded/invalid rows directly.
test_dry_run_on_empty_registry_reports_empty_plan was updated to
include the new state_historical_drops key in the expected counts
dict (all zero for an empty plan, so the test shape is unchanged).

Full suite: 197 passing (was 194), 1 warning. The +3 is the three new
regression tests.

What this commit does NOT do
----------------------------

- Does NOT try to preserve historical shadow rows that collide with a
  canonical row at the same triple. That would require a schema
  change (adding (id) to the UNIQUE key, or a separate history table)
  and isn't in scope for a cleanup migration. The operator sees these
  as explicit 'historical drops' in the plan output and can copy them
  out manually if any are worth preserving.
- Does NOT change any behavior for rows that were already reachable
  from the canonicalized read path. The fix only affects legacy rows
  whose project_id points at a shadow row.
- Does NOT re-verify the earlier happy-path tests beyond the full
  suite confirming they are still green.
--- scripts/migrate_legacy_aliases.py | 198 ++++++++++++++++++++++----- tests/test_migrate_legacy_aliases.py | 193 +++++++++++++++++++++++++- 2 files changed, 355 insertions(+), 36 deletions(-) diff --git a/scripts/migrate_legacy_aliases.py b/scripts/migrate_legacy_aliases.py index eae4f66..8e5e60e 100644 --- a/scripts/migrate_legacy_aliases.py +++ b/scripts/migrate_legacy_aliases.py @@ -111,6 +111,7 @@ class StateRekeyPlan: canonical_project_id: str rows_to_rekey: list[dict] = field(default_factory=list) collisions: list[dict] = field(default_factory=list) + historical_drops: list[dict] = field(default_factory=list) @dataclass @@ -154,6 +155,9 @@ class MigrationPlan: "shadow_projects": len(self.shadow_projects), "state_rekey_rows": sum(len(p.rows_to_rekey) for p in self.state_plans), "state_collisions": sum(len(p.collisions) for p in self.state_plans), + "state_historical_drops": sum( + len(p.historical_drops) for p in self.state_plans + ), "memory_rekey_rows": sum(len(p.rows_to_rekey) for p in self.memory_plans), "memory_supersede_rows": sum(len(p.to_supersede) for p in self.memory_plans), "interaction_rekey_rows": sum(len(p.rows_to_rekey) for p in self.interaction_plans), @@ -265,6 +269,34 @@ def plan_state_migration( conn: sqlite3.Connection, shadows: list[ShadowProject], ) -> list[StateRekeyPlan]: + """Plan the project_state migration for every shadow project. + + Selects ALL shadow-attached state rows regardless of status, so + superseded and invalid rows are preserved during the migration + instead of cascade-deleted when the shadow project row is dropped. 
+ + Per-row decision rules (see the table in the migration module + docstring and in the commit that added this logic): + + - canonical triple is empty → clean rekey, any shadow status + - canonical triple has same value → shadow is a duplicate, mark + it superseded in place; its content lives on the canonical row + - canonical triple has different value, both active → collision, + apply refuses until a human resolves via POST /project/state + - canonical triple has different value, shadow active and + canonical inactive → shadow wins, canonical inactive row is + deleted and shadow is rekeyed into its place + - canonical triple has different value, shadow inactive → shadow + is historically out-of-date, canonical wins by definition, the + shadow row is recorded in historical_drops so the operator + sees the audit loss explicitly before running --apply + + The last case is the only unavoidable data loss, and it is + intrinsic to the UPSERT shape of the project_state table (at + most one row per (project_id, category, key)). The report + surfaces every drop with both shadow and canonical values so + nothing disappears silently. + """ plans: list[StateRekeyPlan] = [] for shadow in shadows: canonical_row_id = _find_or_plan_canonical_row( @@ -280,26 +312,29 @@ def plan_state_migration( canonical_project_id=shadow.canonical_project_id, ) - # All state rows currently attached to the shadow + # ALL state rows attached to the shadow, regardless of status. + # Dropping the status filter is the codex-flagged fix: without + # it, cascade delete of the shadow project silently destroys + # superseded/invalid historical state, which is exactly the + # audit loss a cleanup migration must not cause. state_rows = conn.execute( - "SELECT * FROM project_state WHERE project_id = ? AND status = 'active'", + "SELECT * FROM project_state WHERE project_id = ? 
ORDER BY updated_at DESC", (shadow.shadow_row_id,), ).fetchall() for raw in state_rows: state = _dict_row(raw) if canonical_row_id is None: - # No canonical row exists yet; nothing to collide with. - # The rekey is clean. + # No canonical row exists yet; any status rekeys cleanly. plan.rows_to_rekey.append(state) continue - # Check for a collision: an active state row on the - # canonical project with the same (category, key) whose - # value differs from the shadow's. + # Look for ANY row on the canonical project at the same + # (category, key) — status-agnostic, because the UNIQUE + # constraint is status-agnostic too. collision_row = conn.execute( "SELECT * FROM project_state " - "WHERE project_id = ? AND category = ? AND key = ? AND status = 'active'", + "WHERE project_id = ? AND category = ? AND key = ?", (canonical_row_id, state["category"], state["key"]), ).fetchone() @@ -307,21 +342,43 @@ def plan_state_migration( plan.rows_to_rekey.append(state) continue - if collision_row["value"] == state["value"]: - # Same value — trivially deduplicable. Plan to drop - # the shadow row (by marking it superseded via the - # rekey pathway: rekey would hit the UNIQUE - # constraint so we can't just UPDATE; instead we'll - # mark the shadow superseded at apply time). - state["_duplicate_of"] = _dict_row(collision_row) + canonical_state = _dict_row(collision_row) + + if canonical_state["value"] == state["value"]: + # Same value on both sides — the shadow is a pure + # duplicate. Mark it superseded in place; the content + # is already preserved on the canonical row. + state["_action"] = "supersede_shadow_duplicate" plan.rows_to_rekey.append(state) continue - plan.collisions.append( - { - "shadow": state, - "canonical": _dict_row(collision_row), - } + shadow_active = state["status"] == "active" + canonical_active = canonical_state["status"] == "active" + + if shadow_active and canonical_active: + # Two live values disagree — apply refuses until + # the human picks a winner. 
+ plan.collisions.append( + {"shadow": state, "canonical": canonical_state} + ) + continue + + if shadow_active and not canonical_active: + # Shadow has the live value; canonical is stale. + # Shadow wins by deleting the canonical inactive row + # and rekeying the shadow into its place. + state["_action"] = "replace_inactive_canonical" + state["_canonical_to_delete"] = canonical_state + plan.rows_to_rekey.append(state) + continue + + # Shadow is inactive (superseded or invalid) and collides + # with a canonical row at the same triple. The canonical + # wins by definition of the UPSERT schema. Record this + # as a historical drop so the operator sees it in the + # plan output BEFORE running --apply. + plan.historical_drops.append( + {"shadow": state, "canonical": canonical_state} ) plans.append(plan) @@ -468,6 +525,8 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict: "shadow_projects_deleted": 0, "state_rows_rekeyed": 0, "state_rows_merged_as_duplicate": 0, + "state_rows_replaced_inactive_canonical": 0, + "state_rows_historical_dropped": 0, "memory_rows_rekeyed": 0, "memory_rows_superseded": 0, "interaction_rows_rekeyed": 0, @@ -495,10 +554,16 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict: summary["canonical_rows_created"] += 1 for state in sp_plan.rows_to_rekey: - if "_duplicate_of" in state: - # Same (category, key, value) already on canonical; - # mark the shadow state row as superseded rather - # than hitting the UNIQUE constraint. + action = state.get("_action") + + if action == "supersede_shadow_duplicate": + # Same (category, key, value) already on canonical. + # Mark the shadow state row superseded in place; + # the content is preserved on the canonical row. + # The shadow row will cascade-delete when its + # parent projects row is dropped at the end of + # this loop, which is fine because its value is + # preserved on the canonical side. 
conn.execute( "UPDATE project_state SET status = 'superseded', " "updated_at = CURRENT_TIMESTAMP WHERE id = ?", @@ -506,6 +571,30 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict: ) summary["state_rows_merged_as_duplicate"] += 1 continue + + if action == "replace_inactive_canonical": + # Shadow is active, canonical is stale (superseded + # or invalid). Delete the canonical inactive row + # and rekey the shadow in its place. SQLite's + # default immediate constraint checking means the + # DELETE must complete before the UPDATE for the + # UNIQUE(project_id, category, key) constraint to + # be happy. + canonical_to_delete = state["_canonical_to_delete"] + conn.execute( + "DELETE FROM project_state WHERE id = ?", + (canonical_to_delete["id"],), + ) + conn.execute( + "UPDATE project_state SET project_id = ?, " + "updated_at = CURRENT_TIMESTAMP WHERE id = ?", + (canonical_row_id, state["id"]), + ) + summary["state_rows_rekeyed"] += 1 + summary["state_rows_replaced_inactive_canonical"] += 1 + continue + + # Clean rekey (no collision at the triple) conn.execute( "UPDATE project_state SET project_id = ?, " "updated_at = CURRENT_TIMESTAMP WHERE id = ?", @@ -513,10 +602,19 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict: ) summary["state_rows_rekeyed"] += 1 + # Historical drops are shadow rows that collide with a + # canonical row at the same triple where the shadow is + # inactive. No DB operation is needed — they'll be + # cascade-deleted when the shadow project row is + # dropped below. But we count them so the operator + # sees the audit loss in the summary. + summary["state_rows_historical_dropped"] += len(sp_plan.historical_drops) + # Delete shadow projects rows after their state dependents - # have moved. We can do this unconditionally because project_state - # has ON DELETE CASCADE on project_id, and all our dependents - # have already been rekeyed or superseded above. + # have moved. 
At this point, the only state rows still + # attached to any shadow project are the historical_drops + # entries, which are being deliberately discarded because + # the canonical row at the same triple already wins. for sp_plan in plan.state_plans: conn.execute( "DELETE FROM projects WHERE id = ?", (sp_plan.shadow_row_id,) @@ -603,12 +701,13 @@ def render_plan_text(plan: MigrationPlan) -> str: lines.append("") counts = plan.counts() lines.append("Counts:") - lines.append(f" shadow projects : {counts['shadow_projects']}") - lines.append(f" state rekey rows : {counts['state_rekey_rows']}") - lines.append(f" state collisions : {counts['state_collisions']}") - lines.append(f" memory rekey rows : {counts['memory_rekey_rows']}") - lines.append(f" memory supersede : {counts['memory_supersede_rows']}") - lines.append(f" interaction rekey : {counts['interaction_rekey_rows']}") + lines.append(f" shadow projects : {counts['shadow_projects']}") + lines.append(f" state rekey rows : {counts['state_rekey_rows']}") + lines.append(f" state collisions (block) : {counts['state_collisions']}") + lines.append(f" state historical drops : {counts['state_historical_drops']}") + lines.append(f" memory rekey rows : {counts['memory_rekey_rows']}") + lines.append(f" memory supersede : {counts['memory_supersede_rows']}") + lines.append(f" interaction rekey : {counts['interaction_rekey_rows']}") if plan.is_empty: lines.append("") @@ -643,6 +742,38 @@ def render_plan_text(plan: MigrationPlan) -> str: ) return "\n".join(lines) + # Historical drops are explicit, non-blocking audit-loss events. + # They're surfaced so the operator sees them BEFORE running --apply + # and can decide to manually preserve the shadow values if any of + # them are worth keeping as separate records. 
+ historical_drops = [ + drop + for sp_plan in plan.state_plans + for drop in sp_plan.historical_drops + ] + if historical_drops: + lines.append("") + lines.append( + f"*** {len(historical_drops)} HISTORICAL DROPS — shadow inactive " + "state rows that collide with a canonical row: ***" + ) + for drop in historical_drops: + shadow = drop["shadow"] + canonical = drop["canonical"] + lines.append( + f" - [{shadow['category']}/{shadow['key']}] " + f"shadow({shadow['status']})={shadow['value']!r} -> " + f"dropped in favor of canonical({canonical['status']})={canonical['value']!r}" + ) + lines.append("") + lines.append( + "These rows are superseded/invalid on the shadow side. The " + "UNIQUE(project_id, category, key) constraint prevents " + "preserving them alongside the canonical row. If any of these " + "values are worth keeping as separate audit records, manually " + "copy them out before running --apply." + ) + lines.append("") lines.append("Apply plan (if --apply is given):") for sp_plan in plan.state_plans: @@ -687,6 +818,7 @@ def plan_to_json_dict(plan: MigrationPlan) -> dict: "canonical_project_id": sp.canonical_project_id, "rows_to_rekey": sp.rows_to_rekey, "collisions": sp.collisions, + "historical_drops": sp.historical_drops, } for sp in plan.state_plans ], diff --git a/tests/test_migrate_legacy_aliases.py b/tests/test_migrate_legacy_aliases.py index fbfde95..18af25c 100644 --- a/tests/test_migrate_legacy_aliases.py +++ b/tests/test_migrate_legacy_aliases.py @@ -73,13 +73,14 @@ def _seed_state_row( category: str, key: str, value: str, + status: str = "active", ) -> str: row_id = str(uuid.uuid4()) conn.execute( "INSERT INTO project_state " - "(id, project_id, category, key, value, source, confidence) " - "VALUES (?, ?, ?, ?, ?, ?, ?)", - (row_id, project_id, category, key, value, "legacy-test", 1.0), + "(id, project_id, category, key, value, source, confidence, status) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (row_id, project_id, category, key, value, 
"legacy-test", 1.0, status), ) conn.commit() return row_id @@ -147,6 +148,7 @@ def test_dry_run_on_empty_registry_reports_empty_plan(tmp_data_dir): "shadow_projects": 0, "state_rekey_rows": 0, "state_collisions": 0, + "state_historical_drops": 0, "memory_rekey_rows": 0, "memory_supersede_rows": 0, "interaction_rekey_rows": 0, @@ -368,6 +370,191 @@ def test_apply_drops_shadow_state_duplicate_without_collision(project_registry): assert via_canonical[0].value == "Wave 1 ingestion" +def test_apply_preserves_superseded_shadow_state_when_no_collision(project_registry): + """Regression test for the codex-flagged data-loss bug. + + Before the fix, plan_state_migration only selected status='active' + rows. Any superseded or invalid row on the shadow project was + invisible to the plan and got silently cascade-deleted when the + shadow projects row was dropped at the end of apply. That's + exactly the kind of audit loss a cleanup migration must not cause. + + This test seeds a shadow project with a superseded state row on + a triple the canonical project doesn't have, runs the migration, + and verifies the row survived and is now attached to the + canonical project (still with status='superseded'). 
+ """ + registry_path = project_registry( + ("p05-interferometer", ["p05", "interferometer"]) + ) + + conn = _open_db_connection() + try: + shadow_id = _seed_shadow_project(conn, "p05") + # Superseded row on a triple the canonical won't have + _seed_state_row( + conn, + shadow_id, + "status", + "historical_phase", + "Phase 0 legacy", + status="superseded", + ) + + plan = mig.build_plan(conn, registry_path) + assert not plan.has_collisions + summary = mig.apply_plan(conn, plan) + finally: + conn.close() + + # The superseded row should have been rekeyed, not dropped + assert summary["state_rows_rekeyed"] == 1 + assert summary["state_rows_historical_dropped"] == 0 + + # Verify via raw SQL that the row is now attached to the canonical + # projects row and still has status='superseded' + conn = _open_db_connection() + try: + row = conn.execute( + "SELECT ps.status, ps.value, p.name " + "FROM project_state ps JOIN projects p ON ps.project_id = p.id " + "WHERE ps.category = ? AND ps.key = ?", + ("status", "historical_phase"), + ).fetchone() + finally: + conn.close() + + assert row is not None, "superseded shadow row was lost during migration" + assert row["status"] == "superseded" + assert row["value"] == "Phase 0 legacy" + assert row["name"] == "p05-interferometer" + + +def test_apply_drops_shadow_inactive_row_when_canonical_holds_same_triple(project_registry): + """Shadow is inactive (superseded) and collides with an active canonical row. + + The canonical wins by definition of the UPSERT schema. The shadow + row is recorded as a historical_drop in the plan so the operator + sees the audit loss, and the apply cascade-deletes it via the + shadow projects row. This is the unavoidable data-loss case + documented in the migration module docstring. 
+ """ + registry_path = project_registry( + ("p05-interferometer", ["p05", "interferometer"]) + ) + + conn = _open_db_connection() + try: + shadow_id = _seed_shadow_project(conn, "p05") + canonical_id = _seed_shadow_project(conn, "p05-interferometer") + + # Shadow has a superseded value on a triple where the canonical + # has a different active value. Can't preserve both: UNIQUE + # allows only one row per triple. + _seed_state_row( + conn, + shadow_id, + "status", + "next_focus", + "Old wave 1", + status="superseded", + ) + _seed_state_row( + conn, + canonical_id, + "status", + "next_focus", + "Wave 2 trusted-operational", + status="active", + ) + + plan = mig.build_plan(conn, registry_path) + assert not plan.has_collisions # not an active-vs-active collision + assert plan.counts()["state_historical_drops"] == 1 + + summary = mig.apply_plan(conn, plan) + finally: + conn.close() + + assert summary["state_rows_historical_dropped"] == 1 + + # The canonical's active row survives unchanged + via_canonical = get_state("p05-interferometer") + active_next_focus = [ + e + for e in via_canonical + if e.category == "status" and e.key == "next_focus" + ] + assert len(active_next_focus) == 1 + assert active_next_focus[0].value == "Wave 2 trusted-operational" + + +def test_apply_replaces_inactive_canonical_with_active_shadow(project_registry): + """Shadow is active, canonical has an inactive row at the same triple. + + The shadow wins: canonical inactive row is deleted, shadow is + rekeyed into canonical's project_id. This covers the + cross-contamination case where the old alias path was used for + the live value while the canonical path had a stale row. + """ + registry_path = project_registry( + ("p06-polisher", ["p06", "polisher"]) + ) + + conn = _open_db_connection() + try: + shadow_id = _seed_shadow_project(conn, "p06") + canonical_id = _seed_shadow_project(conn, "p06-polisher") + + # Canonical has a stale invalid row; shadow has the live value. 
+ _seed_state_row( + conn, + canonical_id, + "decision", + "frame", + "Old frame (no longer current)", + status="invalid", + ) + _seed_state_row( + conn, + shadow_id, + "decision", + "frame", + "kinematic mount frame", + status="active", + ) + + plan = mig.build_plan(conn, registry_path) + assert not plan.has_collisions + assert plan.counts()["state_historical_drops"] == 0 + + summary = mig.apply_plan(conn, plan) + finally: + conn.close() + + assert summary["state_rows_replaced_inactive_canonical"] == 1 + + # The active shadow value now lives on the canonical row + via_canonical = get_state("p06-polisher") + frame_entries = [ + e for e in via_canonical if e.category == "decision" and e.key == "frame" + ] + assert len(frame_entries) == 1 + assert frame_entries[0].value == "kinematic mount frame" + + # Confirm via raw SQL that the previously-inactive canonical row + # no longer exists + conn = _open_db_connection() + try: + stale = conn.execute( + "SELECT COUNT(*) AS c FROM project_state WHERE value = ?", + ("Old frame (no longer current)",), + ).fetchone() + finally: + conn.close() + assert stale["c"] == 0 + + def test_apply_migrates_memories(project_registry): registry_path = project_registry( ("p04-gigabit", ["p04", "gigabit"])