From 7e60f5a0e6fa4895098210e4a1844df58b967f8b Mon Sep 17 00:00:00 2001 From: Anto01 Date: Wed, 8 Apr 2026 15:08:16 -0400 Subject: [PATCH] feat(ops): legacy alias migration script with dry-run/apply modes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the compatibility gap documented in docs/architecture/project-identity-canonicalization.md. Before fb6298a, writes to project_state, memories, and interactions stored the raw project name. After fb6298a every service-layer entry point canonicalizes through the registry, which silently made pre-fix alias-keyed rows unreachable from the new read path. Now there's a migration tool to find and fix them. This commit is the tool and its tests. The tool is NOT run against the live Dalidou DB in this commit — that's a separate supervised manual step after reviewing the dry-run output. scripts/migrate_legacy_aliases.py --------------------------------- Standalone offline migration tool. Dry-run default, --apply explicit. What it inspects: - projects: rows whose name is a registered alias and differs from the canonical project_id (shadow rows) - project_state: rows whose project_id points at a shadow; plan rekeys them to the canonical row's id. (category, key) collisions against the canonical block the apply step until a human resolves - memories: rows whose project column is a registered alias. Plain string rekey. Dedup collisions (after rekey, same (memory_type, content, project, status)) are handled by the existing memory supersession model: newer row stays active, older becomes superseded with updated_at as tiebreaker - interactions: rows whose project column is a registered alias. 
Plain string rekey, no collision handling What it does NOT do: - Never touches rows that are already canonical - Never auto-resolves project_state collisions (refuses until the human picks a winner via POST /project/state) - Never creates data beyond a missing canonical projects row (inserted only so shadow state has a rekey target); otherwise only rekeys or supersedes - Never runs outside a single SQLite transaction; any failure rolls back the entire migration Safety rails: - Dry-run is default. --apply is explicit. - Apply on empty plan refuses unless --allow-empty (prevents accidental runs that look meaningful but did nothing) - Apply refuses on any project_state collision - Apply refuses on integrity errors (e.g. two case-variant rows both matching the canonical lookup) - Writes a JSON report to data/migrations/ on every dry-run and every completed apply for audit (refused applies exit before a report is written) - Idempotent: running twice produces the same final state as running once. The second run finds zero shadow rows and exits clean. CLI flags: --registry PATH override ATOCORE_PROJECT_REGISTRY_PATH --db PATH override the AtoCore SQLite DB path --apply actually mutate (default is dry-run) --allow-empty permit --apply on an empty plan --report-dir PATH where to write the JSON report --json emit the plan as JSON instead of human prose Smoke test against the Phase 9 validation DB produces the expected "Nothing to migrate. The database is clean." output with 4 known canonical projects and 0 shadows. 
tests/test_migrate_legacy_aliases.py ------------------------------------ 19 new tests, all green: Plan-building: - test_dry_run_on_empty_registry_reports_empty_plan - test_dry_run_on_clean_registered_db_reports_empty_plan - test_dry_run_finds_shadow_project - test_dry_run_plans_state_rekey_without_collisions - test_dry_run_detects_state_collision - test_dry_run_plans_memory_rekey_and_supersession - test_dry_run_plans_interaction_rekey Apply: - test_apply_refuses_on_state_collision - test_apply_migrates_clean_shadow_end_to_end (verifies get_state can see the state via BOTH the alias AND the canonical after migration) - test_apply_drops_shadow_state_duplicate_without_collision (same (category, key, value) on both sides - mark shadow superseded, don't hit the UNIQUE constraint) - test_apply_migrates_memories - test_apply_migrates_interactions - test_apply_is_idempotent - test_apply_refuses_with_integrity_errors (uses case-variant canonical rows to work around projects.name UNIQUE constraint; verifies the case-insensitive duplicate detection works) Reporting: - test_plan_to_json_dict_is_serializable - test_write_report_creates_file - test_render_plan_text_on_empty_plan - test_render_plan_text_on_collision End-to-end gap closure (the most important test): - test_legacy_alias_gap_is_closed_after_migration - Seeds the exact same scenario as test_legacy_alias_keyed_state_is_invisible_until_migrated in test_project_state.py (which documents the pre-migration gap) - Confirms the row is invisible before migration - Runs the migration - Verifies the row is reachable via BOTH the canonical id AND the alias afterward - This test and the pre-migration gap test together lock in "before migration: invisible, after migration: reachable" as the documented invariant Full suite: 194 passing (was 175), 1 warning. The +19 is the new migration test file. 
Next concrete step after this commit ------------------------------------ - Run the dry-run against the live Dalidou DB to find out the actual blast radius. The script is the inspection SQL, codified. - Review the dry-run output together - If clean (zero shadows), no apply needed; close the doc gap as "verified nothing to migrate on this deployment" - If there are shadows, resolve any collisions via POST /project/state, then run --apply under supervision - After apply, the test_legacy_alias_keyed_state_is_invisible_until_migrated test still passes (it simulates the gap directly, so it's independent of the live DB state) and the gap-closed companion test continues to guard forward --- scripts/migrate_legacy_aliases.py | 886 +++++++++++++++++++++++++++ tests/test_migrate_legacy_aliases.py | 615 +++++++++++++++++++ 2 files changed, 1501 insertions(+) create mode 100644 scripts/migrate_legacy_aliases.py create mode 100644 tests/test_migrate_legacy_aliases.py diff --git a/scripts/migrate_legacy_aliases.py b/scripts/migrate_legacy_aliases.py new file mode 100644 index 0000000..eae4f66 --- /dev/null +++ b/scripts/migrate_legacy_aliases.py @@ -0,0 +1,886 @@ +"""Migrate legacy alias-keyed rows to canonical project ids. + +Standalone, offline migration tool that closes the compatibility gap +described in ``docs/architecture/project-identity-canonicalization.md``. +Before ``fb6298a`` landed, writes to ``/project/state``, ``/memory``, +and ``/interactions`` stored the project name verbatim. After +``fb6298a`` every service-layer entry point canonicalizes through the +project registry, which silently makes pre-fix alias-keyed rows +unreachable from the new read path. + +This script finds those shadow rows and rekeys them to the canonical +project id. 
+ +Usage +----- + +Dry-run (default, safe, idempotent): + + python scripts/migrate_legacy_aliases.py + +Override the registry or DB location: + + python scripts/migrate_legacy_aliases.py \ + --registry /srv/storage/atocore/config/project-registry.json \ + --db /srv/storage/atocore/data/db/atocore.db + +Apply after reviewing dry-run output: + + python scripts/migrate_legacy_aliases.py --apply + +Emit the plan as JSON instead of human prose: + + python scripts/migrate_legacy_aliases.py --json + +Behavior +-------- + +Dry-run always succeeds. It walks: + +1. ``projects`` — any row whose ``name`` (lowercased) matches a + registered alias and differs from the canonical project id. These + are "shadow" rows. +2. ``project_state`` — any row whose ``project_id`` points at a + shadow projects row. The plan rekeys these to the canonical + projects id. A collision (shadow and canonical both have state + under the same ``(category, key)``) blocks the apply step. +3. ``memories`` — any row whose ``project`` column is a registered + alias. The plan rekeys these to the canonical project id. + Duplicate-after-rekey cases are handled by marking the older row + ``superseded`` and keeping the newer row active. +4. ``interactions`` — any row whose ``project`` column is a + registered alias. Plain string rekey, no collision handling. + +Apply runs the plan inside a single SQLite transaction. Any error +rolls back the whole migration. A JSON report is written under +``data/migrations/`` (respects ``ATOCORE_DATA_DIR``). + +Safety rails +------------ + +- Dry-run is the default; ``--apply`` is explicit and not the default. +- Apply on an empty plan is a no-op that refuses unless + ``--allow-empty`` is passed (prevents accidental runs on clean DBs + that look like they did something). +- Apply refuses if the plan has any project_state collisions. 
The + operator must resolve the collision via the normal + ``POST /project/state`` path (or by manually editing the shadow + row) before running ``--apply``. +- The script never touches rows that are already canonical. Running + ``--apply`` twice produces the same final state as running it + once (idempotent). +- A pre-apply integrity check refuses to run if the DB has more than + one row for the canonical id of any alias — a situation the + migration wasn't designed to handle. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sqlite3 +import sys +import uuid +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from pathlib import Path + +_REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(_REPO_ROOT / "src")) + + +# --------------------------------------------------------------------------- +# Data classes describing the plan +# --------------------------------------------------------------------------- + + +@dataclass +class ShadowProject: + """A projects row whose name is a registered alias.""" + + shadow_row_id: str + shadow_name: str + canonical_project_id: str # the id field from the registry (the name we want) + + +@dataclass +class StateRekeyPlan: + shadow_row_id: str + canonical_row_id: str # resolved at plan time; may be created if missing + canonical_project_id: str + rows_to_rekey: list[dict] = field(default_factory=list) + collisions: list[dict] = field(default_factory=list) + + +@dataclass +class MemoryRekeyPlan: + alias: str + canonical_project_id: str + rows_to_rekey: list[dict] = field(default_factory=list) + to_supersede: list[dict] = field(default_factory=list) + + +@dataclass +class InteractionRekeyPlan: + alias: str + canonical_project_id: str + rows_to_rekey: list[dict] = field(default_factory=list) + + +@dataclass +class MigrationPlan: + alias_map: dict[str, str] = field(default_factory=dict) # lowercased alias -> canonical id + 
shadow_projects: list[ShadowProject] = field(default_factory=list) + state_plans: list[StateRekeyPlan] = field(default_factory=list) + memory_plans: list[MemoryRekeyPlan] = field(default_factory=list) + interaction_plans: list[InteractionRekeyPlan] = field(default_factory=list) + integrity_errors: list[str] = field(default_factory=list) + + @property + def has_collisions(self) -> bool: + return any(p.collisions for p in self.state_plans) + + @property + def is_empty(self) -> bool: + return ( + not self.shadow_projects + and not any(p.rows_to_rekey for p in self.memory_plans) + and not any(p.rows_to_rekey for p in self.interaction_plans) + ) + + def counts(self) -> dict: + return { + "shadow_projects": len(self.shadow_projects), + "state_rekey_rows": sum(len(p.rows_to_rekey) for p in self.state_plans), + "state_collisions": sum(len(p.collisions) for p in self.state_plans), + "memory_rekey_rows": sum(len(p.rows_to_rekey) for p in self.memory_plans), + "memory_supersede_rows": sum(len(p.to_supersede) for p in self.memory_plans), + "interaction_rekey_rows": sum(len(p.rows_to_rekey) for p in self.interaction_plans), + } + + +# --------------------------------------------------------------------------- +# Plan building +# --------------------------------------------------------------------------- + + +def build_alias_map(registry_path: Path) -> dict[str, str]: + """Load the registry and return {lowercased alias_or_id: canonical_id}. + + The canonical id itself is also in the map so later code can + distinguish "already canonical" from "is an alias". 
+ """ + if not registry_path.exists(): + return {} + payload = json.loads(registry_path.read_text(encoding="utf-8")) + mapping: dict[str, str] = {} + for entry in payload.get("projects", []): + canonical = str(entry.get("id", "")).strip() + if not canonical: + continue + mapping[canonical.lower()] = canonical + for alias in entry.get("aliases", []): + alias_str = str(alias).strip() + if not alias_str: + continue + mapping[alias_str.lower()] = canonical + return mapping + + +def _dict_row(row: sqlite3.Row) -> dict: + return {k: row[k] for k in row.keys()} + + +def find_shadow_projects( + conn: sqlite3.Connection, alias_map: dict[str, str] +) -> list[ShadowProject]: + """Find rows in the projects table whose name is a registered + alias that differs from the canonical project id.""" + shadows: list[ShadowProject] = [] + rows = conn.execute("SELECT id, name FROM projects").fetchall() + for row in rows: + name = (row["name"] or "").strip() + if not name: + continue + canonical = alias_map.get(name.lower()) + if canonical is None: + continue # not in registry; leave it alone (unregistered-name fallback) + if name == canonical: + continue # already canonical; nothing to migrate + shadows.append( + ShadowProject( + shadow_row_id=row["id"], + shadow_name=name, + canonical_project_id=canonical, + ) + ) + return shadows + + +def _find_or_plan_canonical_row( + conn: sqlite3.Connection, canonical_project_id: str +) -> str | None: + """Return the existing projects.id row whose name matches the + canonical project id, or None if no such row exists yet. + + Case-insensitive lookup matches ``ensure_project``'s semantics. + """ + row = conn.execute( + "SELECT id FROM projects WHERE lower(name) = lower(?)", + (canonical_project_id,), + ).fetchone() + return row["id"] if row else None + + +def check_canonical_integrity( + conn: sqlite3.Connection, alias_map: dict[str, str] +) -> list[str]: + """Refuse to run if any canonical project has more than one row. 
+ + This shouldn't happen under normal operation, but if it does the + migration can't proceed because we don't know which canonical row + the shadow dependents should be rekeyed to. + """ + errors: list[str] = [] + seen_canonicals: set[str] = set() + for canonical in alias_map.values(): + if canonical in seen_canonicals: + continue + seen_canonicals.add(canonical) + count_row = conn.execute( + "SELECT COUNT(*) AS c FROM projects WHERE lower(name) = lower(?)", + (canonical,), + ).fetchone() + if count_row and count_row["c"] > 1: + errors.append( + f"canonical project '{canonical}' has {count_row['c']} rows in " + f"the projects table; migration refuses to run until this is " + f"resolved manually" + ) + return errors + + +def plan_state_migration( + conn: sqlite3.Connection, + shadows: list[ShadowProject], +) -> list[StateRekeyPlan]: + plans: list[StateRekeyPlan] = [] + for shadow in shadows: + canonical_row_id = _find_or_plan_canonical_row( + conn, shadow.canonical_project_id + ) + # If the canonical row doesn't exist yet we'll create it at + # apply time; planning can use a placeholder. + planned_canonical_row_id = canonical_row_id or "" + + plan = StateRekeyPlan( + shadow_row_id=shadow.shadow_row_id, + canonical_row_id=planned_canonical_row_id, + canonical_project_id=shadow.canonical_project_id, + ) + + # All state rows currently attached to the shadow + state_rows = conn.execute( + "SELECT * FROM project_state WHERE project_id = ? AND status = 'active'", + (shadow.shadow_row_id,), + ).fetchall() + + for raw in state_rows: + state = _dict_row(raw) + if canonical_row_id is None: + # No canonical row exists yet; nothing to collide with. + # The rekey is clean. + plan.rows_to_rekey.append(state) + continue + + # Check for a collision: an active state row on the + # canonical project with the same (category, key) whose + # value differs from the shadow's. + collision_row = conn.execute( + "SELECT * FROM project_state " + "WHERE project_id = ? AND category = ? 
AND key = ? AND status = 'active'", + (canonical_row_id, state["category"], state["key"]), + ).fetchone() + + if collision_row is None: + plan.rows_to_rekey.append(state) + continue + + if collision_row["value"] == state["value"]: + # Same value — trivially deduplicable. Plan to drop + # the shadow row (by marking it superseded via the + # rekey pathway: rekey would hit the UNIQUE + # constraint so we can't just UPDATE; instead we'll + # mark the shadow superseded at apply time). + state["_duplicate_of"] = _dict_row(collision_row) + plan.rows_to_rekey.append(state) + continue + + plan.collisions.append( + { + "shadow": state, + "canonical": _dict_row(collision_row), + } + ) + + plans.append(plan) + return plans + + +def plan_memory_migration( + conn: sqlite3.Connection, alias_map: dict[str, str] +) -> list[MemoryRekeyPlan]: + plans: list[MemoryRekeyPlan] = [] + + # Group aliases by canonical to build one plan per canonical. + aliases_by_canonical: dict[str, list[str]] = {} + for alias, canonical in alias_map.items(): + if alias == canonical.lower(): + continue # canonical string is not an alias worth rekeying + aliases_by_canonical.setdefault(canonical, []).append(alias) + + for canonical, aliases in aliases_by_canonical.items(): + for alias_key in aliases: + # memories store the string (mixed case allowed); find any + # row whose project column (lowercased) matches the alias + raw_rows = conn.execute( + "SELECT * FROM memories WHERE lower(project) = lower(?)", + (alias_key,), + ).fetchall() + if not raw_rows: + continue + + plan = MemoryRekeyPlan(alias=alias_key, canonical_project_id=canonical) + for raw in raw_rows: + mem = _dict_row(raw) + # Check for dedup collision against the canonical bucket + dup = conn.execute( + "SELECT * FROM memories WHERE memory_type = ? AND content = ? " + "AND lower(project) = lower(?) AND status = ? 
AND id != ?", + ( + mem["memory_type"], + mem["content"], + canonical, + mem["status"], + mem["id"], + ), + ).fetchone() + if dup is None: + plan.rows_to_rekey.append(mem) + continue + dup_dict = _dict_row(dup) + # Newer row wins; older is superseded. updated_at is + # the tiebreaker because created_at is the storage + # insert time and updated_at reflects last mutation. + shadow_updated = mem.get("updated_at") or "" + canonical_updated = dup_dict.get("updated_at") or "" + if shadow_updated > canonical_updated: + # Shadow is newer: canonical becomes superseded, + # shadow rekeys into active. + plan.rows_to_rekey.append(mem) + plan.to_supersede.append(dup_dict) + else: + # Canonical is newer (or equal): shadow becomes + # superseded in place (we still rekey it so its + # project column is canonical, but its status flips + # to superseded). + mem["_action"] = "supersede" + plan.to_supersede.append(mem) + plans.append(plan) + return plans + + +def plan_interaction_migration( + conn: sqlite3.Connection, alias_map: dict[str, str] +) -> list[InteractionRekeyPlan]: + plans: list[InteractionRekeyPlan] = [] + aliases_by_canonical: dict[str, list[str]] = {} + for alias, canonical in alias_map.items(): + if alias == canonical.lower(): + continue + aliases_by_canonical.setdefault(canonical, []).append(alias) + + for canonical, aliases in aliases_by_canonical.items(): + for alias_key in aliases: + raw_rows = conn.execute( + "SELECT * FROM interactions WHERE lower(project) = lower(?)", + (alias_key,), + ).fetchall() + if not raw_rows: + continue + plan = InteractionRekeyPlan(alias=alias_key, canonical_project_id=canonical) + for raw in raw_rows: + plan.rows_to_rekey.append(_dict_row(raw)) + plans.append(plan) + return plans + + +def build_plan( + conn: sqlite3.Connection, registry_path: Path +) -> MigrationPlan: + alias_map = build_alias_map(registry_path) + plan = MigrationPlan(alias_map=alias_map) + if not alias_map: + return plan + + plan.integrity_errors = 
check_canonical_integrity(conn, alias_map) + if plan.integrity_errors: + return plan + + plan.shadow_projects = find_shadow_projects(conn, alias_map) + plan.state_plans = plan_state_migration(conn, plan.shadow_projects) + plan.memory_plans = plan_memory_migration(conn, alias_map) + plan.interaction_plans = plan_interaction_migration(conn, alias_map) + return plan + + +# --------------------------------------------------------------------------- +# Apply +# --------------------------------------------------------------------------- + + +class MigrationRefused(RuntimeError): + """Raised when the apply step refuses to run due to a safety rail.""" + + +def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict: + """Execute the plan inside a single SQLite transaction. + + Returns a summary dict. Raises ``MigrationRefused`` if the plan + has integrity errors, has state collisions, or is empty (callers + that want to apply an empty plan must check ``plan.is_empty`` + themselves). + """ + if plan.integrity_errors: + raise MigrationRefused( + "migration refuses to run due to integrity errors: " + + "; ".join(plan.integrity_errors) + ) + if plan.has_collisions: + raise MigrationRefused( + "migration refuses to run; the plan has " + f"{sum(len(p.collisions) for p in plan.state_plans)} " + "project_state collision(s) that require manual resolution" + ) + + summary: dict = { + "shadow_projects_deleted": 0, + "state_rows_rekeyed": 0, + "state_rows_merged_as_duplicate": 0, + "memory_rows_rekeyed": 0, + "memory_rows_superseded": 0, + "interaction_rows_rekeyed": 0, + "canonical_rows_created": 0, + } + + try: + conn.execute("BEGIN") + + # --- Projects + project_state ---------------------------------- + for sp_plan in plan.state_plans: + canonical_row_id = sp_plan.canonical_row_id + if canonical_row_id == "": + # Create the canonical projects row now + canonical_row_id = str(uuid.uuid4()) + conn.execute( + "INSERT INTO projects (id, name, description) " + "VALUES (?, ?, 
?)", + ( + canonical_row_id, + sp_plan.canonical_project_id, + "created by legacy alias migration", + ), + ) + summary["canonical_rows_created"] += 1 + + for state in sp_plan.rows_to_rekey: + if "_duplicate_of" in state: + # Same (category, key, value) already on canonical; + # mark the shadow state row as superseded rather + # than hitting the UNIQUE constraint. + conn.execute( + "UPDATE project_state SET status = 'superseded', " + "updated_at = CURRENT_TIMESTAMP WHERE id = ?", + (state["id"],), + ) + summary["state_rows_merged_as_duplicate"] += 1 + continue + conn.execute( + "UPDATE project_state SET project_id = ?, " + "updated_at = CURRENT_TIMESTAMP WHERE id = ?", + (canonical_row_id, state["id"]), + ) + summary["state_rows_rekeyed"] += 1 + + # Delete shadow projects rows after their state dependents + # have moved. We can do this unconditionally because project_state + # has ON DELETE CASCADE on project_id, and all our dependents + # have already been rekeyed or superseded above. + for sp_plan in plan.state_plans: + conn.execute( + "DELETE FROM projects WHERE id = ?", (sp_plan.shadow_row_id,) + ) + summary["shadow_projects_deleted"] += 1 + + # Also delete any shadow projects that had no state rows + # attached (find_shadow_projects returned them but + # plan_state_migration produced an empty plan entry for them). + # They're already covered by the loop above because every + # shadow has a state plan entry, even if empty. + + # --- Memories -------------------------------------------------- + for mem_plan in plan.memory_plans: + for mem in mem_plan.rows_to_rekey: + conn.execute( + "UPDATE memories SET project = ?, " + "updated_at = CURRENT_TIMESTAMP WHERE id = ?", + (mem_plan.canonical_project_id, mem["id"]), + ) + summary["memory_rows_rekeyed"] += 1 + for mem in mem_plan.to_supersede: + # Supersede the older/duplicate row AND rekey its + # project to canonical so the historical audit trail + # reflects the canonical project. 
+ conn.execute( + "UPDATE memories SET status = 'superseded', " + "project = ?, updated_at = CURRENT_TIMESTAMP " + "WHERE id = ?", + (mem_plan.canonical_project_id, mem["id"]), + ) + summary["memory_rows_superseded"] += 1 + + # --- Interactions ---------------------------------------------- + for ix_plan in plan.interaction_plans: + for ix in ix_plan.rows_to_rekey: + conn.execute( + "UPDATE interactions SET project = ? WHERE id = ?", + (ix_plan.canonical_project_id, ix["id"]), + ) + summary["interaction_rows_rekeyed"] += 1 + + conn.execute("COMMIT") + except Exception: + conn.execute("ROLLBACK") + raise + + return summary + + +# --------------------------------------------------------------------------- +# Reporting +# --------------------------------------------------------------------------- + + +def render_plan_text(plan: MigrationPlan) -> str: + lines: list[str] = [] + lines.append("=" * 72) + lines.append("Legacy alias migration — plan") + lines.append("=" * 72) + + if plan.integrity_errors: + lines.append("") + lines.append("INTEGRITY ERRORS — migration cannot run:") + for err in plan.integrity_errors: + lines.append(f" - {err}") + return "\n".join(lines) + + lines.append("") + if not plan.alias_map: + lines.append("No registered aliases; nothing to plan.") + return "\n".join(lines) + + canonicals = sorted(set(plan.alias_map.values())) + lines.append(f"Known canonical projects ({len(canonicals)}):") + for canonical in canonicals: + aliases = [ + alias + for alias, canon in plan.alias_map.items() + if canon == canonical and alias != canonical.lower() + ] + lines.append(f" - {canonical} (aliases: {', '.join(aliases) or 'none'})") + + lines.append("") + counts = plan.counts() + lines.append("Counts:") + lines.append(f" shadow projects : {counts['shadow_projects']}") + lines.append(f" state rekey rows : {counts['state_rekey_rows']}") + lines.append(f" state collisions : {counts['state_collisions']}") + lines.append(f" memory rekey rows : 
{counts['memory_rekey_rows']}") + lines.append(f" memory supersede : {counts['memory_supersede_rows']}") + lines.append(f" interaction rekey : {counts['interaction_rekey_rows']}") + + if plan.is_empty: + lines.append("") + lines.append("Nothing to migrate. The database is clean.") + return "\n".join(lines) + + if plan.shadow_projects: + lines.append("") + lines.append("Shadow projects (alias-named rows that will be deleted):") + for shadow in plan.shadow_projects: + lines.append( + f" - '{shadow.shadow_name}' (row {shadow.shadow_row_id[:8]}) " + f"-> '{shadow.canonical_project_id}'" + ) + + if plan.has_collisions: + lines.append("") + lines.append("*** STATE COLLISIONS — apply will REFUSE until resolved: ***") + for sp_plan in plan.state_plans: + for collision in sp_plan.collisions: + shadow = collision["shadow"] + canonical = collision["canonical"] + lines.append( + f" - [{shadow['category']}/{shadow['key']}] " + f"shadow value={shadow['value']!r} vs " + f"canonical value={canonical['value']!r}" + ) + lines.append("") + lines.append( + "Resolution: use POST /project/state to pick the winning value, " + "then re-run --apply." 
+ ) + return "\n".join(lines) + + lines.append("") + lines.append("Apply plan (if --apply is given):") + for sp_plan in plan.state_plans: + if sp_plan.rows_to_rekey: + lines.append( + f" - rekey {len(sp_plan.rows_to_rekey)} project_state row(s) " + f"from shadow {sp_plan.shadow_row_id[:8]} -> '{sp_plan.canonical_project_id}'" + ) + for mem_plan in plan.memory_plans: + if mem_plan.rows_to_rekey: + lines.append( + f" - rekey {len(mem_plan.rows_to_rekey)} memory row(s) from " + f"project='{mem_plan.alias}' -> '{mem_plan.canonical_project_id}'" + ) + if mem_plan.to_supersede: + lines.append( + f" - supersede {len(mem_plan.to_supersede)} duplicate " + f"memory row(s) under '{mem_plan.canonical_project_id}'" + ) + for ix_plan in plan.interaction_plans: + if ix_plan.rows_to_rekey: + lines.append( + f" - rekey {len(ix_plan.rows_to_rekey)} interaction row(s) " + f"from project='{ix_plan.alias}' -> '{ix_plan.canonical_project_id}'" + ) + + return "\n".join(lines) + + +def plan_to_json_dict(plan: MigrationPlan) -> dict: + return { + "alias_map": plan.alias_map, + "integrity_errors": plan.integrity_errors, + "counts": plan.counts(), + "is_empty": plan.is_empty, + "has_collisions": plan.has_collisions, + "shadow_projects": [asdict(sp) for sp in plan.shadow_projects], + "state_plans": [ + { + "shadow_row_id": sp.shadow_row_id, + "canonical_row_id": sp.canonical_row_id, + "canonical_project_id": sp.canonical_project_id, + "rows_to_rekey": sp.rows_to_rekey, + "collisions": sp.collisions, + } + for sp in plan.state_plans + ], + "memory_plans": [ + { + "alias": mp.alias, + "canonical_project_id": mp.canonical_project_id, + "rows_to_rekey": mp.rows_to_rekey, + "to_supersede": mp.to_supersede, + } + for mp in plan.memory_plans + ], + "interaction_plans": [ + { + "alias": ip.alias, + "canonical_project_id": ip.canonical_project_id, + "rows_to_rekey": ip.rows_to_rekey, + } + for ip in plan.interaction_plans + ], + } + + +def write_report( + plan: MigrationPlan, + summary: dict | None, + 
db_path: Path, + registry_path: Path, + mode: str, + report_dir: Path, +) -> Path: + report_dir.mkdir(parents=True, exist_ok=True) + stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + report_path = report_dir / f"legacy-aliases-{stamp}.json" + payload = { + "created_at": datetime.now(timezone.utc).isoformat(), + "mode": mode, + "db_path": str(db_path), + "registry_path": str(registry_path), + "plan": plan_to_json_dict(plan), + "apply_summary": summary, + } + report_path.write_text( + json.dumps(payload, indent=2, default=str, ensure_ascii=True) + "\n", + encoding="utf-8", + ) + return report_path + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + +def _open_db(db_path: Path) -> sqlite3.Connection: + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA foreign_keys = ON") + return conn + + +def run( + db_path: Path, + registry_path: Path, + apply: bool, + allow_empty: bool, + report_dir: Path, + out_json: bool, +) -> int: + # Ensure the schema exists (init_db handles migrations too). We + # do this by importing the app-side init path and pointing it at + # the caller's db_path via ATOCORE_DATA_DIR + ATOCORE_DB_DIR. 
+ os.environ["ATOCORE_DB_DIR"] = str(db_path.parent) + os.environ["ATOCORE_PROJECT_REGISTRY_PATH"] = str(registry_path) + import atocore.config as _config + + _config.settings = _config.Settings() + from atocore.context.project_state import init_project_state_schema + from atocore.models.database import init_db + + init_db() + init_project_state_schema() + + conn = _open_db(db_path) + try: + plan = build_plan(conn, registry_path) + + if out_json: + print(json.dumps(plan_to_json_dict(plan), indent=2, default=str)) + else: + print(render_plan_text(plan)) + + if not apply: + # Dry-run always writes a report for audit + write_report( + plan, + summary=None, + db_path=db_path, + registry_path=registry_path, + mode="dry-run", + report_dir=report_dir, + ) + return 0 + + # --apply path + if plan.integrity_errors: + print("\nRefused: integrity errors prevent apply.", file=sys.stderr) + return 2 + if plan.has_collisions: + print( + "\nRefused: project_state collisions prevent apply.", + file=sys.stderr, + ) + return 2 + if plan.is_empty and not allow_empty: + print( + "\nRefused: plan is empty. 
Pass --allow-empty to confirm.", + file=sys.stderr, + ) + return 3 + + summary = apply_plan(conn, plan) + print("\nApply complete:") + for k, v in summary.items(): + print(f" {k}: {v}") + + report_path = write_report( + plan, + summary=summary, + db_path=db_path, + registry_path=registry_path, + mode="apply", + report_dir=report_dir, + ) + print(f"\nReport: {report_path}") + return 0 + finally: + conn.close() + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__.split("\n\n", 1)[0]) + parser.add_argument( + "--registry", + type=Path, + default=None, + help="override ATOCORE_PROJECT_REGISTRY_PATH for this run", + ) + parser.add_argument( + "--db", + type=Path, + default=None, + help="override the AtoCore SQLite DB path", + ) + parser.add_argument("--apply", action="store_true", help="actually mutate the DB") + parser.add_argument( + "--allow-empty", + action="store_true", + help="don't refuse when --apply is called with an empty plan", + ) + parser.add_argument( + "--report-dir", + type=Path, + default=None, + help="where to write the JSON report (default: data/migrations/)", + ) + parser.add_argument( + "--json", + action="store_true", + help="emit the plan as JSON instead of human prose", + ) + args = parser.parse_args() + + # Resolve the registry and db paths. If the caller didn't override + # them, fall back to what the app's config resolves today. 
+ import atocore.config as _config + + _config.settings = _config.Settings() + + registry_path = args.registry or _config.settings.resolved_project_registry_path + db_path = args.db or _config.settings.db_path + report_dir = ( + args.report_dir or _config.settings.resolved_data_dir / "migrations" + ) + + return run( + db_path=Path(db_path), + registry_path=Path(registry_path), + apply=args.apply, + allow_empty=args.allow_empty, + report_dir=Path(report_dir), + out_json=args.json, + ) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_migrate_legacy_aliases.py b/tests/test_migrate_legacy_aliases.py new file mode 100644 index 0000000..fbfde95 --- /dev/null +++ b/tests/test_migrate_legacy_aliases.py @@ -0,0 +1,615 @@ +"""Tests for scripts/migrate_legacy_aliases.py. + +The migration script closes the compatibility gap documented in +docs/architecture/project-identity-canonicalization.md. These tests +cover: + +- empty/clean database behavior +- shadow projects detection +- state rekey without collisions +- state collision detection + apply refusal +- memory rekey + supersession of duplicates +- interaction rekey +- end-to-end apply on a realistic shadow +- idempotency (running twice produces the same final state) +- report artifact is written +- the pre-fix regression gap is actually closed after migration +""" + +from __future__ import annotations + +import json +import sqlite3 +import sys +import uuid +from pathlib import Path + +import pytest + +from atocore.context.project_state import ( + get_state, + init_project_state_schema, +) +from atocore.models.database import init_db + +# Make scripts/ importable +_REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(_REPO_ROOT / "scripts")) + +import migrate_legacy_aliases as mig # noqa: E402 + + +# --------------------------------------------------------------------------- +# Helpers that seed "legacy" rows the way they would have looked before fb6298a +# 
---------------------------------------------------------------------------


def _open_db_connection() -> sqlite3.Connection:
    """Open a direct SQLite connection to the test data dir's DB.

    Bypasses the app's connection helpers on purpose: these tests need to
    write raw rows the way pre-fb6298a code would have, without the
    service layer canonicalizing the project name on the way in.
    """
    import atocore.config as config

    conn = sqlite3.connect(str(config.settings.db_path))
    conn.row_factory = sqlite3.Row
    # Enforce FK checks so a bad rekey surfaces as an error, not silence.
    conn.execute("PRAGMA foreign_keys = ON")
    return conn


def _seed_shadow_project(
    conn: sqlite3.Connection, shadow_name: str
) -> str:
    """Insert a projects row keyed under an alias, like the old set_state would have.

    Returns the generated project id (a UUID string).
    """
    project_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO projects (id, name, description) VALUES (?, ?, ?)",
        (project_id, shadow_name, f"shadow row for {shadow_name}"),
    )
    conn.commit()
    return project_id


