diff --git a/scripts/migrate_legacy_aliases.py b/scripts/migrate_legacy_aliases.py index eae4f66..8e5e60e 100644 --- a/scripts/migrate_legacy_aliases.py +++ b/scripts/migrate_legacy_aliases.py @@ -111,6 +111,7 @@ class StateRekeyPlan: canonical_project_id: str rows_to_rekey: list[dict] = field(default_factory=list) collisions: list[dict] = field(default_factory=list) + historical_drops: list[dict] = field(default_factory=list) @dataclass @@ -154,6 +155,9 @@ class MigrationPlan: "shadow_projects": len(self.shadow_projects), "state_rekey_rows": sum(len(p.rows_to_rekey) for p in self.state_plans), "state_collisions": sum(len(p.collisions) for p in self.state_plans), + "state_historical_drops": sum( + len(p.historical_drops) for p in self.state_plans + ), "memory_rekey_rows": sum(len(p.rows_to_rekey) for p in self.memory_plans), "memory_supersede_rows": sum(len(p.to_supersede) for p in self.memory_plans), "interaction_rekey_rows": sum(len(p.rows_to_rekey) for p in self.interaction_plans), @@ -265,6 +269,34 @@ def plan_state_migration( conn: sqlite3.Connection, shadows: list[ShadowProject], ) -> list[StateRekeyPlan]: + """Plan the project_state migration for every shadow project. + + Selects ALL shadow-attached state rows regardless of status, so + superseded and invalid rows are preserved during the migration + instead of cascade-deleted when the shadow project row is dropped. + + Per-row decision rules (see the table in the migration module + docstring and in the commit that added this logic): + + - canonical triple is empty → clean rekey, any shadow status + - canonical triple has same value → shadow is a duplicate, mark + it superseded in place; its content lives on the canonical row + - canonical triple has different value, both active → collision, + apply refuses until a human resolves via POST /project/state + - canonical triple has different value, shadow active and + canonical inactive → shadow wins, canonical inactive row is + deleted and shadow is rekeyed into its place + - canonical triple has different value, shadow inactive → shadow + is historically out-of-date, canonical wins by definition, the + shadow row is recorded in historical_drops so the operator + sees the audit loss explicitly before running --apply + + The last case is the only unavoidable data loss, and it is + intrinsic to the UPSERT shape of the project_state table (at + most one row per (project_id, category, key)). The report + surfaces every drop with both shadow and canonical values so + nothing disappears silently. + """ plans: list[StateRekeyPlan] = [] for shadow in shadows: canonical_row_id = _find_or_plan_canonical_row( @@ -280,26 +312,29 @@ def plan_state_migration( canonical_project_id=shadow.canonical_project_id, ) - # All state rows currently attached to the shadow + # ALL state rows attached to the shadow, regardless of status. + # Dropping the status filter is the codex-flagged fix: without + # it, cascade delete of the shadow project silently destroys + # superseded/invalid historical state, which is exactly the + # audit loss a cleanup migration must not cause. state_rows = conn.execute( - "SELECT * FROM project_state WHERE project_id = ? AND status = 'active'", + "SELECT * FROM project_state WHERE project_id = ? ORDER BY updated_at DESC", (shadow.shadow_row_id,), ).fetchall() for raw in state_rows: state = _dict_row(raw) if canonical_row_id is None: - # No canonical row exists yet; nothing to collide with. - # The rekey is clean. + # No canonical row exists yet; any status rekeys cleanly. plan.rows_to_rekey.append(state) continue - # Check for a collision: an active state row on the - # canonical project with the same (category, key) whose - # value differs from the shadow's. + # Look for ANY row on the canonical project at the same + # (category, key) — status-agnostic, because the UNIQUE + # constraint is status-agnostic too. collision_row = conn.execute( "SELECT * FROM project_state " - "WHERE project_id = ? AND category = ? AND key = ? AND status = 'active'", + "WHERE project_id = ? AND category = ? AND key = ?", (canonical_row_id, state["category"], state["key"]), ).fetchone() @@ -307,21 +342,43 @@ def plan_state_migration( plan.rows_to_rekey.append(state) continue - if collision_row["value"] == state["value"]: - # Same value — trivially deduplicable. Plan to drop - # the shadow row (by marking it superseded via the - # rekey pathway: rekey would hit the UNIQUE - # constraint so we can't just UPDATE; instead we'll - # mark the shadow superseded at apply time). - state["_duplicate_of"] = _dict_row(collision_row) + canonical_state = _dict_row(collision_row) + + if canonical_state["value"] == state["value"]: + # Same value on both sides — the shadow is a pure + # duplicate. Mark it superseded in place; the content + # is already preserved on the canonical row. + state["_action"] = "supersede_shadow_duplicate" plan.rows_to_rekey.append(state) continue - plan.collisions.append( - { - "shadow": state, - "canonical": _dict_row(collision_row), - } + shadow_active = state["status"] == "active" + canonical_active = canonical_state["status"] == "active" + + if shadow_active and canonical_active: + # Two live values disagree — apply refuses until + # the human picks a winner. + plan.collisions.append( + {"shadow": state, "canonical": canonical_state} + ) + continue + + if shadow_active and not canonical_active: + # Shadow has the live value; canonical is stale. + # Shadow wins by deleting the canonical inactive row + # and rekeying the shadow into its place. + state["_action"] = "replace_inactive_canonical" + state["_canonical_to_delete"] = canonical_state + plan.rows_to_rekey.append(state) + continue + + # Shadow is inactive (superseded or invalid) and collides + # with a canonical row at the same triple. The canonical + # wins by definition of the UPSERT schema. Record this + # as a historical drop so the operator sees it in the + # plan output BEFORE running --apply. + plan.historical_drops.append( + {"shadow": state, "canonical": canonical_state} ) plans.append(plan) @@ -468,6 +525,8 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict: "shadow_projects_deleted": 0, "state_rows_rekeyed": 0, "state_rows_merged_as_duplicate": 0, + "state_rows_replaced_inactive_canonical": 0, + "state_rows_historical_dropped": 0, "memory_rows_rekeyed": 0, "memory_rows_superseded": 0, "interaction_rows_rekeyed": 0, @@ -495,10 +554,16 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict: summary["canonical_rows_created"] += 1 for state in sp_plan.rows_to_rekey: - if "_duplicate_of" in state: - # Same (category, key, value) already on canonical; - # mark the shadow state row as superseded rather - # than hitting the UNIQUE constraint. + action = state.get("_action") + + if action == "supersede_shadow_duplicate": + # Same (category, key, value) already on canonical. + # Mark the shadow state row superseded in place; + # the content is preserved on the canonical row. + # The shadow row will cascade-delete when its + # parent projects row is dropped at the end of + # this loop, which is fine because its value is + # preserved on the canonical side. conn.execute( "UPDATE project_state SET status = 'superseded', " "updated_at = CURRENT_TIMESTAMP WHERE id = ?", @@ -506,6 +571,30 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict: ) summary["state_rows_merged_as_duplicate"] += 1 continue + + if action == "replace_inactive_canonical": + # Shadow is active, canonical is stale (superseded + # or invalid). Delete the canonical inactive row + # and rekey the shadow in its place. SQLite's + # default immediate constraint checking means the + # DELETE must complete before the UPDATE for the + # UNIQUE(project_id, category, key) constraint to + # be happy. + canonical_to_delete = state["_canonical_to_delete"] + conn.execute( + "DELETE FROM project_state WHERE id = ?", + (canonical_to_delete["id"],), + ) + conn.execute( + "UPDATE project_state SET project_id = ?, " + "updated_at = CURRENT_TIMESTAMP WHERE id = ?", + (canonical_row_id, state["id"]), + ) + summary["state_rows_rekeyed"] += 1 + summary["state_rows_replaced_inactive_canonical"] += 1 + continue + + # Clean rekey (no collision at the triple) conn.execute( "UPDATE project_state SET project_id = ?, " "updated_at = CURRENT_TIMESTAMP WHERE id = ?", @@ -513,10 +602,19 @@ def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict: ) summary["state_rows_rekeyed"] += 1 + # Historical drops are shadow rows that collide with a + # canonical row at the same triple where the shadow is + # inactive. No DB operation is needed — they'll be + # cascade-deleted when the shadow project row is + # dropped below. But we count them so the operator + # sees the audit loss in the summary. + summary["state_rows_historical_dropped"] += len(sp_plan.historical_drops) + # Delete shadow projects rows after their state dependents - # have moved. We can do this unconditionally because project_state - # has ON DELETE CASCADE on project_id, and all our dependents - # have already been rekeyed or superseded above. + # have moved. At this point, the only state rows still + # attached to any shadow project are the historical_drops + # entries, which are being deliberately discarded because + # the canonical row at the same triple already wins. for sp_plan in plan.state_plans: conn.execute( "DELETE FROM projects WHERE id = ?", (sp_plan.shadow_row_id,) @@ -603,12 +701,13 @@ def render_plan_text(plan: MigrationPlan) -> str: lines.append("") counts = plan.counts() lines.append("Counts:") - lines.append(f" shadow projects : {counts['shadow_projects']}") - lines.append(f" state rekey rows : {counts['state_rekey_rows']}") - lines.append(f" state collisions : {counts['state_collisions']}") - lines.append(f" memory rekey rows : {counts['memory_rekey_rows']}") - lines.append(f" memory supersede : {counts['memory_supersede_rows']}") - lines.append(f" interaction rekey : {counts['interaction_rekey_rows']}") + lines.append(f" shadow projects : {counts['shadow_projects']}") + lines.append(f" state rekey rows : {counts['state_rekey_rows']}") + lines.append(f" state collisions (block) : {counts['state_collisions']}") + lines.append(f" state historical drops : {counts['state_historical_drops']}") + lines.append(f" memory rekey rows : {counts['memory_rekey_rows']}") + lines.append(f" memory supersede : {counts['memory_supersede_rows']}") + lines.append(f" interaction rekey : {counts['interaction_rekey_rows']}") if plan.is_empty: lines.append("") @@ -643,6 +742,38 @@ def render_plan_text(plan: MigrationPlan) -> str: ) return "\n".join(lines) + # Historical drops are explicit, non-blocking audit-loss events. + # They're surfaced so the operator sees them BEFORE running --apply + # and can decide to manually preserve the shadow values if any of + # them are worth keeping as separate records. + historical_drops = [ + drop + for sp_plan in plan.state_plans + for drop in sp_plan.historical_drops + ] + if historical_drops: + lines.append("") + lines.append( + f"*** {len(historical_drops)} HISTORICAL DROPS — shadow inactive " + "state rows that collide with a canonical row: ***" + ) + for drop in historical_drops: + shadow = drop["shadow"] + canonical = drop["canonical"] + lines.append( + f" - [{shadow['category']}/{shadow['key']}] " + f"shadow({shadow['status']})={shadow['value']!r} -> " + f"dropped in favor of canonical({canonical['status']})={canonical['value']!r}" + ) + lines.append("") + lines.append( + "These rows are superseded/invalid on the shadow side. The " + "UNIQUE(project_id, category, key) constraint prevents " + "preserving them alongside the canonical row. If any of these " + "values are worth keeping as separate audit records, manually " + "copy them out before running --apply." + ) + lines.append("") lines.append("Apply plan (if --apply is given):") for sp_plan in plan.state_plans: @@ -687,6 +818,7 @@ def plan_to_json_dict(plan: MigrationPlan) -> dict: "canonical_project_id": sp.canonical_project_id, "rows_to_rekey": sp.rows_to_rekey, "collisions": sp.collisions, + "historical_drops": sp.historical_drops, } for sp in plan.state_plans ], diff --git a/tests/test_migrate_legacy_aliases.py b/tests/test_migrate_legacy_aliases.py index fbfde95..18af25c 100644 --- a/tests/test_migrate_legacy_aliases.py +++ b/tests/test_migrate_legacy_aliases.py @@ -73,13 +73,14 @@ def _seed_state_row( category: str, key: str, value: str, + status: str = "active", ) -> str: row_id = str(uuid.uuid4()) conn.execute( "INSERT INTO project_state " - "(id, project_id, category, key, value, source, confidence) " - "VALUES (?, ?, ?, ?, ?, ?, ?)", - (row_id, project_id, category, key, value, "legacy-test", 1.0), + "(id, project_id, category, key, value, source, confidence, status) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (row_id, project_id, category, key, value, "legacy-test", 1.0, status), ) conn.commit() return row_id @@ -147,6 +148,7 @@ def test_dry_run_on_empty_registry_reports_empty_plan(tmp_data_dir): "shadow_projects": 0, "state_rekey_rows": 0, "state_collisions": 0, + "state_historical_drops": 0, "memory_rekey_rows": 0, "memory_supersede_rows": 0, "interaction_rekey_rows": 0, @@ -368,6 +370,191 @@ def test_apply_drops_shadow_state_duplicate_without_collision(project_registry): assert via_canonical[0].value == "Wave 1 ingestion" +def test_apply_preserves_superseded_shadow_state_when_no_collision(project_registry): + """Regression test for the codex-flagged data-loss bug. + + Before the fix, plan_state_migration only selected status='active' + rows. Any superseded or invalid row on the shadow project was + invisible to the plan and got silently cascade-deleted when the + shadow projects row was dropped at the end of apply. That's + exactly the kind of audit loss a cleanup migration must not cause. + + This test seeds a shadow project with a superseded state row on + a triple the canonical project doesn't have, runs the migration, + and verifies the row survived and is now attached to the + canonical project (still with status='superseded'). + """ + registry_path = project_registry( + ("p05-interferometer", ["p05", "interferometer"]) + ) + + conn = _open_db_connection() + try: + shadow_id = _seed_shadow_project(conn, "p05") + # Superseded row on a triple the canonical won't have + _seed_state_row( + conn, + shadow_id, + "status", + "historical_phase", + "Phase 0 legacy", + status="superseded", + ) + + plan = mig.build_plan(conn, registry_path) + assert not plan.has_collisions + summary = mig.apply_plan(conn, plan) + finally: + conn.close() + + # The superseded row should have been rekeyed, not dropped + assert summary["state_rows_rekeyed"] == 1 + assert summary["state_rows_historical_dropped"] == 0 + + # Verify via raw SQL that the row is now attached to the canonical + # projects row and still has status='superseded' + conn = _open_db_connection() + try: + row = conn.execute( + "SELECT ps.status, ps.value, p.name " + "FROM project_state ps JOIN projects p ON ps.project_id = p.id " + "WHERE ps.category = ? AND ps.key = ?", + ("status", "historical_phase"), + ).fetchone() + finally: + conn.close() + + assert row is not None, "superseded shadow row was lost during migration" + assert row["status"] == "superseded" + assert row["value"] == "Phase 0 legacy" + assert row["name"] == "p05-interferometer" + + +def test_apply_drops_shadow_inactive_row_when_canonical_holds_same_triple(project_registry): + """Shadow is inactive (superseded) and collides with an active canonical row. + + The canonical wins by definition of the UPSERT schema. The shadow + row is recorded as a historical_drop in the plan so the operator + sees the audit loss, and the apply cascade-deletes it via the + shadow projects row. This is the unavoidable data-loss case + documented in the migration module docstring. + """ + registry_path = project_registry( + ("p05-interferometer", ["p05", "interferometer"]) + ) + + conn = _open_db_connection() + try: + shadow_id = _seed_shadow_project(conn, "p05") + canonical_id = _seed_shadow_project(conn, "p05-interferometer") + + # Shadow has a superseded value on a triple where the canonical + # has a different active value. Can't preserve both: UNIQUE + # allows only one row per triple. + _seed_state_row( + conn, + shadow_id, + "status", + "next_focus", + "Old wave 1", + status="superseded", + ) + _seed_state_row( + conn, + canonical_id, + "status", + "next_focus", + "Wave 2 trusted-operational", + status="active", + ) + + plan = mig.build_plan(conn, registry_path) + assert not plan.has_collisions # not an active-vs-active collision + assert plan.counts()["state_historical_drops"] == 1 + + summary = mig.apply_plan(conn, plan) + finally: + conn.close() + + assert summary["state_rows_historical_dropped"] == 1 + + # The canonical's active row survives unchanged + via_canonical = get_state("p05-interferometer") + active_next_focus = [ + e + for e in via_canonical + if e.category == "status" and e.key == "next_focus" + ] + assert len(active_next_focus) == 1 + assert active_next_focus[0].value == "Wave 2 trusted-operational" + + +def test_apply_replaces_inactive_canonical_with_active_shadow(project_registry): + """Shadow is active, canonical has an inactive row at the same triple. + + The shadow wins: canonical inactive row is deleted, shadow is + rekeyed into canonical's project_id. This covers the + cross-contamination case where the old alias path was used for + the live value while the canonical path had a stale row. + """ + registry_path = project_registry( + ("p06-polisher", ["p06", "polisher"]) + ) + + conn = _open_db_connection() + try: + shadow_id = _seed_shadow_project(conn, "p06") + canonical_id = _seed_shadow_project(conn, "p06-polisher") + + # Canonical has a stale invalid row; shadow has the live value. + _seed_state_row( + conn, + canonical_id, + "decision", + "frame", + "Old frame (no longer current)", + status="invalid", + ) + _seed_state_row( + conn, + shadow_id, + "decision", + "frame", + "kinematic mount frame", + status="active", + ) + + plan = mig.build_plan(conn, registry_path) + assert not plan.has_collisions + assert plan.counts()["state_historical_drops"] == 0 + + summary = mig.apply_plan(conn, plan) + finally: + conn.close() + + assert summary["state_rows_replaced_inactive_canonical"] == 1 + + # The active shadow value now lives on the canonical row + via_canonical = get_state("p06-polisher") + frame_entries = [ + e for e in via_canonical if e.category == "decision" and e.key == "frame" + ] + assert len(frame_entries) == 1 + assert frame_entries[0].value == "kinematic mount frame" + + # Confirm via raw SQL that the previously-inactive canonical row + # no longer exists + conn = _open_db_connection() + try: + stale = conn.execute( + "SELECT COUNT(*) AS c FROM project_state WHERE value = ?", + ("Old frame (no longer current)",), + ).fetchone() + finally: + conn.close() + assert stale["c"] == 0 + + def test_apply_migrates_memories(project_registry): registry_path = project_registry( ("p04-gigabit", ["p04", "gigabit"])