"""Tests for scripts/migrate_legacy_aliases.py. The migration script closes the compatibility gap documented in docs/architecture/project-identity-canonicalization.md. These tests cover: - empty/clean database behavior - shadow projects detection - state rekey without collisions - state collision detection + apply refusal - memory rekey + supersession of duplicates - interaction rekey - end-to-end apply on a realistic shadow - idempotency (running twice produces the same final state) - report artifact is written - the pre-fix regression gap is actually closed after migration """ from __future__ import annotations import json import sqlite3 import sys import uuid from pathlib import Path import pytest from atocore.context.project_state import ( get_state, init_project_state_schema, ) from atocore.models.database import init_db # Make scripts/ importable _REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(_REPO_ROOT / "scripts")) import migrate_legacy_aliases as mig # noqa: E402 # --------------------------------------------------------------------------- # Helpers that seed "legacy" rows the way they would have looked before fb6298a # --------------------------------------------------------------------------- def _open_db_connection(): """Open a direct SQLite connection to the test data dir's DB.""" import atocore.config as config conn = sqlite3.connect(str(config.settings.db_path)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") return conn def _seed_shadow_project( conn: sqlite3.Connection, shadow_name: str ) -> str: """Insert a projects row keyed under an alias, like the old set_state would have.""" project_id = str(uuid.uuid4()) conn.execute( "INSERT INTO projects (id, name, description) VALUES (?, ?, ?)", (project_id, shadow_name, f"shadow row for {shadow_name}"), ) conn.commit() return project_id def _seed_state_row( conn: sqlite3.Connection, project_id: str, category: str, key: str, value: str, ) -> str: row_id = str(uuid.uuid4()) conn.execute( "INSERT INTO project_state " "(id, project_id, category, key, value, source, confidence) " "VALUES (?, ?, ?, ?, ?, ?, ?)", (row_id, project_id, category, key, value, "legacy-test", 1.0), ) conn.commit() return row_id def _seed_memory_row( conn: sqlite3.Connection, memory_type: str, content: str, project: str, status: str = "active", ) -> str: row_id = str(uuid.uuid4()) conn.execute( "INSERT INTO memories " "(id, memory_type, content, project, source_chunk_id, confidence, status) " "VALUES (?, ?, ?, ?, ?, ?, ?)", (row_id, memory_type, content, project, None, 1.0, status), ) conn.commit() return row_id def _seed_interaction_row( conn: sqlite3.Connection, prompt: str, project: str ) -> str: row_id = str(uuid.uuid4()) conn.execute( "INSERT INTO interactions " "(id, prompt, context_pack, response_summary, response, " " memories_used, chunks_used, client, session_id, project, created_at) " "VALUES (?, ?, '{}', '', '', '[]', '[]', 'legacy-test', '', ?, '2026-04-01 12:00:00')", (row_id, prompt, project), ) conn.commit() return row_id # --------------------------------------------------------------------------- # plan-building tests # --------------------------------------------------------------------------- @pytest.fixture(autouse=True) def _setup(tmp_data_dir): init_db() init_project_state_schema() def test_dry_run_on_empty_registry_reports_empty_plan(tmp_data_dir): """Empty registry -> empty alias map -> empty plan.""" registry_path = tmp_data_dir / "empty-registry.json" registry_path.write_text('{"projects": []}', encoding="utf-8") conn = _open_db_connection() try: plan = mig.build_plan(conn, registry_path) finally: conn.close() assert plan.alias_map == {} assert plan.is_empty assert not plan.has_collisions assert plan.counts() == { "shadow_projects": 0, "state_rekey_rows": 0, "state_collisions": 0, "memory_rekey_rows": 0, "memory_supersede_rows": 0, "interaction_rekey_rows": 0, } def test_dry_run_on_clean_registered_db_reports_empty_plan(project_registry): """A registry with projects but no legacy rows -> empty plan.""" registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: plan = mig.build_plan(conn, registry_path) finally: conn.close() assert plan.alias_map != {} assert plan.is_empty def test_dry_run_finds_shadow_project(project_registry): registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: _seed_shadow_project(conn, "p05") plan = mig.build_plan(conn, registry_path) finally: conn.close() assert len(plan.shadow_projects) == 1 assert plan.shadow_projects[0].shadow_name == "p05" assert plan.shadow_projects[0].canonical_project_id == "p05-interferometer" def test_dry_run_plans_state_rekey_without_collisions(project_registry): registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1 ingestion") _seed_state_row(conn, shadow_id, "decision", "lateral_support", "GF-PTFE") plan = mig.build_plan(conn, registry_path) finally: conn.close() assert len(plan.state_plans) == 1 sp = plan.state_plans[0] assert len(sp.rows_to_rekey) == 2 assert sp.collisions == [] assert not plan.has_collisions def test_dry_run_detects_state_collision(project_registry): """Shadow and canonical both have state under the same (category, key) with different values.""" registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") canonical_id = _seed_shadow_project(conn, "p05-interferometer") _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1") _seed_state_row( conn, canonical_id, "status", "next_focus", "Wave 2" ) plan = mig.build_plan(conn, registry_path) finally: conn.close() assert plan.has_collisions collision = plan.state_plans[0].collisions[0] assert collision["shadow"]["value"] == "Wave 1" assert collision["canonical"]["value"] == "Wave 2" def test_dry_run_plans_memory_rekey_and_supersession(project_registry): registry_path = project_registry( ("p04-gigabit", ["p04", "gigabit"]) ) conn = _open_db_connection() try: # A clean memory under the alias that will just be rekeyed _seed_memory_row(conn, "project", "clean rekey memory", "p04") # A memory that collides with an existing canonical memory _seed_memory_row(conn, "project", "duplicate content", "p04") _seed_memory_row(conn, "project", "duplicate content", "p04-gigabit") plan = mig.build_plan(conn, registry_path) finally: conn.close() # There's exactly one memory plan (one alias matched) assert len(plan.memory_plans) == 1 mp = plan.memory_plans[0] # Two rows are candidates for rekey or supersession — one clean, # one duplicate. The duplicate is handled via to_supersede; the # other via rows_to_rekey. total_affected = len(mp.rows_to_rekey) + len(mp.to_supersede) assert total_affected == 2 def test_dry_run_plans_interaction_rekey(project_registry): registry_path = project_registry( ("p06-polisher", ["p06", "polisher"]) ) conn = _open_db_connection() try: _seed_interaction_row(conn, "quick capture under alias", "polisher") _seed_interaction_row(conn, "another alias-keyed row", "p06") plan = mig.build_plan(conn, registry_path) finally: conn.close() total = sum(len(p.rows_to_rekey) for p in plan.interaction_plans) assert total == 2 # --------------------------------------------------------------------------- # apply tests # --------------------------------------------------------------------------- def test_apply_refuses_on_state_collision(project_registry): registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") canonical_id = _seed_shadow_project(conn, "p05-interferometer") _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1") _seed_state_row(conn, canonical_id, "status", "next_focus", "Wave 2") plan = mig.build_plan(conn, registry_path) assert plan.has_collisions with pytest.raises(mig.MigrationRefused): mig.apply_plan(conn, plan) finally: conn.close() def test_apply_migrates_clean_shadow_end_to_end(project_registry): """The happy path: one shadow project with clean state rows, rekey into a freshly-created canonical row, verify reachability via get_state.""" registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") _seed_state_row( conn, shadow_id, "status", "next_focus", "Wave 1 ingestion" ) _seed_state_row( conn, shadow_id, "decision", "lateral_support", "GF-PTFE" ) plan = mig.build_plan(conn, registry_path) assert not plan.has_collisions summary = mig.apply_plan(conn, plan) finally: conn.close() assert summary["state_rows_rekeyed"] == 2 assert summary["shadow_projects_deleted"] == 1 assert summary["canonical_rows_created"] == 1 # The regression gap is now closed: the service layer can see # the state under the canonical id via either the alias OR the # canonical. via_alias = get_state("p05") via_canonical = get_state("p05-interferometer") assert len(via_alias) == 2 assert len(via_canonical) == 2 values = {entry.value for entry in via_canonical} assert values == {"Wave 1 ingestion", "GF-PTFE"} def test_apply_drops_shadow_state_duplicate_without_collision(project_registry): """Shadow and canonical both have the same (category, key, value) — shadow gets marked superseded rather than hitting the UNIQUE constraint.""" registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") canonical_id = _seed_shadow_project(conn, "p05-interferometer") _seed_state_row( conn, shadow_id, "status", "next_focus", "Wave 1 ingestion" ) _seed_state_row( conn, canonical_id, "status", "next_focus", "Wave 1 ingestion" ) plan = mig.build_plan(conn, registry_path) assert not plan.has_collisions summary = mig.apply_plan(conn, plan) finally: conn.close() assert summary["state_rows_merged_as_duplicate"] == 1 via_canonical = get_state("p05-interferometer") # Exactly one active row survives assert len(via_canonical) == 1 assert via_canonical[0].value == "Wave 1 ingestion" def test_apply_migrates_memories(project_registry): registry_path = project_registry( ("p04-gigabit", ["p04", "gigabit"]) ) conn = _open_db_connection() try: _seed_memory_row(conn, "project", "lateral support uses GF-PTFE", "p04") _seed_memory_row(conn, "preference", "I prefer descriptive commits", "gigabit") plan = mig.build_plan(conn, registry_path) summary = mig.apply_plan(conn, plan) finally: conn.close() assert summary["memory_rows_rekeyed"] == 2 # Both memories should now read as living under the canonical id from atocore.memory.service import get_memories rows = get_memories(project="p04-gigabit", limit=50) contents = {m.content for m in rows} assert "lateral support uses GF-PTFE" in contents assert "I prefer descriptive commits" in contents def test_apply_migrates_interactions(project_registry): registry_path = project_registry( ("p06-polisher", ["p06", "polisher"]) ) conn = _open_db_connection() try: _seed_interaction_row(conn, "alias-keyed 1", "polisher") _seed_interaction_row(conn, "alias-keyed 2", "p06") plan = mig.build_plan(conn, registry_path) summary = mig.apply_plan(conn, plan) finally: conn.close() assert summary["interaction_rows_rekeyed"] == 2 from atocore.interactions.service import list_interactions rows = list_interactions(project="p06-polisher", limit=50) prompts = {i.prompt for i in rows} assert prompts == {"alias-keyed 1", "alias-keyed 2"} def test_apply_is_idempotent(project_registry): """Running apply twice produces the same final state as running it once.""" registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1") _seed_memory_row(conn, "project", "m1", "p05") _seed_interaction_row(conn, "i1", "p05") # first apply plan_a = mig.build_plan(conn, registry_path) summary_a = mig.apply_plan(conn, plan_a) # second apply: plan should be empty plan_b = mig.build_plan(conn, registry_path) assert plan_b.is_empty # forcing a second apply on the empty plan via the function # directly should also succeed as a no-op (caller normally # has to pass --allow-empty through the CLI, but apply_plan # itself doesn't enforce that — the refusal is in run()) summary_b = mig.apply_plan(conn, plan_b) finally: conn.close() assert summary_a["state_rows_rekeyed"] == 1 assert summary_a["memory_rows_rekeyed"] == 1 assert summary_a["interaction_rows_rekeyed"] == 1 assert summary_b["state_rows_rekeyed"] == 0 assert summary_b["memory_rows_rekeyed"] == 0 assert summary_b["interaction_rows_rekeyed"] == 0 def test_apply_refuses_with_integrity_errors(project_registry): """If the projects table has two case-variant rows for the canonical id, refuse. The projects.name column has a case-sensitive UNIQUE constraint, so exact duplicates can't exist. But case-variant rows ``p05-interferometer`` and ``P05-Interferometer`` can both survive the UNIQUE constraint while both matching the case-insensitive ``lower(name) = lower(?)`` lookup that the migration uses to find the canonical row. That ambiguity (which canonical row should dependents rekey into?) is exactly the integrity failure the migration is guarding against. """ registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: _seed_shadow_project(conn, "p05-interferometer") _seed_shadow_project(conn, "P05-Interferometer") plan = mig.build_plan(conn, registry_path) assert plan.integrity_errors with pytest.raises(mig.MigrationRefused): mig.apply_plan(conn, plan) finally: conn.close() # --------------------------------------------------------------------------- # reporting tests # --------------------------------------------------------------------------- def test_plan_to_json_dict_is_serializable(project_registry): registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1") plan = mig.build_plan(conn, registry_path) finally: conn.close() payload = mig.plan_to_json_dict(plan) # Must be JSON-serializable json_str = json.dumps(payload, default=str) assert "p05-interferometer" in json_str assert payload["counts"]["state_rekey_rows"] == 1 def test_write_report_creates_file(tmp_path, project_registry): registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: plan = mig.build_plan(conn, registry_path) finally: conn.close() report_dir = tmp_path / "reports" report_path = mig.write_report( plan, summary=None, db_path=Path("/tmp/fake.db"), registry_path=registry_path, mode="dry-run", report_dir=report_dir, ) assert report_path.exists() payload = json.loads(report_path.read_text(encoding="utf-8")) assert payload["mode"] == "dry-run" assert "plan" in payload def test_render_plan_text_on_empty_plan(project_registry): registry_path = project_registry() # empty conn = _open_db_connection() try: plan = mig.build_plan(conn, registry_path) finally: conn.close() text = mig.render_plan_text(plan) assert "nothing to plan" in text.lower() def test_render_plan_text_on_collision(project_registry): registry_path = project_registry( ("p05-interferometer", ["p05"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") canonical_id = _seed_shadow_project(conn, "p05-interferometer") _seed_state_row(conn, shadow_id, "status", "phase", "A") _seed_state_row(conn, canonical_id, "status", "phase", "B") plan = mig.build_plan(conn, registry_path) finally: conn.close() text = mig.render_plan_text(plan) assert "COLLISION" in text.upper() assert "REFUSE" in text.upper() or "refuse" in text.lower() # --------------------------------------------------------------------------- # gap-closed companion test — the flip side of # test_legacy_alias_keyed_state_is_invisible_until_migrated in # test_project_state.py. After running this migration, the legacy row # IS reachable via the canonical id. # --------------------------------------------------------------------------- def test_legacy_alias_gap_is_closed_after_migration(project_registry): """End-to-end regression test for the canonicalization gap. Simulates the exact scenario from test_legacy_alias_keyed_state_is_invisible_until_migrated in test_project_state.py — a shadow projects row with a state row pointing at it. Runs the migration. Verifies the state is now reachable via the canonical id. """ registry_path = project_registry( ("p05-interferometer", ["p05", "interferometer"]) ) conn = _open_db_connection() try: shadow_id = _seed_shadow_project(conn, "p05") _seed_state_row( conn, shadow_id, "status", "legacy_focus", "Wave 1 ingestion" ) # Before migration: the legacy row is invisible to get_state # (this is the documented gap, covered in test_project_state.py) assert all( entry.value != "Wave 1 ingestion" for entry in get_state("p05") ) assert all( entry.value != "Wave 1 ingestion" for entry in get_state("p05-interferometer") ) # Run the migration plan = mig.build_plan(conn, registry_path) mig.apply_plan(conn, plan) finally: conn.close() # After migration: the row is reachable via canonical AND alias via_canonical = get_state("p05-interferometer") via_alias = get_state("p05") assert any(e.value == "Wave 1 ingestion" for e in via_canonical) assert any(e.value == "Wave 1 ingestion" for e in via_alias)