"""Tests for scripts/migrate_legacy_aliases.py. The migration script closes the compatibility gap documented in docs/architecture/project-identity-canonicalization.md. These tests cover: - empty/clean database behavior - shadow projects detection - state rekey without collisions - state collision detection + apply refusal - memory rekey + supersession of duplicates - interaction rekey - end-to-end apply on a realistic shadow - idempotency (running twice produces the same final state) - report artifact is written - the pre-fix regression gap is actually closed after migration """ from __future__ import annotations import json import sqlite3 import sys import uuid from pathlib import Path import pytest from atocore.context.project_state import ( get_state, init_project_state_schema, ) from atocore.models.database import init_db # Make scripts/ importable _REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(_REPO_ROOT / "scripts")) import migrate_legacy_aliases as mig # noqa: E402 # --------------------------------------------------------------------------- # Helpers that seed "legacy" rows the way they would have looked before fb6298a # --------------------------------------------------------------------------- def _open_db_connection(): """Open a direct SQLite connection to the test data dir's DB.""" import atocore.config as config conn = sqlite3.connect(str(config.settings.db_path)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") return conn def _seed_shadow_project( conn: sqlite3.Connection, shadow_name: str ) -> str: """Insert a projects row keyed under an alias, like the old set_state would have.""" project_id = str(uuid.uuid4()) conn.execute( "INSERT INTO projects (id, name, description) VALUES (?, ?, ?)", (project_id, shadow_name, f"shadow row for {shadow_name}"), ) conn.commit() return project_id def _seed_state_row( conn: sqlite3.Connection, project_id: str, category: str, key: str, value: str, status: str = "active", ) -> str: row_id = str(uuid.uuid4()) conn.execute( "INSERT INTO project_state " "(id, project_id, category, key, value, source, confidence, status) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (row_id, project_id, category, key, value, "legacy-test", 1.0, status), ) conn.commit() return row_id def _seed_memory_row( conn: sqlite3.Connection, memory_type: str, content: str, project: str, status: str = "active", ) -> str: row_id = str(uuid.uuid4()) conn.execute( "INSERT INTO memories " "(id, memory_type, content, project, source_chunk_id, confidence, status) " "VALUES (?, ?, ?, ?, ?, ?, ?)", (row_id, memory_type, content, project, None, 1.0, status), ) conn.commit() return row_id def _seed_interaction_row( conn: sqlite3.Connection, prompt: str, project: str ) -> str: row_id = str(uuid.uuid4()) conn.execute( "INSERT INTO interactions " "(id, prompt, context_pack, response_summary, response, " " memories_used, chunks_used, client, session_id, project, created_at) " "VALUES (?, ?, '{}', '', '', '[]', '[]', 'legacy-test', '', ?, '2026-04-01 12:00:00')", (row_id, prompt, project), ) conn.commit() return row_id # --------------------------------------------------------------------------- # plan-building tests # --------------------------------------------------------------------------- @pytest.fixture(autouse=True) def _setup(tmp_data_dir): init_db() init_project_state_schema() def test_dry_run_on_empty_registry_reports_empty_plan(tmp_data_dir): """Empty registry -> empty alias map -> empty plan.""" registry_path = tmp_data_dir / "empty-registry.json" registry_path.write_text('{"projects": []}', encoding="utf-8") conn = _open_db_connection() try: plan = mig.build_plan(conn, registry_path) finally: conn.close() assert plan.alias_map == {} assert plan.is_empty assert not plan.has_collisions assert plan.counts() == { "shadow_projects": 0, "state_rekey_rows": 0, "state_collisions": 0, "state_historical_drops": 0, "memory_rekey_rows": 0, "memory_supersede_rows": 0, "interaction_rekey_rows": 0, } def test_dry_run_on_clean_registered_db_reports_empty_plan(project_registry): """A registry with projects but no legacy rows -> empty plan.""" registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: plan = mig.build_plan(conn, registry_path) finally: conn.close() assert plan.alias_map != {} assert plan.is_empty def test_dry_run_finds_shadow_project(project_registry): registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: _seed_shadow_project(conn, "p05") plan = mig.build_plan(conn, registry_path) finally: conn.close() assert len(plan.shadow_projects) == 1 assert plan.shadow_projects[0].shadow_name == "p05" assert plan.shadow_projects[0].canonical_project_id == "p05-interferometer" def test_dry_run_plans_state_rekey_without_collisions(project_registry): registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1 ingestion") _seed_state_row(conn, shadow_id, "decision", "lateral_support", "GF-PTFE") plan = mig.build_plan(conn, registry_path) finally: conn.close() assert len(plan.state_plans) == 1 sp = plan.state_plans[0] assert len(sp.rows_to_rekey) == 2 assert sp.collisions == [] assert not plan.has_collisions def test_dry_run_detects_state_collision(project_registry): """Shadow and canonical both have state under the same (category, key) with different values.""" registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") canonical_id = _seed_shadow_project(conn, "p05-interferometer") _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1") _seed_state_row( conn, canonical_id, "status", "next_focus", "Wave 2" ) plan = mig.build_plan(conn, registry_path) finally: conn.close() assert plan.has_collisions collision = plan.state_plans[0].collisions[0] assert collision["shadow"]["value"] == "Wave 1" assert collision["canonical"]["value"] == "Wave 2" def test_dry_run_plans_memory_rekey_and_supersession(project_registry): registry_path = project_registry( ("p04-gigabit", ["p04", "gigabit"]) ) conn = _open_db_connection() try: # A clean memory under the alias that will just be rekeyed _seed_memory_row(conn, "project", "clean rekey memory", "p04") # A memory that collides with an existing canonical memory _seed_memory_row(conn, "project", "duplicate content", "p04") _seed_memory_row(conn, "project", "duplicate content", "p04-gigabit") plan = mig.build_plan(conn, registry_path) finally: conn.close() # There's exactly one memory plan (one alias matched) assert len(plan.memory_plans) == 1 mp = plan.memory_plans[0] # Two rows are candidates for rekey or supersession — one clean, # one duplicate. The duplicate is handled via to_supersede; the # other via rows_to_rekey. total_affected = len(mp.rows_to_rekey) + len(mp.to_supersede) assert total_affected == 2 def test_dry_run_plans_interaction_rekey(project_registry): registry_path = project_registry( ("p06-polisher", ["p06", "polisher"]) ) conn = _open_db_connection() try: _seed_interaction_row(conn, "quick capture under alias", "polisher") _seed_interaction_row(conn, "another alias-keyed row", "p06") plan = mig.build_plan(conn, registry_path) finally: conn.close() total = sum(len(p.rows_to_rekey) for p in plan.interaction_plans) assert total == 2 # --------------------------------------------------------------------------- # apply tests # --------------------------------------------------------------------------- def test_apply_refuses_on_state_collision(project_registry): registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") canonical_id = _seed_shadow_project(conn, "p05-interferometer") _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1") _seed_state_row(conn, canonical_id, "status", "next_focus", "Wave 2") plan = mig.build_plan(conn, registry_path) assert plan.has_collisions with pytest.raises(mig.MigrationRefused): mig.apply_plan(conn, plan) finally: conn.close() def test_apply_migrates_clean_shadow_end_to_end(project_registry): """The happy path: one shadow project with clean state rows, rekey into a freshly-created canonical row, verify reachability via get_state.""" registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") _seed_state_row( conn, shadow_id, "status", "next_focus", "Wave 1 ingestion" ) _seed_state_row( conn, shadow_id, "decision", "lateral_support", "GF-PTFE" ) plan = mig.build_plan(conn, registry_path) assert not plan.has_collisions summary = mig.apply_plan(conn, plan) finally: conn.close() assert summary["state_rows_rekeyed"] == 2 assert summary["shadow_projects_deleted"] == 1 assert summary["canonical_rows_created"] == 1 # The regression gap is now closed: the service layer can see # the state under the canonical id via either the alias OR the # canonical. via_alias = get_state("p05") via_canonical = get_state("p05-interferometer") assert len(via_alias) == 2 assert len(via_canonical) == 2 values = {entry.value for entry in via_canonical} assert values == {"Wave 1 ingestion", "GF-PTFE"} def test_apply_drops_shadow_state_duplicate_without_collision(project_registry): """Shadow and canonical both have the same (category, key, value) — shadow gets marked superseded rather than hitting the UNIQUE constraint.""" registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") canonical_id = _seed_shadow_project(conn, "p05-interferometer") _seed_state_row( conn, shadow_id, "status", "next_focus", "Wave 1 ingestion" ) _seed_state_row( conn, canonical_id, "status", "next_focus", "Wave 1 ingestion" ) plan = mig.build_plan(conn, registry_path) assert not plan.has_collisions summary = mig.apply_plan(conn, plan) finally: conn.close() assert summary["state_rows_merged_as_duplicate"] == 1 via_canonical = get_state("p05-interferometer") # Exactly one active row survives assert len(via_canonical) == 1 assert via_canonical[0].value == "Wave 1 ingestion" def test_apply_preserves_superseded_shadow_state_when_no_collision(project_registry): """Regression test for the codex-flagged data-loss bug. Before the fix, plan_state_migration only selected status='active' rows. Any superseded or invalid row on the shadow project was invisible to the plan and got silently cascade-deleted when the shadow projects row was dropped at the end of apply. That's exactly the kind of audit loss a cleanup migration must not cause. This test seeds a shadow project with a superseded state row on a triple the canonical project doesn't have, runs the migration, and verifies the row survived and is now attached to the canonical project (still with status='superseded'). """ registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") # Superseded row on a triple the canonical won't have _seed_state_row( conn, shadow_id, "status", "historical_phase", "Phase 0 legacy", status="superseded", ) plan = mig.build_plan(conn, registry_path) assert not plan.has_collisions summary = mig.apply_plan(conn, plan) finally: conn.close() # The superseded row should have been rekeyed, not dropped assert summary["state_rows_rekeyed"] == 1 assert summary["state_rows_historical_dropped"] == 0 # Verify via raw SQL that the row is now attached to the canonical # projects row and still has status='superseded' conn = _open_db_connection() try: row = conn.execute( "SELECT ps.status, ps.value, p.name " "FROM project_state ps JOIN projects p ON ps.project_id = p.id " "WHERE ps.category = ? AND ps.key = ?", ("status", "historical_phase"), ).fetchone() finally: conn.close() assert row is not None, "superseded shadow row was lost during migration" assert row["status"] == "superseded" assert row["value"] == "Phase 0 legacy" assert row["name"] == "p05-interferometer" def test_apply_drops_shadow_inactive_row_when_canonical_holds_same_triple(project_registry): """Shadow is inactive (superseded) and collides with an active canonical row. The canonical wins by definition of the UPSERT schema. The shadow row is recorded as a historical_drop in the plan so the operator sees the audit loss, and the apply cascade-deletes it via the shadow projects row. This is the unavoidable data-loss case documented in the migration module docstring. """ registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") canonical_id = _seed_shadow_project(conn, "p05-interferometer") # Shadow has a superseded value on a triple where the canonical # has a different active value. Can't preserve both: UNIQUE # allows only one row per triple. _seed_state_row( conn, shadow_id, "status", "next_focus", "Old wave 1", status="superseded", ) _seed_state_row( conn, canonical_id, "status", "next_focus", "Wave 2 trusted-operational", status="active", ) plan = mig.build_plan(conn, registry_path) assert not plan.has_collisions # not an active-vs-active collision assert plan.counts()["state_historical_drops"] == 1 summary = mig.apply_plan(conn, plan) finally: conn.close() assert summary["state_rows_historical_dropped"] == 1 # The canonical's active row survives unchanged via_canonical = get_state("p05-interferometer") active_next_focus = [ e for e in via_canonical if e.category == "status" and e.key == "next_focus" ] assert len(active_next_focus) == 1 assert active_next_focus[0].value == "Wave 2 trusted-operational" def test_apply_replaces_inactive_canonical_with_active_shadow(project_registry): """Shadow is active, canonical has an inactive row at the same triple. The shadow wins: canonical inactive row is deleted, shadow is rekeyed into canonical's project_id. This covers the cross-contamination case where the old alias path was used for the live value while the canonical path had a stale row. """ registry_path = project_registry( ("p06-polisher", ["p06", "polisher"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p06") canonical_id = _seed_shadow_project(conn, "p06-polisher") # Canonical has a stale invalid row; shadow has the live value. _seed_state_row( conn, canonical_id, "decision", "frame", "Old frame (no longer current)", status="invalid", ) _seed_state_row( conn, shadow_id, "decision", "frame", "kinematic mount frame", status="active", ) plan = mig.build_plan(conn, registry_path) assert not plan.has_collisions assert plan.counts()["state_historical_drops"] == 0 summary = mig.apply_plan(conn, plan) finally: conn.close() assert summary["state_rows_replaced_inactive_canonical"] == 1 # The active shadow value now lives on the canonical row via_canonical = get_state("p06-polisher") frame_entries = [ e for e in via_canonical if e.category == "decision" and e.key == "frame" ] assert len(frame_entries) == 1 assert frame_entries[0].value == "kinematic mount frame" # Confirm via raw SQL that the previously-inactive canonical row # no longer exists conn = _open_db_connection() try: stale = conn.execute( "SELECT COUNT(*) AS c FROM project_state WHERE value = ?", ("Old frame (no longer current)",), ).fetchone() finally: conn.close() assert stale["c"] == 0 def test_apply_migrates_memories(project_registry): registry_path = project_registry( ("p04-gigabit", ["p04", "gigabit"]) ) conn = _open_db_connection() try: _seed_memory_row(conn, "project", "lateral support uses GF-PTFE", "p04") _seed_memory_row(conn, "preference", "I prefer descriptive commits", "gigabit") plan = mig.build_plan(conn, registry_path) summary = mig.apply_plan(conn, plan) finally: conn.close() assert summary["memory_rows_rekeyed"] == 2 # Both memories should now read as living under the canonical id from atocore.memory.service import get_memories rows = get_memories(project="p04-gigabit", limit=50) contents = {m.content for m in rows} assert "lateral support uses GF-PTFE" in contents assert "I prefer descriptive commits" in contents def test_apply_migrates_interactions(project_registry): registry_path = project_registry( ("p06-polisher", ["p06", "polisher"]) ) conn = _open_db_connection() try: _seed_interaction_row(conn, "alias-keyed 1", "polisher") _seed_interaction_row(conn, "alias-keyed 2", "p06") plan = mig.build_plan(conn, registry_path) summary = mig.apply_plan(conn, plan) finally: conn.close() assert summary["interaction_rows_rekeyed"] == 2 from atocore.interactions.service import list_interactions rows = list_interactions(project="p06-polisher", limit=50) prompts = {i.prompt for i in rows} assert prompts == {"alias-keyed 1", "alias-keyed 2"} def test_apply_is_idempotent(project_registry): """Running apply twice produces the same final state as running it once.""" registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1") _seed_memory_row(conn, "project", "m1", "p05") _seed_interaction_row(conn, "i1", "p05") # first apply plan_a = mig.build_plan(conn, registry_path) summary_a = mig.apply_plan(conn, plan_a) # second apply: plan should be empty plan_b = mig.build_plan(conn, registry_path) assert plan_b.is_empty # forcing a second apply on the empty plan via the function # directly should also succeed as a no-op (caller normally # has to pass --allow-empty through the CLI, but apply_plan # itself doesn't enforce that — the refusal is in run()) summary_b = mig.apply_plan(conn, plan_b) finally: conn.close() assert summary_a["state_rows_rekeyed"] == 1 assert summary_a["memory_rows_rekeyed"] == 1 assert summary_a["interaction_rows_rekeyed"] == 1 assert summary_b["state_rows_rekeyed"] == 0 assert summary_b["memory_rows_rekeyed"] == 0 assert summary_b["interaction_rows_rekeyed"] == 0 def test_apply_refuses_with_integrity_errors(project_registry): """If the projects table has two case-variant rows for the canonical id, refuse. The projects.name column has a case-sensitive UNIQUE constraint, so exact duplicates can't exist. But case-variant rows ``p05-interferometer`` and ``P05-Interferometer`` can both survive the UNIQUE constraint while both matching the case-insensitive ``lower(name) = lower(?)`` lookup that the migration uses to find the canonical row. That ambiguity (which canonical row should dependents rekey into?) is exactly the integrity failure the migration is guarding against. """ registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: _seed_shadow_project(conn, "p05-interferometer") _seed_shadow_project(conn, "P05-Interferometer") plan = mig.build_plan(conn, registry_path) assert plan.integrity_errors with pytest.raises(mig.MigrationRefused): mig.apply_plan(conn, plan) finally: conn.close() # --------------------------------------------------------------------------- # reporting tests # --------------------------------------------------------------------------- def test_plan_to_json_dict_is_serializable(project_registry): registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1") plan = mig.build_plan(conn, registry_path) finally: conn.close() payload = mig.plan_to_json_dict(plan) # Must be JSON-serializable json_str = json.dumps(payload, default=str) assert "p05-interferometer" in json_str assert payload["counts"]["state_rekey_rows"] == 1 def test_write_report_creates_file(tmp_path, project_registry): registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: plan = mig.build_plan(conn, registry_path) finally: conn.close() report_dir = tmp_path / "reports" report_path = mig.write_report( plan, summary=None, db_path=Path("/tmp/fake.db"), registry_path=registry_path, mode="dry-run", report_dir=report_dir, ) assert report_path.exists() payload = json.loads(report_path.read_text(encoding="utf-8")) assert payload["mode"] == "dry-run" assert "plan" in payload def test_render_plan_text_on_empty_plan(project_registry): registry_path = project_registry() # empty conn = _open_db_connection() try: plan = mig.build_plan(conn, registry_path) finally: conn.close() text = mig.render_plan_text(plan) assert "nothing to plan" in text.lower() def test_render_plan_text_on_collision(project_registry): registry_path = project_registry( ("p05-interferometer", ["p05"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") canonical_id = _seed_shadow_project(conn, "p05-interferometer") _seed_state_row(conn, shadow_id, "status", "phase", "A") _seed_state_row(conn, canonical_id, "status", "phase", "B") plan = mig.build_plan(conn, registry_path) finally: conn.close() text = mig.render_plan_text(plan) assert "COLLISION" in text.upper() assert "REFUSE" in text.upper() or "refuse" in text.lower() # --------------------------------------------------------------------------- # gap-closed companion test — the flip side of # test_legacy_alias_keyed_state_is_invisible_until_migrated in # test_project_state.py. After running this migration, the legacy row # IS reachable via the canonical id. # --------------------------------------------------------------------------- def test_legacy_alias_gap_is_closed_after_migration(project_registry): """End-to-end regression test for the canonicalization gap. Simulates the exact scenario from test_legacy_alias_keyed_state_is_invisible_until_migrated in test_project_state.py — a shadow projects row with a state row pointing at it. Runs the migration. Verifies the state is now reachable via the canonical id. """ registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") _seed_state_row( conn, shadow_id, "status", "legacy_focus", "Wave 1 ingestion" ) # Before migration: the legacy row is invisible to get_state # (this is the documented gap, covered in test_project_state.py) assert all( entry.value != "Wave 1 ingestion" for entry in get_state("p05") ) assert all( entry.value != "Wave 1 ingestion" for entry in get_state("p05-interferometer") ) # Run the migration plan = mig.build_plan(conn, registry_path) mig.apply_plan(conn, plan) finally: conn.close() # After migration: the row is reachable via canonical AND alias via_canonical = get_state("p05-interferometer") via_alias = get_state("p05") assert any(e.value == "Wave 1 ingestion" for e in via_canonical) assert any(e.value == "Wave 1 ingestion" for e in via_alias)