def _seed_state_row(
    conn: sqlite3.Connection,
    project_id: str,
    category: str,
    key: str,
    value: str,
) -> str:
    """Insert a raw project_state row pointing at *project_id*.

    Writes directly with SQL (source="legacy-test", confidence=1.0) so the
    row looks exactly like a pre-migration artifact. Returns the row id.
    """
    row_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO project_state "
        "(id, project_id, category, key, value, source, confidence) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        (row_id, project_id, category, key, value, "legacy-test", 1.0),
    )
    conn.commit()
    return row_id


def _seed_memory_row(
    conn: sqlite3.Connection,
    memory_type: str,
    content: str,
    project: str,
    status: str = "active",
) -> str:
    """Insert a raw memories row keyed under *project* (may be an alias).

    Returns the row id.
    """
    row_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO memories "
        "(id, memory_type, content, project, source_chunk_id, confidence, status) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        (row_id, memory_type, content, project, None, 1.0, status),
    )
    conn.commit()
    return row_id


def _seed_interaction_row(
    conn: sqlite3.Connection, prompt: str, project: str
) -> str:
    """Insert a minimal raw interactions row keyed under *project*.

    All non-essential columns get inert literals; created_at is fixed so
    runs are deterministic. Returns the row id.
    """
    row_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO interactions "
        "(id, prompt, context_pack, response_summary, response, "
        " memories_used, chunks_used, client, session_id, project, created_at) "
        "VALUES (?, ?, '{}', '', '', '[]', '[]', 'legacy-test', '', ?, '2026-04-01 12:00:00')",
        (row_id, prompt, project),
    )
    conn.commit()
    return row_id


