"""Tests for runtime backup creation, restore, and retention cleanup.""" import json import sqlite3 from datetime import UTC, datetime, timedelta import pytest import atocore.config as config from atocore.models.database import init_db from atocore.ops.backup import ( cleanup_old_backups, create_runtime_backup, list_runtime_backups, restore_runtime_backup, validate_backup, ) def test_create_runtime_backup_copies_db_and_registry(tmp_path, monkeypatch): monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) registry_path = tmp_path / "config" / "project-registry.json" registry_path.parent.mkdir(parents=True) registry_path.write_text('{"projects":[{"id":"p01-example","aliases":[],"ingest_roots":[{"source":"vault","subpath":"incoming/projects/p01-example"}]}]}\n', encoding="utf-8") original_settings = config.settings try: config.settings = config.Settings() init_db() with sqlite3.connect(str(config.settings.db_path)) as conn: conn.execute("INSERT INTO projects (id, name) VALUES (?, ?)", ("p01", "P01 Example")) conn.commit() result = create_runtime_backup(datetime(2026, 4, 6, 18, 0, 0, tzinfo=UTC)) finally: config.settings = original_settings db_snapshot = tmp_path / "backups" / "snapshots" / "20260406T180000Z" / "db" / "atocore.db" registry_snapshot = ( tmp_path / "backups" / "snapshots" / "20260406T180000Z" / "config" / "project-registry.json" ) metadata_path = ( tmp_path / "backups" / "snapshots" / "20260406T180000Z" / "backup-metadata.json" ) assert result["db_snapshot_path"] == str(db_snapshot) assert db_snapshot.exists() assert registry_snapshot.exists() assert metadata_path.exists() with sqlite3.connect(str(db_snapshot)) as conn: row = conn.execute("SELECT name FROM projects WHERE id = ?", ("p01",)).fetchone() assert row[0] == "P01 Example" metadata = json.loads(metadata_path.read_text(encoding="utf-8")) assert metadata["registry_snapshot_path"] == str(registry_snapshot) def test_create_runtime_backup_includes_chroma_when_requested(tmp_path, monkeypatch): monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original_settings = config.settings try: config.settings = config.Settings() init_db() # Create a fake chroma directory tree with a couple of files. chroma_dir = config.settings.chroma_path (chroma_dir / "collection-a").mkdir(parents=True, exist_ok=True) (chroma_dir / "collection-a" / "data.bin").write_bytes(b"\x00\x01\x02\x03") (chroma_dir / "metadata.json").write_text('{"ok":true}', encoding="utf-8") result = create_runtime_backup( datetime(2026, 4, 6, 20, 0, 0, tzinfo=UTC), include_chroma=True, ) finally: config.settings = original_settings chroma_snapshot_root = ( tmp_path / "backups" / "snapshots" / "20260406T200000Z" / "chroma" ) assert result["chroma_snapshot_included"] is True assert result["chroma_snapshot_path"] == str(chroma_snapshot_root) assert result["chroma_snapshot_files"] >= 2 assert result["chroma_snapshot_bytes"] > 0 assert (chroma_snapshot_root / "collection-a" / "data.bin").exists() assert (chroma_snapshot_root / "metadata.json").exists() def test_list_and_validate_runtime_backups(tmp_path, monkeypatch): monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original_settings = config.settings try: config.settings = config.Settings() init_db() first = create_runtime_backup(datetime(2026, 4, 6, 21, 0, 0, tzinfo=UTC)) second = create_runtime_backup(datetime(2026, 4, 6, 22, 0, 0, tzinfo=UTC)) listing = list_runtime_backups() first_validation = validate_backup("20260406T210000Z") second_validation = validate_backup("20260406T220000Z") missing_validation = validate_backup("20260101T000000Z") finally: config.settings = original_settings assert len(listing) == 2 assert {entry["stamp"] for entry in listing} == { "20260406T210000Z", "20260406T220000Z", } for entry in listing: assert entry["has_metadata"] is True assert entry["metadata"]["db_snapshot_path"] assert first_validation["valid"] is True assert first_validation["db_ok"] is True assert first_validation["errors"] == [] assert second_validation["valid"] is True assert missing_validation["exists"] is False assert "snapshot_directory_missing" in missing_validation["errors"] # both metadata paths are reachable on disk assert json.loads( (tmp_path / "backups" / "snapshots" / "20260406T210000Z" / "backup-metadata.json") .read_text(encoding="utf-8") )["db_snapshot_path"] == first["db_snapshot_path"] assert second["db_snapshot_path"].endswith("atocore.db") def test_create_runtime_backup_handles_missing_registry(tmp_path, monkeypatch): monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original_settings = config.settings try: config.settings = config.Settings() init_db() result = create_runtime_backup(datetime(2026, 4, 6, 19, 0, 0, tzinfo=UTC)) finally: config.settings = original_settings assert result["registry_snapshot_path"] == "" def test_restore_refuses_without_confirm_service_stopped(tmp_path, monkeypatch): monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original_settings = config.settings try: config.settings = config.Settings() init_db() create_runtime_backup(datetime(2026, 4, 9, 10, 0, 0, tzinfo=UTC)) with pytest.raises(RuntimeError, match="confirm_service_stopped"): restore_runtime_backup("20260409T100000Z") finally: config.settings = original_settings def test_restore_raises_on_invalid_backup(tmp_path, monkeypatch): monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original_settings = config.settings try: config.settings = config.Settings() init_db() with pytest.raises(RuntimeError, match="failed validation"): restore_runtime_backup( "20250101T000000Z", confirm_service_stopped=True ) finally: config.settings = original_settings def test_restore_round_trip_reverses_post_backup_mutations(tmp_path, monkeypatch): """Canonical drill: snapshot -> mutate -> restore -> mutation gone.""" monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) registry_path = tmp_path / "config" / "project-registry.json" registry_path.parent.mkdir(parents=True) registry_path.write_text( '{"projects":[{"id":"p01-example","aliases":[],' '"ingest_roots":[{"source":"vault","subpath":"incoming/projects/p01-example"}]}]}\n', encoding="utf-8", ) original_settings = config.settings try: config.settings = config.Settings() init_db() # 1. Seed baseline state that should SURVIVE the restore. with sqlite3.connect(str(config.settings.db_path)) as conn: conn.execute( "INSERT INTO projects (id, name) VALUES (?, ?)", ("p01", "Baseline Project"), ) conn.commit() # 2. Create the backup we're going to restore to. create_runtime_backup(datetime(2026, 4, 9, 11, 0, 0, tzinfo=UTC)) stamp = "20260409T110000Z" # 3. Mutate live state AFTER the backup — this is what the # restore should reverse. with sqlite3.connect(str(config.settings.db_path)) as conn: conn.execute( "INSERT INTO projects (id, name) VALUES (?, ?)", ("p99", "Post Backup Mutation"), ) conn.commit() # Confirm the mutation is present before restore. with sqlite3.connect(str(config.settings.db_path)) as conn: row = conn.execute( "SELECT name FROM projects WHERE id = ?", ("p99",) ).fetchone() assert row is not None and row[0] == "Post Backup Mutation" # 4. Restore — the drill procedure. Explicit confirm_service_stopped. result = restore_runtime_backup( stamp, confirm_service_stopped=True ) # 5. Verify restore report assert result["stamp"] == stamp assert result["db_restored"] is True assert result["registry_restored"] is True assert result["restored_integrity_ok"] is True assert result["pre_restore_snapshot"] is not None # 6. Verify live state reflects the restore: baseline survived, # post-backup mutation is gone. with sqlite3.connect(str(config.settings.db_path)) as conn: baseline = conn.execute( "SELECT name FROM projects WHERE id = ?", ("p01",) ).fetchone() mutation = conn.execute( "SELECT name FROM projects WHERE id = ?", ("p99",) ).fetchone() assert baseline is not None and baseline[0] == "Baseline Project" assert mutation is None # 7. Pre-restore safety snapshot DOES contain the mutation — # it captured current state before overwriting. This is the # reversibility guarantee: the operator can restore back to # it if the restore itself was a mistake. pre_stamp = result["pre_restore_snapshot"] pre_validation = validate_backup(pre_stamp) assert pre_validation["valid"] is True pre_db_path = pre_validation["metadata"]["db_snapshot_path"] with sqlite3.connect(pre_db_path) as conn: pre_mutation = conn.execute( "SELECT name FROM projects WHERE id = ?", ("p99",) ).fetchone() assert pre_mutation is not None and pre_mutation[0] == "Post Backup Mutation" finally: config.settings = original_settings def test_restore_round_trip_with_chroma(tmp_path, monkeypatch): monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original_settings = config.settings try: config.settings = config.Settings() init_db() # Seed baseline chroma state that should survive restore. chroma_dir = config.settings.chroma_path (chroma_dir / "coll-a").mkdir(parents=True, exist_ok=True) (chroma_dir / "coll-a" / "baseline.bin").write_bytes(b"baseline") create_runtime_backup( datetime(2026, 4, 9, 12, 0, 0, tzinfo=UTC), include_chroma=True ) stamp = "20260409T120000Z" # Mutate chroma after backup: add a file + remove baseline. (chroma_dir / "coll-a" / "post_backup.bin").write_bytes(b"post") (chroma_dir / "coll-a" / "baseline.bin").unlink() result = restore_runtime_backup( stamp, confirm_service_stopped=True ) assert result["chroma_restored"] is True assert (chroma_dir / "coll-a" / "baseline.bin").exists() assert not (chroma_dir / "coll-a" / "post_backup.bin").exists() finally: config.settings = original_settings def test_restore_chroma_does_not_unlink_destination_directory(tmp_path, monkeypatch): """Regression: restore must not rmtree the chroma dir itself. In a Dockerized deployment the chroma dir is a bind-mounted volume. Calling shutil.rmtree on a mount point raises ``OSError [Errno 16] Device or resource busy``, which broke the first real Dalidou drill on 2026-04-09. The fix clears the directory's CONTENTS and copytree(dirs_exist_ok=True) into it, keeping the directory inode (and any bind mount) intact. This test captures the inode of the destination directory before and after restore and asserts they match — that's what a bind-mounted chroma dir would also see. """ monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original_settings = config.settings try: config.settings = config.Settings() init_db() chroma_dir = config.settings.chroma_path (chroma_dir / "coll-a").mkdir(parents=True, exist_ok=True) (chroma_dir / "coll-a" / "baseline.bin").write_bytes(b"baseline") create_runtime_backup( datetime(2026, 4, 9, 15, 0, 0, tzinfo=UTC), include_chroma=True ) # Capture the destination directory's stat signature before restore. chroma_stat_before = chroma_dir.stat() # Add a file post-backup so restore has work to do. (chroma_dir / "coll-a" / "post_backup.bin").write_bytes(b"post") restore_runtime_backup( "20260409T150000Z", confirm_service_stopped=True ) # Directory still exists (would have failed on mount point) and # its st_ino matches — the mount itself wasn't unlinked. assert chroma_dir.exists() chroma_stat_after = chroma_dir.stat() assert chroma_stat_before.st_ino == chroma_stat_after.st_ino, ( "chroma directory inode changed — restore recreated the " "directory instead of clearing its contents; this would " "fail on a Docker bind-mounted volume" ) # And the contents did actually get restored. assert (chroma_dir / "coll-a" / "baseline.bin").exists() assert not (chroma_dir / "coll-a" / "post_backup.bin").exists() finally: config.settings = original_settings def test_restore_skips_pre_snapshot_when_requested(tmp_path, monkeypatch): monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original_settings = config.settings try: config.settings = config.Settings() init_db() create_runtime_backup(datetime(2026, 4, 9, 13, 0, 0, tzinfo=UTC)) before_count = len(list_runtime_backups()) result = restore_runtime_backup( "20260409T130000Z", confirm_service_stopped=True, pre_restore_snapshot=False, ) after_count = len(list_runtime_backups()) assert result["pre_restore_snapshot"] is None assert after_count == before_count finally: config.settings = original_settings def test_create_backup_includes_validation_fields(tmp_path, monkeypatch): """Task B: create_runtime_backup auto-validates and reports result.""" monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original_settings = config.settings try: config.settings = config.Settings() init_db() result = create_runtime_backup(datetime(2026, 4, 11, 10, 0, 0, tzinfo=UTC)) finally: config.settings = original_settings assert "validated" in result assert "validation_errors" in result assert result["validated"] is True assert result["validation_errors"] == [] def test_create_backup_validation_failure_does_not_raise(tmp_path, monkeypatch): """Task B: if post-backup validation fails, backup still returns metadata.""" monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) def _broken_validate(stamp): return {"valid": False, "errors": ["db_missing", "metadata_missing"]} original_settings = config.settings try: config.settings = config.Settings() init_db() monkeypatch.setattr("atocore.ops.backup.validate_backup", _broken_validate) result = create_runtime_backup(datetime(2026, 4, 11, 11, 0, 0, tzinfo=UTC)) finally: config.settings = original_settings # Should NOT have raised — backup still returned metadata assert result["validated"] is False assert result["validation_errors"] == ["db_missing", "metadata_missing"] # Core backup fields still present assert "db_snapshot_path" in result assert "created_at" in result def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch): """Stale WAL/SHM sidecars must not carry bytes past the restore. Note: after restore runs, PRAGMA integrity_check reopens the restored db which may legitimately recreate a fresh -wal. So we assert that the STALE byte marker no longer appears in either sidecar, not that the files are absent. """ monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original_settings = config.settings try: config.settings = config.Settings() init_db() create_runtime_backup(datetime(2026, 4, 9, 14, 0, 0, tzinfo=UTC)) # Write fake stale WAL/SHM next to the live db with an # unmistakable marker. target_db = config.settings.db_path wal = target_db.with_name(target_db.name + "-wal") shm = target_db.with_name(target_db.name + "-shm") stale_marker = b"STALE-SIDECAR-MARKER-DO-NOT-SURVIVE" wal.write_bytes(stale_marker) shm.write_bytes(stale_marker) assert wal.exists() and shm.exists() restore_runtime_backup( "20260409T140000Z", confirm_service_stopped=True ) # The restored db must pass integrity check (tested elsewhere); # here we just confirm that no file next to it still contains # the stale marker from the old live process. for sidecar in (wal, shm): if sidecar.exists(): assert stale_marker not in sidecar.read_bytes(), ( f"{sidecar.name} still carries stale marker" ) finally: config.settings = original_settings # --------------------------------------------------------------------------- # Task C: Backup retention cleanup # --------------------------------------------------------------------------- def _setup_cleanup_env(tmp_path, monkeypatch): """Helper: configure env, init db, return snapshots_root.""" monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) monkeypatch.setenv( "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") ) original = config.settings config.settings = config.Settings() init_db() snapshots_root = config.settings.resolved_backup_dir / "snapshots" snapshots_root.mkdir(parents=True, exist_ok=True) return original, snapshots_root def _seed_snapshots(snapshots_root, dates): """Create minimal valid snapshot dirs for the given datetimes.""" for dt in dates: stamp = dt.strftime("%Y%m%dT%H%M%SZ") snap_dir = snapshots_root / stamp db_dir = snap_dir / "db" db_dir.mkdir(parents=True, exist_ok=True) db_path = db_dir / "atocore.db" conn = sqlite3.connect(str(db_path)) conn.execute("CREATE TABLE IF NOT EXISTS _marker (id INTEGER)") conn.close() metadata = { "created_at": dt.isoformat(), "backup_root": str(snap_dir), "db_snapshot_path": str(db_path), "db_size_bytes": db_path.stat().st_size, "registry_snapshot_path": "", "chroma_snapshot_path": "", "chroma_snapshot_bytes": 0, "chroma_snapshot_files": 0, "chroma_snapshot_included": False, "vector_store_note": "", } (snap_dir / "backup-metadata.json").write_text( json.dumps(metadata, indent=2) + "\n", encoding="utf-8" ) def test_cleanup_empty_dir(tmp_path, monkeypatch): original, _ = _setup_cleanup_env(tmp_path, monkeypatch) try: result = cleanup_old_backups() assert result["kept"] == 0 assert result["would_delete"] == 0 assert result["dry_run"] is True finally: config.settings = original def test_cleanup_dry_run_identifies_old_snapshots(tmp_path, monkeypatch): original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) try: # 10 daily snapshots Apr 2-11 (avoiding Apr 1 which is monthly). base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC) dates = [base + timedelta(days=i) for i in range(10)] _seed_snapshots(snapshots_root, dates) result = cleanup_old_backups() assert result["dry_run"] is True # 7 daily kept + Apr 5 is a Sunday (weekly) but already in daily. # Apr 2, 3, 4 are oldest. Apr 5 is Sunday → kept as weekly. # So: 7 daily (Apr 5-11) + 1 weekly (Apr 5 already counted) = 7 daily. # But Apr 5 is the 8th newest day from Apr 11... wait. # Newest 7 days: Apr 11,10,9,8,7,6,5 → all kept as daily. # Remaining: Apr 4,3,2. Apr 5 is already in daily. # None of Apr 4,3,2 are Sunday or 1st → all 3 deleted. assert result["kept"] == 7 assert result["would_delete"] == 3 assert len(list(snapshots_root.iterdir())) == 10 finally: config.settings = original def test_cleanup_confirm_deletes(tmp_path, monkeypatch): original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) try: base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC) dates = [base + timedelta(days=i) for i in range(10)] _seed_snapshots(snapshots_root, dates) result = cleanup_old_backups(confirm=True) assert result["dry_run"] is False assert result["deleted"] == 3 assert result["kept"] == 7 assert len(list(snapshots_root.iterdir())) == 7 finally: config.settings = original def test_cleanup_keeps_last_7_daily(tmp_path, monkeypatch): """Exactly 7 snapshots on different days → all kept.""" original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) try: base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC) dates = [base + timedelta(days=i) for i in range(7)] _seed_snapshots(snapshots_root, dates) result = cleanup_old_backups() assert result["kept"] == 7 assert result["would_delete"] == 0 finally: config.settings = original def test_cleanup_keeps_sunday_weekly(tmp_path, monkeypatch): """Snapshots on Sundays outside the 7-day window are kept as weekly.""" original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) try: # 7 daily snapshots covering Apr 5-11 base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC) daily = [base + timedelta(days=i) for i in range(7)] # 2 older Sunday snapshots sun1 = datetime(2026, 3, 29, 12, 0, 0, tzinfo=UTC) # Sunday sun2 = datetime(2026, 3, 22, 12, 0, 0, tzinfo=UTC) # Sunday # A non-Sunday old snapshot that should be deleted wed = datetime(2026, 3, 25, 12, 0, 0, tzinfo=UTC) # Wednesday _seed_snapshots(snapshots_root, daily + [sun1, sun2, wed]) result = cleanup_old_backups() # 7 daily + 2 Sunday weekly = 9 kept, 1 Wednesday deleted assert result["kept"] == 9 assert result["would_delete"] == 1 finally: config.settings = original def test_cleanup_keeps_monthly_first(tmp_path, monkeypatch): """Snapshots on the 1st of a month outside daily+weekly are kept as monthly.""" original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) try: # 7 daily in April 2026 base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC) daily = [base + timedelta(days=i) for i in range(7)] # Old monthly 1st snapshots m1 = datetime(2026, 1, 1, 12, 0, 0, tzinfo=UTC) m2 = datetime(2025, 12, 1, 12, 0, 0, tzinfo=UTC) # Old non-1st, non-Sunday snapshot — should be deleted old = datetime(2026, 1, 15, 12, 0, 0, tzinfo=UTC) _seed_snapshots(snapshots_root, daily + [m1, m2, old]) result = cleanup_old_backups() # 7 daily + 2 monthly = 9 kept, 1 deleted assert result["kept"] == 9 assert result["would_delete"] == 1 finally: config.settings = original def test_cleanup_unparseable_stamp_skipped(tmp_path, monkeypatch): """Directories with unparseable names are ignored, not deleted.""" original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) try: base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC) _seed_snapshots(snapshots_root, [base]) bad_dir = snapshots_root / "not-a-timestamp" bad_dir.mkdir() result = cleanup_old_backups(confirm=True) assert result.get("unparseable") == ["not-a-timestamp"] assert bad_dir.exists() assert result["kept"] == 1 finally: config.settings = original