Codex caught a real data-loss bug in the legacy alias migration
shipped in 7e60f5a. plan_state_migration filtered state rows to
status='active' only, then apply_plan deleted the shadow projects
row at the end. Because project_state.project_id has
ON DELETE CASCADE, any superseded or invalid state rows still
attached to the shadow project got silently cascade-deleted —
exactly the audit loss a cleanup migration must not cause.
This commit fixes the bug and adds regression tests that lock in
the invariant "shadow state of every status is accounted for".
Root cause
----------
scripts/migrate_legacy_aliases.py::plan_state_migration was:
"SELECT * FROM project_state WHERE project_id = ? AND status = 'active'"
which only found live rows. Any historical row (status in
'superseded' or 'invalid') was invisible to the plan, so the apply
step had nothing to rekey for it. Then the shadow project row was
deleted at the end, cascade-deleting every unplanned row.
The fix
-------
plan_state_migration now selects ALL state rows attached to the
shadow project regardless of status, and handles every row per a
per-status decision table:
| Shadow status | Canonical at same triple? | Values | Action |
|---------------|---------------------------|------------|--------------------------------|
| any | no | — | clean rekey |
| any | yes | same | shadow superseded in place |
| active | yes, active | different | COLLISION, apply refuses |
| active | yes, inactive | different | shadow wins, canonical deleted |
| inactive | yes, any | different | historical drop (logged) |
Four changes in the script:
1. SELECT drops the status filter so the plan walks every row.
2. New StateRekeyPlan.historical_drops list captures the shadow
rows that lose to a canonical row at the same triple because the
shadow is already inactive. These are the only unavoidable data
losses, and they happen because the UNIQUE(project_id, category,
key) constraint on project_state doesn't allow two rows per
triple regardless of status.
3. New apply action 'replace_inactive_canonical' for the
shadow-active-vs-canonical-inactive case. At apply time the
canonical inactive row is DELETEd first (SQLite's default
immediate constraint checking) and then the shadow is UPDATEd
into its place in two separate statements. Adds a new
state_rows_replaced_inactive_canonical counter.
4. New apply counter state_rows_historical_dropped for audit
transparency. The rows themselves are still cascade-deleted
when the shadow project row is dropped, but they're counted
and reported.
Five places render_plan_text and plan_to_json_dict updated:
- counts() gains state_historical_drops
- render_plan_text prints a 'historical drops' section with each
shadow-canonical pair and their statuses when there are any, so
the operator sees the audit loss BEFORE running --apply
- The new section explicitly tells the operator: "if any of these
values are worth keeping as separate audit records, manually copy
them out before running --apply"
- plan_to_json_dict carries historical_drops into the JSON report
- The state counts table in the human report now shows both
'state collisions (block)' and 'state historical drops' as
separate lines so the operator can distinguish
"apply will refuse" from "apply will drop historical rows"
Regression tests (3 new, all green)
-----------------------------------
tests/test_migrate_legacy_aliases.py:
- test_apply_preserves_superseded_shadow_state_when_no_collision:
the direct regression for the codex finding. Seeds a shadow with
a superseded state row on a triple the canonical doesn't have,
runs the migration, verifies via raw SQL that the row is now
attached to the canonical projects row and still has status
'superseded'. This is the test that would have failed before
the fix.
- test_apply_drops_shadow_inactive_row_when_canonical_holds_same_triple:
covers the unavoidable data-loss case. Seeds shadow superseded
+ canonical active at the same triple with different values,
verifies plan.counts() reports one historical_drop, runs apply,
verifies the canonical value is preserved and the shadow value
is gone.
- test_apply_replaces_inactive_canonical_with_active_shadow:
covers the cross-contamination case where shadow has live value
and canonical has a stale invalid row. Shadow wins by deleting
canonical and rekeying in its place. Verifies the counter and
the final state.
Plus _seed_state_row now accepts a status kwarg so the seeding
helper can create superseded/invalid rows directly.
test_dry_run_on_empty_registry_reports_empty_plan was updated to
include the new state_historical_drops key in the expected counts
dict (all zero for an empty plan, so the test shape is the same).
Full suite: 197 passing (was 194), 1 warning. The +3 is the three
new regression tests.
What this commit does NOT do
----------------------------
- Does NOT try to preserve historical shadow rows that collide
with a canonical row at the same triple. That would require a
schema change (adding (id) to the UNIQUE key, or a separate
history table) and isn't in scope for a cleanup migration.
The operator sees these as explicit 'historical drops' in the
plan output and can copy them out manually if any are worth
preserving.
- Does NOT change any behavior for rows that were already
reachable from the canonicalized read path. The fix only
affects legacy rows whose project_id points at a shadow row.
- Does NOT re-verify the earlier happy-path tests beyond the full
suite confirming them still green.
803 lines
26 KiB
Python
803 lines
26 KiB
Python
"""Tests for scripts/migrate_legacy_aliases.py.
|
|
|
|
The migration script closes the compatibility gap documented in
|
|
docs/architecture/project-identity-canonicalization.md. These tests
|
|
cover:
|
|
|
|
- empty/clean database behavior
|
|
- shadow projects detection
|
|
- state rekey without collisions
|
|
- state collision detection + apply refusal
|
|
- memory rekey + supersession of duplicates
|
|
- interaction rekey
|
|
- end-to-end apply on a realistic shadow
|
|
- idempotency (running twice produces the same final state)
|
|
- report artifact is written
|
|
- the pre-fix regression gap is actually closed after migration
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sqlite3
|
|
import sys
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from atocore.context.project_state import (
|
|
get_state,
|
|
init_project_state_schema,
|
|
)
|
|
from atocore.models.database import init_db
|
|
|
|
# Make scripts/ importable
|
|
_REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
sys.path.insert(0, str(_REPO_ROOT / "scripts"))
|
|
|
|
import migrate_legacy_aliases as mig # noqa: E402
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers that seed "legacy" rows the way they would have looked before fb6298a
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _open_db_connection():
|
|
"""Open a direct SQLite connection to the test data dir's DB."""
|
|
import atocore.config as config
|
|
|
|
conn = sqlite3.connect(str(config.settings.db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
conn.execute("PRAGMA foreign_keys = ON")
|
|
return conn
|
|
|
|
|
|
def _seed_shadow_project(
|
|
conn: sqlite3.Connection, shadow_name: str
|
|
) -> str:
|
|
"""Insert a projects row keyed under an alias, like the old set_state would have."""
|
|
project_id = str(uuid.uuid4())
|
|
conn.execute(
|
|
"INSERT INTO projects (id, name, description) VALUES (?, ?, ?)",
|
|
(project_id, shadow_name, f"shadow row for {shadow_name}"),
|
|
)
|
|
conn.commit()
|
|
return project_id
|
|
|
|
|
|
def _seed_state_row(
|
|
conn: sqlite3.Connection,
|
|
project_id: str,
|
|
category: str,
|
|
key: str,
|
|
value: str,
|
|
status: str = "active",
|
|
) -> str:
|
|
row_id = str(uuid.uuid4())
|
|
conn.execute(
|
|
"INSERT INTO project_state "
|
|
"(id, project_id, category, key, value, source, confidence, status) "
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(row_id, project_id, category, key, value, "legacy-test", 1.0, status),
|
|
)
|
|
conn.commit()
|
|
return row_id
|
|
|
|
|
|
def _seed_memory_row(
|
|
conn: sqlite3.Connection,
|
|
memory_type: str,
|
|
content: str,
|
|
project: str,
|
|
status: str = "active",
|
|
) -> str:
|
|
row_id = str(uuid.uuid4())
|
|
conn.execute(
|
|
"INSERT INTO memories "
|
|
"(id, memory_type, content, project, source_chunk_id, confidence, status) "
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
(row_id, memory_type, content, project, None, 1.0, status),
|
|
)
|
|
conn.commit()
|
|
return row_id
|
|
|
|
|
|
def _seed_interaction_row(
|
|
conn: sqlite3.Connection, prompt: str, project: str
|
|
) -> str:
|
|
row_id = str(uuid.uuid4())
|
|
conn.execute(
|
|
"INSERT INTO interactions "
|
|
"(id, prompt, context_pack, response_summary, response, "
|
|
" memories_used, chunks_used, client, session_id, project, created_at) "
|
|
"VALUES (?, ?, '{}', '', '', '[]', '[]', 'legacy-test', '', ?, '2026-04-01 12:00:00')",
|
|
(row_id, prompt, project),
|
|
)
|
|
conn.commit()
|
|
return row_id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# plan-building tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _setup(tmp_data_dir):
|
|
init_db()
|
|
init_project_state_schema()
|
|
|
|
|
|
def test_dry_run_on_empty_registry_reports_empty_plan(tmp_data_dir):
|
|
"""Empty registry -> empty alias map -> empty plan."""
|
|
registry_path = tmp_data_dir / "empty-registry.json"
|
|
registry_path.write_text('{"projects": []}', encoding="utf-8")
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert plan.alias_map == {}
|
|
assert plan.is_empty
|
|
assert not plan.has_collisions
|
|
assert plan.counts() == {
|
|
"shadow_projects": 0,
|
|
"state_rekey_rows": 0,
|
|
"state_collisions": 0,
|
|
"state_historical_drops": 0,
|
|
"memory_rekey_rows": 0,
|
|
"memory_supersede_rows": 0,
|
|
"interaction_rekey_rows": 0,
|
|
}
|
|
|
|
|
|
def test_dry_run_on_clean_registered_db_reports_empty_plan(project_registry):
|
|
"""A registry with projects but no legacy rows -> empty plan."""
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert plan.alias_map != {}
|
|
assert plan.is_empty
|
|
|
|
|
|
def test_dry_run_finds_shadow_project(project_registry):
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
_seed_shadow_project(conn, "p05")
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert len(plan.shadow_projects) == 1
|
|
assert plan.shadow_projects[0].shadow_name == "p05"
|
|
assert plan.shadow_projects[0].canonical_project_id == "p05-interferometer"
|
|
|
|
|
|
def test_dry_run_plans_state_rekey_without_collisions(project_registry):
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
_seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1 ingestion")
|
|
_seed_state_row(conn, shadow_id, "decision", "lateral_support", "GF-PTFE")
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert len(plan.state_plans) == 1
|
|
sp = plan.state_plans[0]
|
|
assert len(sp.rows_to_rekey) == 2
|
|
assert sp.collisions == []
|
|
assert not plan.has_collisions
|
|
|
|
|
|
def test_dry_run_detects_state_collision(project_registry):
|
|
"""Shadow and canonical both have state under the same (category, key) with different values."""
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
canonical_id = _seed_shadow_project(conn, "p05-interferometer")
|
|
_seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1")
|
|
_seed_state_row(
|
|
conn, canonical_id, "status", "next_focus", "Wave 2"
|
|
)
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert plan.has_collisions
|
|
collision = plan.state_plans[0].collisions[0]
|
|
assert collision["shadow"]["value"] == "Wave 1"
|
|
assert collision["canonical"]["value"] == "Wave 2"
|
|
|
|
|
|
def test_dry_run_plans_memory_rekey_and_supersession(project_registry):
|
|
registry_path = project_registry(
|
|
("p04-gigabit", ["p04", "gigabit"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
# A clean memory under the alias that will just be rekeyed
|
|
_seed_memory_row(conn, "project", "clean rekey memory", "p04")
|
|
# A memory that collides with an existing canonical memory
|
|
_seed_memory_row(conn, "project", "duplicate content", "p04")
|
|
_seed_memory_row(conn, "project", "duplicate content", "p04-gigabit")
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
# There's exactly one memory plan (one alias matched)
|
|
assert len(plan.memory_plans) == 1
|
|
mp = plan.memory_plans[0]
|
|
# Two rows are candidates for rekey or supersession — one clean,
|
|
# one duplicate. The duplicate is handled via to_supersede; the
|
|
# other via rows_to_rekey.
|
|
total_affected = len(mp.rows_to_rekey) + len(mp.to_supersede)
|
|
assert total_affected == 2
|
|
|
|
|
|
def test_dry_run_plans_interaction_rekey(project_registry):
|
|
registry_path = project_registry(
|
|
("p06-polisher", ["p06", "polisher"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
_seed_interaction_row(conn, "quick capture under alias", "polisher")
|
|
_seed_interaction_row(conn, "another alias-keyed row", "p06")
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
total = sum(len(p.rows_to_rekey) for p in plan.interaction_plans)
|
|
assert total == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# apply tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_apply_refuses_on_state_collision(project_registry):
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
canonical_id = _seed_shadow_project(conn, "p05-interferometer")
|
|
_seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1")
|
|
_seed_state_row(conn, canonical_id, "status", "next_focus", "Wave 2")
|
|
|
|
plan = mig.build_plan(conn, registry_path)
|
|
assert plan.has_collisions
|
|
|
|
with pytest.raises(mig.MigrationRefused):
|
|
mig.apply_plan(conn, plan)
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def test_apply_migrates_clean_shadow_end_to_end(project_registry):
|
|
"""The happy path: one shadow project with clean state rows, rekey into a freshly-created canonical row, verify reachability via get_state."""
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
_seed_state_row(
|
|
conn, shadow_id, "status", "next_focus", "Wave 1 ingestion"
|
|
)
|
|
_seed_state_row(
|
|
conn, shadow_id, "decision", "lateral_support", "GF-PTFE"
|
|
)
|
|
|
|
plan = mig.build_plan(conn, registry_path)
|
|
assert not plan.has_collisions
|
|
summary = mig.apply_plan(conn, plan)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert summary["state_rows_rekeyed"] == 2
|
|
assert summary["shadow_projects_deleted"] == 1
|
|
assert summary["canonical_rows_created"] == 1
|
|
|
|
# The regression gap is now closed: the service layer can see
|
|
# the state under the canonical id via either the alias OR the
|
|
# canonical.
|
|
via_alias = get_state("p05")
|
|
via_canonical = get_state("p05-interferometer")
|
|
assert len(via_alias) == 2
|
|
assert len(via_canonical) == 2
|
|
values = {entry.value for entry in via_canonical}
|
|
assert values == {"Wave 1 ingestion", "GF-PTFE"}
|
|
|
|
|
|
def test_apply_drops_shadow_state_duplicate_without_collision(project_registry):
|
|
"""Shadow and canonical both have the same (category, key, value) — shadow gets marked superseded rather than hitting the UNIQUE constraint."""
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
canonical_id = _seed_shadow_project(conn, "p05-interferometer")
|
|
_seed_state_row(
|
|
conn, shadow_id, "status", "next_focus", "Wave 1 ingestion"
|
|
)
|
|
_seed_state_row(
|
|
conn, canonical_id, "status", "next_focus", "Wave 1 ingestion"
|
|
)
|
|
|
|
plan = mig.build_plan(conn, registry_path)
|
|
assert not plan.has_collisions
|
|
summary = mig.apply_plan(conn, plan)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert summary["state_rows_merged_as_duplicate"] == 1
|
|
|
|
via_canonical = get_state("p05-interferometer")
|
|
# Exactly one active row survives
|
|
assert len(via_canonical) == 1
|
|
assert via_canonical[0].value == "Wave 1 ingestion"
|
|
|
|
|
|
def test_apply_preserves_superseded_shadow_state_when_no_collision(project_registry):
|
|
"""Regression test for the codex-flagged data-loss bug.
|
|
|
|
Before the fix, plan_state_migration only selected status='active'
|
|
rows. Any superseded or invalid row on the shadow project was
|
|
invisible to the plan and got silently cascade-deleted when the
|
|
shadow projects row was dropped at the end of apply. That's
|
|
exactly the kind of audit loss a cleanup migration must not cause.
|
|
|
|
This test seeds a shadow project with a superseded state row on
|
|
a triple the canonical project doesn't have, runs the migration,
|
|
and verifies the row survived and is now attached to the
|
|
canonical project (still with status='superseded').
|
|
"""
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
# Superseded row on a triple the canonical won't have
|
|
_seed_state_row(
|
|
conn,
|
|
shadow_id,
|
|
"status",
|
|
"historical_phase",
|
|
"Phase 0 legacy",
|
|
status="superseded",
|
|
)
|
|
|
|
plan = mig.build_plan(conn, registry_path)
|
|
assert not plan.has_collisions
|
|
summary = mig.apply_plan(conn, plan)
|
|
finally:
|
|
conn.close()
|
|
|
|
# The superseded row should have been rekeyed, not dropped
|
|
assert summary["state_rows_rekeyed"] == 1
|
|
assert summary["state_rows_historical_dropped"] == 0
|
|
|
|
# Verify via raw SQL that the row is now attached to the canonical
|
|
# projects row and still has status='superseded'
|
|
conn = _open_db_connection()
|
|
try:
|
|
row = conn.execute(
|
|
"SELECT ps.status, ps.value, p.name "
|
|
"FROM project_state ps JOIN projects p ON ps.project_id = p.id "
|
|
"WHERE ps.category = ? AND ps.key = ?",
|
|
("status", "historical_phase"),
|
|
).fetchone()
|
|
finally:
|
|
conn.close()
|
|
|
|
assert row is not None, "superseded shadow row was lost during migration"
|
|
assert row["status"] == "superseded"
|
|
assert row["value"] == "Phase 0 legacy"
|
|
assert row["name"] == "p05-interferometer"
|
|
|
|
|
|
def test_apply_drops_shadow_inactive_row_when_canonical_holds_same_triple(project_registry):
|
|
"""Shadow is inactive (superseded) and collides with an active canonical row.
|
|
|
|
The canonical wins by definition of the UPSERT schema. The shadow
|
|
row is recorded as a historical_drop in the plan so the operator
|
|
sees the audit loss, and the apply cascade-deletes it via the
|
|
shadow projects row. This is the unavoidable data-loss case
|
|
documented in the migration module docstring.
|
|
"""
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
canonical_id = _seed_shadow_project(conn, "p05-interferometer")
|
|
|
|
# Shadow has a superseded value on a triple where the canonical
|
|
# has a different active value. Can't preserve both: UNIQUE
|
|
# allows only one row per triple.
|
|
_seed_state_row(
|
|
conn,
|
|
shadow_id,
|
|
"status",
|
|
"next_focus",
|
|
"Old wave 1",
|
|
status="superseded",
|
|
)
|
|
_seed_state_row(
|
|
conn,
|
|
canonical_id,
|
|
"status",
|
|
"next_focus",
|
|
"Wave 2 trusted-operational",
|
|
status="active",
|
|
)
|
|
|
|
plan = mig.build_plan(conn, registry_path)
|
|
assert not plan.has_collisions # not an active-vs-active collision
|
|
assert plan.counts()["state_historical_drops"] == 1
|
|
|
|
summary = mig.apply_plan(conn, plan)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert summary["state_rows_historical_dropped"] == 1
|
|
|
|
# The canonical's active row survives unchanged
|
|
via_canonical = get_state("p05-interferometer")
|
|
active_next_focus = [
|
|
e
|
|
for e in via_canonical
|
|
if e.category == "status" and e.key == "next_focus"
|
|
]
|
|
assert len(active_next_focus) == 1
|
|
assert active_next_focus[0].value == "Wave 2 trusted-operational"
|
|
|
|
|
|
def test_apply_replaces_inactive_canonical_with_active_shadow(project_registry):
|
|
"""Shadow is active, canonical has an inactive row at the same triple.
|
|
|
|
The shadow wins: canonical inactive row is deleted, shadow is
|
|
rekeyed into canonical's project_id. This covers the
|
|
cross-contamination case where the old alias path was used for
|
|
the live value while the canonical path had a stale row.
|
|
"""
|
|
registry_path = project_registry(
|
|
("p06-polisher", ["p06", "polisher"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p06")
|
|
canonical_id = _seed_shadow_project(conn, "p06-polisher")
|
|
|
|
# Canonical has a stale invalid row; shadow has the live value.
|
|
_seed_state_row(
|
|
conn,
|
|
canonical_id,
|
|
"decision",
|
|
"frame",
|
|
"Old frame (no longer current)",
|
|
status="invalid",
|
|
)
|
|
_seed_state_row(
|
|
conn,
|
|
shadow_id,
|
|
"decision",
|
|
"frame",
|
|
"kinematic mount frame",
|
|
status="active",
|
|
)
|
|
|
|
plan = mig.build_plan(conn, registry_path)
|
|
assert not plan.has_collisions
|
|
assert plan.counts()["state_historical_drops"] == 0
|
|
|
|
summary = mig.apply_plan(conn, plan)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert summary["state_rows_replaced_inactive_canonical"] == 1
|
|
|
|
# The active shadow value now lives on the canonical row
|
|
via_canonical = get_state("p06-polisher")
|
|
frame_entries = [
|
|
e for e in via_canonical if e.category == "decision" and e.key == "frame"
|
|
]
|
|
assert len(frame_entries) == 1
|
|
assert frame_entries[0].value == "kinematic mount frame"
|
|
|
|
# Confirm via raw SQL that the previously-inactive canonical row
|
|
# no longer exists
|
|
conn = _open_db_connection()
|
|
try:
|
|
stale = conn.execute(
|
|
"SELECT COUNT(*) AS c FROM project_state WHERE value = ?",
|
|
("Old frame (no longer current)",),
|
|
).fetchone()
|
|
finally:
|
|
conn.close()
|
|
assert stale["c"] == 0
|
|
|
|
|
|
def test_apply_migrates_memories(project_registry):
|
|
registry_path = project_registry(
|
|
("p04-gigabit", ["p04", "gigabit"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
_seed_memory_row(conn, "project", "lateral support uses GF-PTFE", "p04")
|
|
_seed_memory_row(conn, "preference", "I prefer descriptive commits", "gigabit")
|
|
plan = mig.build_plan(conn, registry_path)
|
|
summary = mig.apply_plan(conn, plan)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert summary["memory_rows_rekeyed"] == 2
|
|
|
|
# Both memories should now read as living under the canonical id
|
|
from atocore.memory.service import get_memories
|
|
|
|
rows = get_memories(project="p04-gigabit", limit=50)
|
|
contents = {m.content for m in rows}
|
|
assert "lateral support uses GF-PTFE" in contents
|
|
assert "I prefer descriptive commits" in contents
|
|
|
|
|
|
def test_apply_migrates_interactions(project_registry):
|
|
registry_path = project_registry(
|
|
("p06-polisher", ["p06", "polisher"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
_seed_interaction_row(conn, "alias-keyed 1", "polisher")
|
|
_seed_interaction_row(conn, "alias-keyed 2", "p06")
|
|
plan = mig.build_plan(conn, registry_path)
|
|
summary = mig.apply_plan(conn, plan)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert summary["interaction_rows_rekeyed"] == 2
|
|
|
|
from atocore.interactions.service import list_interactions
|
|
|
|
rows = list_interactions(project="p06-polisher", limit=50)
|
|
prompts = {i.prompt for i in rows}
|
|
assert prompts == {"alias-keyed 1", "alias-keyed 2"}
|
|
|
|
|
|
def test_apply_is_idempotent(project_registry):
|
|
"""Running apply twice produces the same final state as running it once."""
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
_seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1")
|
|
_seed_memory_row(conn, "project", "m1", "p05")
|
|
_seed_interaction_row(conn, "i1", "p05")
|
|
|
|
# first apply
|
|
plan_a = mig.build_plan(conn, registry_path)
|
|
summary_a = mig.apply_plan(conn, plan_a)
|
|
|
|
# second apply: plan should be empty
|
|
plan_b = mig.build_plan(conn, registry_path)
|
|
assert plan_b.is_empty
|
|
|
|
# forcing a second apply on the empty plan via the function
|
|
# directly should also succeed as a no-op (caller normally
|
|
# has to pass --allow-empty through the CLI, but apply_plan
|
|
# itself doesn't enforce that — the refusal is in run())
|
|
summary_b = mig.apply_plan(conn, plan_b)
|
|
finally:
|
|
conn.close()
|
|
|
|
assert summary_a["state_rows_rekeyed"] == 1
|
|
assert summary_a["memory_rows_rekeyed"] == 1
|
|
assert summary_a["interaction_rows_rekeyed"] == 1
|
|
assert summary_b["state_rows_rekeyed"] == 0
|
|
assert summary_b["memory_rows_rekeyed"] == 0
|
|
assert summary_b["interaction_rows_rekeyed"] == 0
|
|
|
|
|
|
def test_apply_refuses_with_integrity_errors(project_registry):
|
|
"""If the projects table has two case-variant rows for the canonical id, refuse.
|
|
|
|
The projects.name column has a case-sensitive UNIQUE constraint,
|
|
so exact duplicates can't exist. But case-variant rows
|
|
``p05-interferometer`` and ``P05-Interferometer`` can both
|
|
survive the UNIQUE constraint while both matching the
|
|
case-insensitive ``lower(name) = lower(?)`` lookup that the
|
|
migration uses to find the canonical row. That ambiguity
|
|
(which canonical row should dependents rekey into?) is exactly
|
|
the integrity failure the migration is guarding against.
|
|
"""
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
_seed_shadow_project(conn, "p05-interferometer")
|
|
_seed_shadow_project(conn, "P05-Interferometer")
|
|
plan = mig.build_plan(conn, registry_path)
|
|
assert plan.integrity_errors
|
|
with pytest.raises(mig.MigrationRefused):
|
|
mig.apply_plan(conn, plan)
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# reporting tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_plan_to_json_dict_is_serializable(project_registry):
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
_seed_state_row(conn, shadow_id, "status", "next_focus", "Wave 1")
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
payload = mig.plan_to_json_dict(plan)
|
|
# Must be JSON-serializable
|
|
json_str = json.dumps(payload, default=str)
|
|
assert "p05-interferometer" in json_str
|
|
assert payload["counts"]["state_rekey_rows"] == 1
|
|
|
|
|
|
def test_write_report_creates_file(tmp_path, project_registry):
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
report_dir = tmp_path / "reports"
|
|
report_path = mig.write_report(
|
|
plan,
|
|
summary=None,
|
|
db_path=Path("/tmp/fake.db"),
|
|
registry_path=registry_path,
|
|
mode="dry-run",
|
|
report_dir=report_dir,
|
|
)
|
|
assert report_path.exists()
|
|
payload = json.loads(report_path.read_text(encoding="utf-8"))
|
|
assert payload["mode"] == "dry-run"
|
|
assert "plan" in payload
|
|
|
|
|
|
def test_render_plan_text_on_empty_plan(project_registry):
|
|
registry_path = project_registry() # empty
|
|
conn = _open_db_connection()
|
|
try:
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
text = mig.render_plan_text(plan)
|
|
assert "nothing to plan" in text.lower()
|
|
|
|
|
|
def test_render_plan_text_on_collision(project_registry):
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
canonical_id = _seed_shadow_project(conn, "p05-interferometer")
|
|
_seed_state_row(conn, shadow_id, "status", "phase", "A")
|
|
_seed_state_row(conn, canonical_id, "status", "phase", "B")
|
|
plan = mig.build_plan(conn, registry_path)
|
|
finally:
|
|
conn.close()
|
|
|
|
text = mig.render_plan_text(plan)
|
|
assert "COLLISION" in text.upper()
|
|
assert "REFUSE" in text.upper() or "refuse" in text.lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# gap-closed companion test — the flip side of
|
|
# test_legacy_alias_keyed_state_is_invisible_until_migrated in
|
|
# test_project_state.py. After running this migration, the legacy row
|
|
# IS reachable via the canonical id.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_legacy_alias_gap_is_closed_after_migration(project_registry):
|
|
"""End-to-end regression test for the canonicalization gap.
|
|
|
|
Simulates the exact scenario from
|
|
test_legacy_alias_keyed_state_is_invisible_until_migrated in
|
|
test_project_state.py — a shadow projects row with a state row
|
|
pointing at it. Runs the migration. Verifies the state is now
|
|
reachable via the canonical id.
|
|
"""
|
|
registry_path = project_registry(
|
|
("p05-interferometer", ["p05", "interferometer"])
|
|
)
|
|
|
|
conn = _open_db_connection()
|
|
try:
|
|
shadow_id = _seed_shadow_project(conn, "p05")
|
|
_seed_state_row(
|
|
conn, shadow_id, "status", "legacy_focus", "Wave 1 ingestion"
|
|
)
|
|
|
|
# Before migration: the legacy row is invisible to get_state
|
|
# (this is the documented gap, covered in test_project_state.py)
|
|
assert all(
|
|
entry.value != "Wave 1 ingestion" for entry in get_state("p05")
|
|
)
|
|
assert all(
|
|
entry.value != "Wave 1 ingestion"
|
|
for entry in get_state("p05-interferometer")
|
|
)
|
|
|
|
# Run the migration
|
|
plan = mig.build_plan(conn, registry_path)
|
|
mig.apply_plan(conn, plan)
|
|
finally:
|
|
conn.close()
|
|
|
|
# After migration: the row is reachable via canonical AND alias
|
|
via_canonical = get_state("p05-interferometer")
|
|
via_alias = get_state("p05")
|
|
assert any(e.value == "Wave 1 ingestion" for e in via_canonical)
|
|
assert any(e.value == "Wave 1 ingestion" for e in via_alias)
|