# ---------------------------------------------------------------------------
# plan-building tests
# ---------------------------------------------------------------------------


@pytest.fixture(autouse=True)
def _setup(tmp_data_dir):
    """Create the schema inside the per-test data dir before every test.

    NOTE(review): tmp_data_dir is a project fixture (presumably from
    conftest) assumed to repoint settings at a temp directory — confirm.
    """
    init_db()
    init_project_state_schema()


def test_dry_run_on_empty_registry_reports_empty_plan(tmp_data_dir):
    """Empty registry -> empty alias map -> empty plan."""
    registry_path = tmp_data_dir / "empty-registry.json"
    registry_path.write_text('{"projects": []}', encoding="utf-8")

    conn = _open_db_connection()
    try:
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    assert plan.alias_map == {}
    assert plan.is_empty
    assert not plan.has_collisions
    # counts() doubles as the report's summary shape; pin all six keys.
    assert plan.counts() == {
        "shadow_projects": 0,
        "state_rekey_rows": 0,
        "state_collisions": 0,
        "memory_rekey_rows": 0,
        "memory_supersede_rows": 0,
        "interaction_rekey_rows": 0,
    }


def test_dry_run_on_clean_registered_db_reports_empty_plan(project_registry):
    """A registry with projects but no legacy rows -> empty plan."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    # Aliases are known, but nothing in the DB matches them.
    assert plan.alias_map != {}
    assert plan.is_empty


def test_dry_run_finds_shadow_project(project_registry):
    """A projects row named after an alias is detected as a shadow."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        _seed_shadow_project(conn, "p05")
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    assert len(plan.shadow_projects) == 1
    assert plan.shadow_projects[0].shadow_name == "p05"
    assert plan.shadow_projects[0].canonical_project_id == "p05-interferometer"


