"""Migrate legacy alias-keyed rows to canonical project ids. Standalone, offline migration tool that closes the compatibility gap described in ``docs/architecture/project-identity-canonicalization.md``. Before ``fb6298a`` landed, writes to ``/project/state``, ``/memory``, and ``/interactions`` stored the project name verbatim. After ``fb6298a`` every service-layer entry point canonicalizes through the project registry, which silently makes pre-fix alias-keyed rows unreachable from the new read path. This script finds those shadow rows and rekeys them to the canonical project id. Usage ----- Dry-run (default, safe, idempotent): python scripts/migrate_legacy_aliases.py Override the registry or DB location: python scripts/migrate_legacy_aliases.py \ --registry /srv/storage/atocore/config/project-registry.json \ --db /srv/storage/atocore/data/db/atocore.db Apply after reviewing dry-run output: python scripts/migrate_legacy_aliases.py --apply Emit the plan as JSON instead of human prose: python scripts/migrate_legacy_aliases.py --json Behavior -------- Dry-run always succeeds. It walks: 1. ``projects`` — any row whose ``name`` (lowercased) matches a registered alias and differs from the canonical project id. These are "shadow" rows. 2. ``project_state`` — any row whose ``project_id`` points at a shadow projects row. The plan rekeys these to the canonical projects id. A collision (shadow and canonical both have state under the same ``(category, key)``) blocks the apply step. 3. ``memories`` — any row whose ``project`` column is a registered alias. The plan rekeys these to the canonical project id. Duplicate-after-rekey cases are handled by marking the older row ``superseded`` and keeping the newer row active. 4. ``interactions`` — any row whose ``project`` column is a registered alias. Plain string rekey, no collision handling. Apply runs the plan inside a single SQLite transaction. Any error rolls back the whole migration. A JSON report is written under ``data/migrations/`` (respects ``ATOCORE_DATA_DIR``). Safety rails ------------ - Dry-run is the default; ``--apply`` is explicit and not the default. - Apply on an empty plan is a no-op that refuses unless ``--allow-empty`` is passed (prevents accidental runs on clean DBs that look like they did something). - Apply refuses if the plan has any project_state collisions. The operator must resolve the collision via the normal ``POST /project/state`` path (or by manually editing the shadow row) before running ``--apply``. - The script never touches rows that are already canonical. Running ``--apply`` twice produces the same final state as running it once (idempotent). - A pre-apply integrity check refuses to run if the DB has more than one row for the canonical id of any alias — a situation the migration wasn't designed to handle. """ from __future__ import annotations import argparse import json import os import sqlite3 import sys import uuid from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from pathlib import Path _REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(_REPO_ROOT / "src")) # --------------------------------------------------------------------------- # Data classes describing the plan # --------------------------------------------------------------------------- @dataclass class ShadowProject: """A projects row whose name is a registered alias.""" shadow_row_id: str shadow_name: str canonical_project_id: str # the id field from the registry (the name we want) @dataclass class StateRekeyPlan: shadow_row_id: str canonical_row_id: str # resolved at plan time; may be created if missing canonical_project_id: str rows_to_rekey: list[dict] = field(default_factory=list) collisions: list[dict] = field(default_factory=list) historical_drops: list[dict] = field(default_factory=list) @dataclass class MemoryRekeyPlan: alias: str canonical_project_id: str rows_to_rekey: list[dict] = field(default_factory=list) to_supersede: list[dict] = field(default_factory=list) @dataclass class InteractionRekeyPlan: alias: str canonical_project_id: str rows_to_rekey: list[dict] = field(default_factory=list) @dataclass class MigrationPlan: alias_map: dict[str, str] = field(default_factory=dict) # lowercased alias -> canonical id shadow_projects: list[ShadowProject] = field(default_factory=list) state_plans: list[StateRekeyPlan] = field(default_factory=list) memory_plans: list[MemoryRekeyPlan] = field(default_factory=list) interaction_plans: list[InteractionRekeyPlan] = field(default_factory=list) integrity_errors: list[str] = field(default_factory=list) @property def has_collisions(self) -> bool: return any(p.collisions for p in self.state_plans) @property def is_empty(self) -> bool: return ( not self.shadow_projects and not any(p.rows_to_rekey for p in self.memory_plans) and not any(p.rows_to_rekey for p in self.interaction_plans) ) def counts(self) -> dict: return { "shadow_projects": len(self.shadow_projects), "state_rekey_rows": sum(len(p.rows_to_rekey) for p in self.state_plans), "state_collisions": sum(len(p.collisions) for p in self.state_plans), "state_historical_drops": sum( len(p.historical_drops) for p in self.state_plans ), "memory_rekey_rows": sum(len(p.rows_to_rekey) for p in self.memory_plans), "memory_supersede_rows": sum(len(p.to_supersede) for p in self.memory_plans), "interaction_rekey_rows": sum(len(p.rows_to_rekey) for p in self.interaction_plans), } # --------------------------------------------------------------------------- # Plan building # --------------------------------------------------------------------------- def build_alias_map(registry_path: Path) -> dict[str, str]: """Load the registry and return {lowercased alias_or_id: canonical_id}. The canonical id itself is also in the map so later code can distinguish "already canonical" from "is an alias". """ if not registry_path.exists(): return {} payload = json.loads(registry_path.read_text(encoding="utf-8")) mapping: dict[str, str] = {} for entry in payload.get("projects", []): canonical = str(entry.get("id", "")).strip() if not canonical: continue mapping[canonical.lower()] = canonical for alias in entry.get("aliases", []): alias_str = str(alias).strip() if not alias_str: continue mapping[alias_str.lower()] = canonical return mapping def _dict_row(row: sqlite3.Row) -> dict: return {k: row[k] for k in row.keys()} def find_shadow_projects( conn: sqlite3.Connection, alias_map: dict[str, str] ) -> list[ShadowProject]: """Find rows in the projects table whose name is a registered alias that differs from the canonical project id.""" shadows: list[ShadowProject] = [] rows = conn.execute("SELECT id, name FROM projects").fetchall() for row in rows: name = (row["name"] or "").strip() if not name: continue canonical = alias_map.get(name.lower()) if canonical is None: continue # not in registry; leave it alone (unregistered-name fallback) if name == canonical: continue # already canonical; nothing to migrate shadows.append( ShadowProject( shadow_row_id=row["id"], shadow_name=name, canonical_project_id=canonical, ) ) return shadows def _find_or_plan_canonical_row( conn: sqlite3.Connection, canonical_project_id: str ) -> str | None: """Return the existing projects.id row whose name matches the canonical project id, or None if no such row exists yet. Case-insensitive lookup matches ``ensure_project``'s semantics. """ row = conn.execute( "SELECT id FROM projects WHERE lower(name) = lower(?)", (canonical_project_id,), ).fetchone() return row["id"] if row else None def check_canonical_integrity( conn: sqlite3.Connection, alias_map: dict[str, str] ) -> list[str]: """Refuse to run if any canonical project has more than one row. This shouldn't happen under normal operation, but if it does the migration can't proceed because we don't know which canonical row the shadow dependents should be rekeyed to. """ errors: list[str] = [] seen_canonicals: set[str] = set() for canonical in alias_map.values(): if canonical in seen_canonicals: continue seen_canonicals.add(canonical) count_row = conn.execute( "SELECT COUNT(*) AS c FROM projects WHERE lower(name) = lower(?)", (canonical,), ).fetchone() if count_row and count_row["c"] > 1: errors.append( f"canonical project '{canonical}' has {count_row['c']} rows in " f"the projects table; migration refuses to run until this is " f"resolved manually" ) return errors def plan_state_migration( conn: sqlite3.Connection, shadows: list[ShadowProject], ) -> list[StateRekeyPlan]: """Plan the project_state migration for every shadow project. Selects ALL shadow-attached state rows regardless of status, so superseded and invalid rows are preserved during the migration instead of cascade-deleted when the shadow project row is dropped. Per-row decision rules (see the table in the migration module docstring and in the commit that added this logic): - canonical triple is empty → clean rekey, any shadow status - canonical triple has same value → shadow is a duplicate, mark it superseded in place; its content lives on the canonical row - canonical triple has different value, both active → collision, apply refuses until a human resolves via POST /project/state - canonical triple has different value, shadow active and canonical inactive → shadow wins, canonical inactive row is deleted and shadow is rekeyed into its place - canonical triple has different value, shadow inactive → shadow is historically out-of-date, canonical wins by definition, the shadow row is recorded in historical_drops so the operator sees the audit loss explicitly before running --apply The last case is the only unavoidable data loss, and it is intrinsic to the UPSERT shape of the project_state table (at most one row per (project_id, category, key)). The report surfaces every drop with both shadow and canonical values so nothing disappears silently. """ plans: list[StateRekeyPlan] = [] for shadow in shadows: canonical_row_id = _find_or_plan_canonical_row( conn, shadow.canonical_project_id ) # If the canonical row doesn't exist yet we'll create it at # apply time; planning can use a placeholder. planned_canonical_row_id = canonical_row_id or "" plan = StateRekeyPlan( shadow_row_id=shadow.shadow_row_id, canonical_row_id=planned_canonical_row_id, canonical_project_id=shadow.canonical_project_id, ) # ALL state rows attached to the shadow, regardless of status. # Dropping the status filter is the codex-flagged fix: without # it, cascade delete of the shadow project silently destroys # superseded/invalid historical state, which is exactly the # audit loss a cleanup migration must not cause. state_rows = conn.execute( "SELECT * FROM project_state WHERE project_id = ? ORDER BY updated_at DESC", (shadow.shadow_row_id,), ).fetchall() for raw in state_rows: state = _dict_row(raw) if canonical_row_id is None: # No canonical row exists yet; any status rekeys cleanly. plan.rows_to_rekey.append(state) continue # Look for ANY row on the canonical project at the same # (category, key) — status-agnostic, because the UNIQUE # constraint is status-agnostic too. collision_row = conn.execute( "SELECT * FROM project_state " "WHERE project_id = ? AND category = ? AND key = ?", (canonical_row_id, state["category"], state["key"]), ).fetchone() if collision_row is None: plan.rows_to_rekey.append(state) continue canonical_state = _dict_row(collision_row) if canonical_state["value"] == state["value"]: # Same value on both sides — the shadow is a pure # duplicate. Mark it superseded in place; the content # is already preserved on the canonical row. state["_action"] = "supersede_shadow_duplicate" plan.rows_to_rekey.append(state) continue shadow_active = state["status"] == "active" canonical_active = canonical_state["status"] == "active" if shadow_active and canonical_active: # Two live values disagree — apply refuses until # the human picks a winner. plan.collisions.append( {"shadow": state, "canonical": canonical_state} ) continue if shadow_active and not canonical_active: # Shadow has the live value; canonical is stale. # Shadow wins by deleting the canonical inactive row # and rekeying the shadow into its place. state["_action"] = "replace_inactive_canonical" state["_canonical_to_delete"] = canonical_state plan.rows_to_rekey.append(state) continue # Shadow is inactive (superseded or invalid) and collides # with a canonical row at the same triple. The canonical # wins by definition of the UPSERT schema. Record this # as a historical drop so the operator sees it in the # plan output BEFORE running --apply. plan.historical_drops.append( {"shadow": state, "canonical": canonical_state} ) plans.append(plan) return plans def plan_memory_migration( conn: sqlite3.Connection, alias_map: dict[str, str] ) -> list[MemoryRekeyPlan]: plans: list[MemoryRekeyPlan] = [] # Group aliases by canonical to build one plan per canonical. aliases_by_canonical: dict[str, list[str]] = {} for alias, canonical in alias_map.items(): if alias == canonical.lower(): continue # canonical string is not an alias worth rekeying aliases_by_canonical.setdefault(canonical, []).append(alias) for canonical, aliases in aliases_by_canonical.items(): for alias_key in aliases: # memories store the string (mixed case allowed); find any # row whose project column (lowercased) matches the alias raw_rows = conn.execute( "SELECT * FROM memories WHERE lower(project) = lower(?)", (alias_key,), ).fetchall() if not raw_rows: continue plan = MemoryRekeyPlan(alias=alias_key, canonical_project_id=canonical) for raw in raw_rows: mem = _dict_row(raw) # Check for dedup collision against the canonical bucket dup = conn.execute( "SELECT * FROM memories WHERE memory_type = ? AND content = ? " "AND lower(project) = lower(?) AND status = ? AND id != ?", ( mem["memory_type"], mem["content"], canonical, mem["status"], mem["id"], ), ).fetchone() if dup is None: plan.rows_to_rekey.append(mem) continue dup_dict = _dict_row(dup) # Newer row wins; older is superseded. updated_at is # the tiebreaker because created_at is the storage # insert time and updated_at reflects last mutation. shadow_updated = mem.get("updated_at") or "" canonical_updated = dup_dict.get("updated_at") or "" if shadow_updated > canonical_updated: # Shadow is newer: canonical becomes superseded, # shadow rekeys into active. plan.rows_to_rekey.append(mem) plan.to_supersede.append(dup_dict) else: # Canonical is newer (or equal): shadow becomes # superseded in place (we still rekey it so its # project column is canonical, but its status flips # to superseded). mem["_action"] = "supersede" plan.to_supersede.append(mem) plans.append(plan) return plans def plan_interaction_migration( conn: sqlite3.Connection, alias_map: dict[str, str] ) -> list[InteractionRekeyPlan]: plans: list[InteractionRekeyPlan] = [] aliases_by_canonical: dict[str, list[str]] = {} for alias, canonical in alias_map.items(): if alias == canonical.lower(): continue aliases_by_canonical.setdefault(canonical, []).append(alias) for canonical, aliases in aliases_by_canonical.items(): for alias_key in aliases: raw_rows = conn.execute( "SELECT * FROM interactions WHERE lower(project) = lower(?)", (alias_key,), ).fetchall() if not raw_rows: continue plan = InteractionRekeyPlan(alias=alias_key, canonical_project_id=canonical) for raw in raw_rows: plan.rows_to_rekey.append(_dict_row(raw)) plans.append(plan) return plans def build_plan( conn: sqlite3.Connection, registry_path: Path ) -> MigrationPlan: alias_map = build_alias_map(registry_path) plan = MigrationPlan(alias_map=alias_map) if not alias_map: return plan plan.integrity_errors = check_canonical_integrity(conn, alias_map) if plan.integrity_errors: return plan plan.shadow_projects = find_shadow_projects(conn, alias_map) plan.state_plans = plan_state_migration(conn, plan.shadow_projects) plan.memory_plans = plan_memory_migration(conn, alias_map) plan.interaction_plans = plan_interaction_migration(conn, alias_map) return plan # --------------------------------------------------------------------------- # Apply # --------------------------------------------------------------------------- class MigrationRefused(RuntimeError): """Raised when the apply step refuses to run due to a safety rail.""" def apply_plan(conn: sqlite3.Connection, plan: MigrationPlan) -> dict: """Execute the plan inside a single SQLite transaction. Returns a summary dict. Raises ``MigrationRefused`` if the plan has integrity errors, has state collisions, or is empty (callers that want to apply an empty plan must check ``plan.is_empty`` themselves). """ if plan.integrity_errors: raise MigrationRefused( "migration refuses to run due to integrity errors: " + "; ".join(plan.integrity_errors) ) if plan.has_collisions: raise MigrationRefused( "migration refuses to run; the plan has " f"{sum(len(p.collisions) for p in plan.state_plans)} " "project_state collision(s) that require manual resolution" ) summary: dict = { "shadow_projects_deleted": 0, "state_rows_rekeyed": 0, "state_rows_merged_as_duplicate": 0, "state_rows_replaced_inactive_canonical": 0, "state_rows_historical_dropped": 0, "memory_rows_rekeyed": 0, "memory_rows_superseded": 0, "interaction_rows_rekeyed": 0, "canonical_rows_created": 0, } try: conn.execute("BEGIN") # --- Projects + project_state ---------------------------------- for sp_plan in plan.state_plans: canonical_row_id = sp_plan.canonical_row_id if canonical_row_id == "": # Create the canonical projects row now canonical_row_id = str(uuid.uuid4()) conn.execute( "INSERT INTO projects (id, name, description) " "VALUES (?, ?, ?)", ( canonical_row_id, sp_plan.canonical_project_id, "created by legacy alias migration", ), ) summary["canonical_rows_created"] += 1 for state in sp_plan.rows_to_rekey: action = state.get("_action") if action == "supersede_shadow_duplicate": # Same (category, key, value) already on canonical. # Mark the shadow state row superseded in place; # the content is preserved on the canonical row. # The shadow row will cascade-delete when its # parent projects row is dropped at the end of # this loop, which is fine because its value is # preserved on the canonical side. conn.execute( "UPDATE project_state SET status = 'superseded', " "updated_at = CURRENT_TIMESTAMP WHERE id = ?", (state["id"],), ) summary["state_rows_merged_as_duplicate"] += 1 continue if action == "replace_inactive_canonical": # Shadow is active, canonical is stale (superseded # or invalid). Delete the canonical inactive row # and rekey the shadow in its place. SQLite's # default immediate constraint checking means the # DELETE must complete before the UPDATE for the # UNIQUE(project_id, category, key) constraint to # be happy. canonical_to_delete = state["_canonical_to_delete"] conn.execute( "DELETE FROM project_state WHERE id = ?", (canonical_to_delete["id"],), ) conn.execute( "UPDATE project_state SET project_id = ?, " "updated_at = CURRENT_TIMESTAMP WHERE id = ?", (canonical_row_id, state["id"]), ) summary["state_rows_rekeyed"] += 1 summary["state_rows_replaced_inactive_canonical"] += 1 continue # Clean rekey (no collision at the triple) conn.execute( "UPDATE project_state SET project_id = ?, " "updated_at = CURRENT_TIMESTAMP WHERE id = ?", (canonical_row_id, state["id"]), ) summary["state_rows_rekeyed"] += 1 # Historical drops are shadow rows that collide with a # canonical row at the same triple where the shadow is # inactive. No DB operation is needed — they'll be # cascade-deleted when the shadow project row is # dropped below. But we count them so the operator # sees the audit loss in the summary. summary["state_rows_historical_dropped"] += len(sp_plan.historical_drops) # Delete shadow projects rows after their state dependents # have moved. At this point, the only state rows still # attached to any shadow project are the historical_drops # entries, which are being deliberately discarded because # the canonical row at the same triple already wins. for sp_plan in plan.state_plans: conn.execute( "DELETE FROM projects WHERE id = ?", (sp_plan.shadow_row_id,) ) summary["shadow_projects_deleted"] += 1 # Also delete any shadow projects that had no state rows # attached (find_shadow_projects returned them but # plan_state_migration produced an empty plan entry for them). # They're already covered by the loop above because every # shadow has a state plan entry, even if empty. # --- Memories -------------------------------------------------- for mem_plan in plan.memory_plans: for mem in mem_plan.rows_to_rekey: conn.execute( "UPDATE memories SET project = ?, " "updated_at = CURRENT_TIMESTAMP WHERE id = ?", (mem_plan.canonical_project_id, mem["id"]), ) summary["memory_rows_rekeyed"] += 1 for mem in mem_plan.to_supersede: # Supersede the older/duplicate row AND rekey its # project to canonical so the historical audit trail # reflects the canonical project. conn.execute( "UPDATE memories SET status = 'superseded', " "project = ?, updated_at = CURRENT_TIMESTAMP " "WHERE id = ?", (mem_plan.canonical_project_id, mem["id"]), ) summary["memory_rows_superseded"] += 1 # --- Interactions ---------------------------------------------- for ix_plan in plan.interaction_plans: for ix in ix_plan.rows_to_rekey: conn.execute( "UPDATE interactions SET project = ? WHERE id = ?", (ix_plan.canonical_project_id, ix["id"]), ) summary["interaction_rows_rekeyed"] += 1 conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise return summary # --------------------------------------------------------------------------- # Reporting # --------------------------------------------------------------------------- def render_plan_text(plan: MigrationPlan) -> str: lines: list[str] = [] lines.append("=" * 72) lines.append("Legacy alias migration — plan") lines.append("=" * 72) if plan.integrity_errors: lines.append("") lines.append("INTEGRITY ERRORS — migration cannot run:") for err in plan.integrity_errors: lines.append(f" - {err}") return "\n".join(lines) lines.append("") if not plan.alias_map: lines.append("No registered aliases; nothing to plan.") return "\n".join(lines) canonicals = sorted(set(plan.alias_map.values())) lines.append(f"Known canonical projects ({len(canonicals)}):") for canonical in canonicals: aliases = [ alias for alias, canon in plan.alias_map.items() if canon == canonical and alias != canonical.lower() ] lines.append(f" - {canonical} (aliases: {', '.join(aliases) or 'none'})") lines.append("") counts = plan.counts() lines.append("Counts:") lines.append(f" shadow projects : {counts['shadow_projects']}") lines.append(f" state rekey rows : {counts['state_rekey_rows']}") lines.append(f" state collisions (block) : {counts['state_collisions']}") lines.append(f" state historical drops : {counts['state_historical_drops']}") lines.append(f" memory rekey rows : {counts['memory_rekey_rows']}") lines.append(f" memory supersede : {counts['memory_supersede_rows']}") lines.append(f" interaction rekey : {counts['interaction_rekey_rows']}") if plan.is_empty: lines.append("") lines.append("Nothing to migrate. The database is clean.") return "\n".join(lines) if plan.shadow_projects: lines.append("") lines.append("Shadow projects (alias-named rows that will be deleted):") for shadow in plan.shadow_projects: lines.append( f" - '{shadow.shadow_name}' (row {shadow.shadow_row_id[:8]}) " f"-> '{shadow.canonical_project_id}'" ) if plan.has_collisions: lines.append("") lines.append("*** STATE COLLISIONS — apply will REFUSE until resolved: ***") for sp_plan in plan.state_plans: for collision in sp_plan.collisions: shadow = collision["shadow"] canonical = collision["canonical"] lines.append( f" - [{shadow['category']}/{shadow['key']}] " f"shadow value={shadow['value']!r} vs " f"canonical value={canonical['value']!r}" ) lines.append("") lines.append( "Resolution: use POST /project/state to pick the winning value, " "then re-run --apply." ) return "\n".join(lines) # Historical drops are explicit, non-blocking audit-loss events. # They're surfaced so the operator sees them BEFORE running --apply # and can decide to manually preserve the shadow values if any of # them are worth keeping as separate records. historical_drops = [ drop for sp_plan in plan.state_plans for drop in sp_plan.historical_drops ] if historical_drops: lines.append("") lines.append( f"*** {len(historical_drops)} HISTORICAL DROPS — shadow inactive " "state rows that collide with a canonical row: ***" ) for drop in historical_drops: shadow = drop["shadow"] canonical = drop["canonical"] lines.append( f" - [{shadow['category']}/{shadow['key']}] " f"shadow({shadow['status']})={shadow['value']!r} -> " f"dropped in favor of canonical({canonical['status']})={canonical['value']!r}" ) lines.append("") lines.append( "These rows are superseded/invalid on the shadow side. The " "UNIQUE(project_id, category, key) constraint prevents " "preserving them alongside the canonical row. If any of these " "values are worth keeping as separate audit records, manually " "copy them out before running --apply." ) lines.append("") lines.append("Apply plan (if --apply is given):") for sp_plan in plan.state_plans: if sp_plan.rows_to_rekey: lines.append( f" - rekey {len(sp_plan.rows_to_rekey)} project_state row(s) " f"from shadow {sp_plan.shadow_row_id[:8]} -> '{sp_plan.canonical_project_id}'" ) for mem_plan in plan.memory_plans: if mem_plan.rows_to_rekey: lines.append( f" - rekey {len(mem_plan.rows_to_rekey)} memory row(s) from " f"project='{mem_plan.alias}' -> '{mem_plan.canonical_project_id}'" ) if mem_plan.to_supersede: lines.append( f" - supersede {len(mem_plan.to_supersede)} duplicate " f"memory row(s) under '{mem_plan.canonical_project_id}'" ) for ix_plan in plan.interaction_plans: if ix_plan.rows_to_rekey: lines.append( f" - rekey {len(ix_plan.rows_to_rekey)} interaction row(s) " f"from project='{ix_plan.alias}' -> '{ix_plan.canonical_project_id}'" ) return "\n".join(lines) def plan_to_json_dict(plan: MigrationPlan) -> dict: return { "alias_map": plan.alias_map, "integrity_errors": plan.integrity_errors, "counts": plan.counts(), "is_empty": plan.is_empty, "has_collisions": plan.has_collisions, "shadow_projects": [asdict(sp) for sp in plan.shadow_projects], "state_plans": [ { "shadow_row_id": sp.shadow_row_id, "canonical_row_id": sp.canonical_row_id, "canonical_project_id": sp.canonical_project_id, "rows_to_rekey": sp.rows_to_rekey, "collisions": sp.collisions, "historical_drops": sp.historical_drops, } for sp in plan.state_plans ], "memory_plans": [ { "alias": mp.alias, "canonical_project_id": mp.canonical_project_id, "rows_to_rekey": mp.rows_to_rekey, "to_supersede": mp.to_supersede, } for mp in plan.memory_plans ], "interaction_plans": [ { "alias": ip.alias, "canonical_project_id": ip.canonical_project_id, "rows_to_rekey": ip.rows_to_rekey, } for ip in plan.interaction_plans ], } def write_report( plan: MigrationPlan, summary: dict | None, db_path: Path, registry_path: Path, mode: str, report_dir: Path, ) -> Path: report_dir.mkdir(parents=True, exist_ok=True) stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") report_path = report_dir / f"legacy-aliases-{stamp}.json" payload = { "created_at": datetime.now(timezone.utc).isoformat(), "mode": mode, "db_path": str(db_path), "registry_path": str(registry_path), "plan": plan_to_json_dict(plan), "apply_summary": summary, } report_path.write_text( json.dumps(payload, indent=2, default=str, ensure_ascii=True) + "\n", encoding="utf-8", ) return report_path # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def _open_db(db_path: Path) -> sqlite3.Connection: conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") return conn def run( db_path: Path, registry_path: Path, apply: bool, allow_empty: bool, report_dir: Path, out_json: bool, ) -> int: # Ensure the schema exists (init_db handles migrations too). We # do this by importing the app-side init path and pointing it at # the caller's db_path via ATOCORE_DATA_DIR + ATOCORE_DB_DIR. os.environ["ATOCORE_DB_DIR"] = str(db_path.parent) os.environ["ATOCORE_PROJECT_REGISTRY_PATH"] = str(registry_path) import atocore.config as _config _config.settings = _config.Settings() from atocore.context.project_state import init_project_state_schema from atocore.models.database import init_db init_db() init_project_state_schema() conn = _open_db(db_path) try: plan = build_plan(conn, registry_path) if out_json: print(json.dumps(plan_to_json_dict(plan), indent=2, default=str)) else: print(render_plan_text(plan)) if not apply: # Dry-run always writes a report for audit write_report( plan, summary=None, db_path=db_path, registry_path=registry_path, mode="dry-run", report_dir=report_dir, ) return 0 # --apply path if plan.integrity_errors: print("\nRefused: integrity errors prevent apply.", file=sys.stderr) return 2 if plan.has_collisions: print( "\nRefused: project_state collisions prevent apply.", file=sys.stderr, ) return 2 if plan.is_empty and not allow_empty: print( "\nRefused: plan is empty. Pass --allow-empty to confirm.", file=sys.stderr, ) return 3 summary = apply_plan(conn, plan) print("\nApply complete:") for k, v in summary.items(): print(f" {k}: {v}") report_path = write_report( plan, summary=summary, db_path=db_path, registry_path=registry_path, mode="apply", report_dir=report_dir, ) print(f"\nReport: {report_path}") return 0 finally: conn.close() def main() -> int: parser = argparse.ArgumentParser(description=__doc__.split("\n\n", 1)[0]) parser.add_argument( "--registry", type=Path, default=None, help="override ATOCORE_PROJECT_REGISTRY_PATH for this run", ) parser.add_argument( "--db", type=Path, default=None, help="override the AtoCore SQLite DB path", ) parser.add_argument("--apply", action="store_true", help="actually mutate the DB") parser.add_argument( "--allow-empty", action="store_true", help="don't refuse when --apply is called with an empty plan", ) parser.add_argument( "--report-dir", type=Path, default=None, help="where to write the JSON report (default: data/migrations/)", ) parser.add_argument( "--json", action="store_true", help="emit the plan as JSON instead of human prose", ) args = parser.parse_args() # Resolve the registry and db paths. If the caller didn't override # them, fall back to what the app's config resolves today. import atocore.config as _config _config.settings = _config.Settings() registry_path = args.registry or _config.settings.resolved_project_registry_path db_path = args.db or _config.settings.db_path report_dir = ( args.report_dir or _config.settings.resolved_data_dir / "migrations" ) return run( db_path=Path(db_path), registry_path=Path(registry_path), apply=args.apply, allow_empty=args.allow_empty, report_dir=Path(report_dir), out_json=args.json, ) if __name__ == "__main__": raise SystemExit(main())