def test_dry_run_plans_state_rekey_without_collisions(project_registry):
    """State rows under a shadow are planned for rekey, collision-free."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        shadow_id = _seed_shadow_project(conn, "p05")
        _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1 ingestion")
        _seed_state_row(conn, shadow_id, "decision", "lateral_support", "GF-PTFE")
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    assert len(plan.state_plans) == 1
    sp = plan.state_plans[0]
    assert len(sp.rows_to_rekey) == 2
    assert sp.collisions == []
    assert not plan.has_collisions


def test_dry_run_detects_state_collision(project_registry):
    """Shadow and canonical both have state under the same (category, key) with different values."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        shadow_id = _seed_shadow_project(conn, "p05")
        # Reuse the seeder to create the canonical projects row directly.
        canonical_id = _seed_shadow_project(conn, "p05-interferometer")
        _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1")
        _seed_state_row(
            conn, canonical_id, "status", "next_focus", "Wave 2"
        )
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    assert plan.has_collisions
    collision = plan.state_plans[0].collisions[0]
    assert collision["shadow"]["value"] == "Wave 1"
    assert collision["canonical"]["value"] == "Wave 2"


def test_dry_run_plans_memory_rekey_and_supersession(project_registry):
    """Alias-keyed memories split into plain rekeys and dedup supersessions."""
    registry_path = project_registry(
        ("p04-gigabit", ["p04", "gigabit"])
    )

    conn = _open_db_connection()
    try:
        # A clean memory under the alias that will just be rekeyed
        _seed_memory_row(conn, "project", "clean rekey memory", "p04")
        # A memory that collides with an existing canonical memory
        _seed_memory_row(conn, "project", "duplicate content", "p04")
        _seed_memory_row(conn, "project", "duplicate content", "p04-gigabit")
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    # There's exactly one memory plan (one alias matched)
    assert len(plan.memory_plans) == 1
    mp = plan.memory_plans[0]
    # Two rows are candidates for rekey or supersession — one clean,
    # one duplicate. The duplicate is handled via to_supersede; the
    # other via rows_to_rekey.
    total_affected = len(mp.rows_to_rekey) + len(mp.to_supersede)
    assert total_affected == 2


def test_dry_run_plans_interaction_rekey(project_registry):
    """Interactions keyed under any registered alias are planned for rekey."""
    registry_path = project_registry(
        ("p06-polisher", ["p06", "polisher"])
    )

    conn = _open_db_connection()
    try:
        _seed_interaction_row(conn, "quick capture under alias", "polisher")
        _seed_interaction_row(conn, "another alias-keyed row", "p06")
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    total = sum(len(p.rows_to_rekey) for p in plan.interaction_plans)
    assert total == 2


# ---------------------------------------------------------------------------
# apply tests
# ---------------------------------------------------------------------------


def test_apply_refuses_on_state_collision(project_registry):
    """apply_plan raises MigrationRefused when the plan carries a collision."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        shadow_id = _seed_shadow_project(conn, "p05")
        canonical_id = _seed_shadow_project(conn, "p05-interferometer")
        _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1")
        _seed_state_row(conn, canonical_id, "status", "next_focus", "Wave 2")

        plan = mig.build_plan(conn, registry_path)
        assert plan.has_collisions

        with pytest.raises(mig.MigrationRefused):
            mig.apply_plan(conn, plan)
    finally:
        conn.close()


def test_apply_migrates_clean_shadow_end_to_end(project_registry):
    """The happy path: one shadow project with clean state rows, rekey into a
    freshly-created canonical row, verify reachability via get_state."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        shadow_id = _seed_shadow_project(conn, "p05")
        _seed_state_row(
            conn, shadow_id, "status", "next_focus", "Wave 1 ingestion"
        )
        _seed_state_row(
            conn, shadow_id, "decision", "lateral_support", "GF-PTFE"
        )

        plan = mig.build_plan(conn, registry_path)
        assert not plan.has_collisions
        summary = mig.apply_plan(conn, plan)
    finally:
        conn.close()

    assert summary["state_rows_rekeyed"] == 2
    assert summary["shadow_projects_deleted"] == 1
    assert summary["canonical_rows_created"] == 1

    # The regression gap is now closed: the service layer can see
    # the state under the canonical id via either the alias OR the
    # canonical.
    via_alias = get_state("p05")
    via_canonical = get_state("p05-interferometer")
    assert len(via_alias) == 2
    assert len(via_canonical) == 2
    values = {entry.value for entry in via_canonical}
    assert values == {"Wave 1 ingestion", "GF-PTFE"}


def test_apply_drops_shadow_state_duplicate_without_collision(project_registry):
    """Shadow and canonical both have the same (category, key, value) — shadow
    gets marked superseded rather than hitting the UNIQUE constraint."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        shadow_id = _seed_shadow_project(conn, "p05")
        canonical_id = _seed_shadow_project(conn, "p05-interferometer")
        _seed_state_row(
            conn, shadow_id, "status", "next_focus", "Wave 1 ingestion"
        )
        _seed_state_row(
            conn, canonical_id, "status", "next_focus", "Wave 1 ingestion"
        )

        plan = mig.build_plan(conn, registry_path)
        assert not plan.has_collisions
        summary = mig.apply_plan(conn, plan)
    finally:
        conn.close()

    assert summary["state_rows_merged_as_duplicate"] == 1

    via_canonical = get_state("p05-interferometer")
    # Exactly one active row survives
    assert len(via_canonical) == 1
    assert via_canonical[0].value == "Wave 1 ingestion"


def test_apply_migrates_memories(project_registry):
    """Alias-keyed memories end up readable under the canonical project id."""
    registry_path = project_registry(
        ("p04-gigabit", ["p04", "gigabit"])
    )

    conn = _open_db_connection()
    try:
        _seed_memory_row(conn, "project", "lateral support uses GF-PTFE", "p04")
        _seed_memory_row(conn, "preference", "I prefer descriptive commits", "gigabit")
        plan = mig.build_plan(conn, registry_path)
        summary = mig.apply_plan(conn, plan)
    finally:
        conn.close()

    assert summary["memory_rows_rekeyed"] == 2

    # Both memories should now read as living under the canonical id
    from atocore.memory.service import get_memories

    rows = get_memories(project="p04-gigabit", limit=50)
    contents = {m.content for m in rows}
    assert "lateral support uses GF-PTFE" in contents
    assert "I prefer descriptive commits" in contents


def test_apply_migrates_interactions(project_registry):
    """Alias-keyed interactions end up listed under the canonical project id."""
    registry_path = project_registry(
        ("p06-polisher", ["p06", "polisher"])
    )

    conn = _open_db_connection()
    try:
        _seed_interaction_row(conn, "alias-keyed 1", "polisher")
        _seed_interaction_row(conn, "alias-keyed 2", "p06")
        plan = mig.build_plan(conn, registry_path)
        summary = mig.apply_plan(conn, plan)
    finally:
        conn.close()

    assert summary["interaction_rows_rekeyed"] == 2

    from atocore.interactions.service import list_interactions

    rows = list_interactions(project="p06-polisher", limit=50)
    prompts = {i.prompt for i in rows}
    assert prompts == {"alias-keyed 1", "alias-keyed 2"}


def test_apply_is_idempotent(project_registry):
    """Running apply twice produces the same final state as running it once."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        shadow_id = _seed_shadow_project(conn, "p05")
        _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1")
        _seed_memory_row(conn, "project", "m1", "p05")
        _seed_interaction_row(conn, "i1", "p05")

        # first apply
        plan_a = mig.build_plan(conn, registry_path)
        summary_a = mig.apply_plan(conn, plan_a)

        # second apply: plan should be empty
        plan_b = mig.build_plan(conn, registry_path)
        assert plan_b.is_empty

        # forcing a second apply on the empty plan via the function
        # directly should also succeed as a no-op (caller normally
        # has to pass --allow-empty through the CLI, but apply_plan
        # itself doesn't enforce that — the refusal is in run())
        summary_b = mig.apply_plan(conn, plan_b)
    finally:
        conn.close()

    assert summary_a["state_rows_rekeyed"] == 1
    assert summary_a["memory_rows_rekeyed"] == 1
    assert summary_a["interaction_rows_rekeyed"] == 1
    assert summary_b["state_rows_rekeyed"] == 0
    assert summary_b["memory_rows_rekeyed"] == 0
    assert summary_b["interaction_rows_rekeyed"] == 0


def test_apply_refuses_with_integrity_errors(project_registry):
    """If the projects table has two case-variant rows for the canonical id, refuse.

    The projects.name column has a case-sensitive UNIQUE constraint,
    so exact duplicates can't exist. But case-variant rows
    ``p05-interferometer`` and ``P05-Interferometer`` can both
    survive the UNIQUE constraint while both matching the
    case-insensitive ``lower(name) = lower(?)`` lookup that the
    migration uses to find the canonical row. That ambiguity
    (which canonical row should dependents rekey into?) is exactly
    the integrity failure the migration is guarding against.
    """
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        _seed_shadow_project(conn, "p05-interferometer")
        _seed_shadow_project(conn, "P05-Interferometer")
        plan = mig.build_plan(conn, registry_path)
        assert plan.integrity_errors
        with pytest.raises(mig.MigrationRefused):
            mig.apply_plan(conn, plan)
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# reporting tests
# ---------------------------------------------------------------------------


def test_plan_to_json_dict_is_serializable(project_registry):
    """plan_to_json_dict output survives json.dumps and carries the counts."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        shadow_id = _seed_shadow_project(conn, "p05")
        _seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1")
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    payload = mig.plan_to_json_dict(plan)
    # Must be JSON-serializable
    json_str = json.dumps(payload, default=str)
    assert "p05-interferometer" in json_str
    assert payload["counts"]["state_rekey_rows"] == 1


def test_write_report_creates_file(tmp_path, project_registry):
    """write_report creates the report file, even for a dry-run of an empty plan."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    report_dir = tmp_path / "reports"
    report_path = mig.write_report(
        plan,
        summary=None,
        db_path=Path("/tmp/fake.db"),
        registry_path=registry_path,
        mode="dry-run",
        report_dir=report_dir,
    )
    assert report_path.exists()
    payload = json.loads(report_path.read_text(encoding="utf-8"))
    assert payload["mode"] == "dry-run"
    assert "plan" in payload


def test_render_plan_text_on_empty_plan(project_registry):
    """The human-readable rendering of an empty plan says there is nothing to plan."""
    registry_path = project_registry()  # empty
    conn = _open_db_connection()
    try:
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    text = mig.render_plan_text(plan)
    assert "nothing to plan" in text.lower()


def test_render_plan_text_on_collision(project_registry):
    """The rendering of a colliding plan calls out the collision and the refusal."""
    registry_path = project_registry(
        ("p05-interferometer", ["p05"])
    )

    conn = _open_db_connection()
    try:
        shadow_id = _seed_shadow_project(conn, "p05")
        canonical_id = _seed_shadow_project(conn, "p05-interferometer")
        _seed_state_row(conn, shadow_id, "status", "phase", "A")
        _seed_state_row(conn, canonical_id, "status", "phase", "B")
        plan = mig.build_plan(conn, registry_path)
    finally:
        conn.close()

    text = mig.render_plan_text(plan)
    assert "COLLISION" in text.upper()
    assert "REFUSE" in text.upper() or "refuse" in text.lower()


# ---------------------------------------------------------------------------
# gap-closed companion test — the flip side of
# test_legacy_alias_keyed_state_is_invisible_until_migrated in
# test_project_state.py. After running this migration, the legacy row
# IS reachable via the canonical id.
# ---------------------------------------------------------------------------


def test_legacy_alias_gap_is_closed_after_migration(project_registry):
    """End-to-end regression test for the canonicalization gap.

    Simulates the exact scenario from
    test_legacy_alias_keyed_state_is_invisible_until_migrated in
    test_project_state.py — a shadow projects row with a state row
    pointing at it. Runs the migration. Verifies the state is now
    reachable via the canonical id.
    """
    registry_path = project_registry(
        ("p05-interferometer", ["p05", "interferometer"])
    )

    conn = _open_db_connection()
    try:
        shadow_id = _seed_shadow_project(conn, "p05")
        _seed_state_row(
            conn, shadow_id, "status", "legacy_focus", "Wave 1 ingestion"
        )

        # Before migration: the legacy row is invisible to get_state
        # (this is the documented gap, covered in test_project_state.py)
        assert all(
            entry.value != "Wave 1 ingestion" for entry in get_state("p05")
        )
        assert all(
            entry.value != "Wave 1 ingestion"
            for entry in get_state("p05-interferometer")
        )

        # Run the migration
        plan = mig.build_plan(conn, registry_path)
        mig.apply_plan(conn, plan)
    finally:
        conn.close()

    # After migration: the row is reachable via canonical AND alias
    via_canonical = get_state("p05-interferometer")
    via_alias = get_state("p05")
    assert any(e.value == "Wave 1 ingestion" for e in via_canonical)
    assert any(e.value == "Wave 1 ingestion" for e in via_alias)