Files
ATOCore/tests/test_backup.py
Anto01 58c744fd2f feat: post-backup validation + retention cleanup (Tasks B & C)
- create_runtime_backup() now auto-validates its output and includes
  validated/validation_errors fields in returned metadata
- New cleanup_old_backups() with retention policy: 7 daily, 4 weekly
  (Sundays), 6 monthly (1st of month), dry-run by default
- CLI `cleanup` subcommand added to backup module
- 9 new tests (2 validation + 7 retention), 259 total passing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-11 09:46:46 -04:00

691 lines
27 KiB
Python

"""Tests for runtime backup creation, restore, and retention cleanup."""
import json
import sqlite3
from contextlib import closing
from datetime import UTC, datetime, timedelta

import pytest

import atocore.config as config
from atocore.models.database import init_db
from atocore.ops.backup import (
    cleanup_old_backups,
    create_runtime_backup,
    list_runtime_backups,
    restore_runtime_backup,
    validate_backup,
)
def test_create_runtime_backup_copies_db_and_registry(tmp_path, monkeypatch):
    """A backup snapshot contains the live db, the registry, and metadata.

    Fix: sqlite3 connections are now closed via ``contextlib.closing`` —
    ``with sqlite3.connect(...)`` only manages the transaction, not the
    connection lifetime, so the original left file handles open
    (ResourceWarning; breaks tmp_path teardown on Windows).
    """
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    registry_path = tmp_path / "config" / "project-registry.json"
    registry_path.parent.mkdir(parents=True)
    registry_path.write_text(
        '{"projects":[{"id":"p01-example","aliases":[],"ingest_roots":[{"source":"vault","subpath":"incoming/projects/p01-example"}]}]}\n',
        encoding="utf-8",
    )
    original_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        with closing(sqlite3.connect(str(config.settings.db_path))) as conn:
            conn.execute("INSERT INTO projects (id, name) VALUES (?, ?)", ("p01", "P01 Example"))
            conn.commit()
        result = create_runtime_backup(datetime(2026, 4, 6, 18, 0, 0, tzinfo=UTC))
    finally:
        config.settings = original_settings
    snapshot_root = tmp_path / "backups" / "snapshots" / "20260406T180000Z"
    db_snapshot = snapshot_root / "db" / "atocore.db"
    registry_snapshot = snapshot_root / "config" / "project-registry.json"
    metadata_path = snapshot_root / "backup-metadata.json"
    assert result["db_snapshot_path"] == str(db_snapshot)
    assert db_snapshot.exists()
    assert registry_snapshot.exists()
    assert metadata_path.exists()
    # The snapshot db carries the row inserted before the backup ran.
    with closing(sqlite3.connect(str(db_snapshot))) as conn:
        row = conn.execute("SELECT name FROM projects WHERE id = ?", ("p01",)).fetchone()
        assert row[0] == "P01 Example"
    metadata = json.loads(metadata_path.read_text(encoding="utf-8"))
    assert metadata["registry_snapshot_path"] == str(registry_snapshot)
def test_create_runtime_backup_includes_chroma_when_requested(tmp_path, monkeypatch):
    """With include_chroma=True the snapshot carries the whole chroma tree."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        # Seed a fake chroma tree: one nested binary file, one top-level json.
        fake_chroma = config.settings.chroma_path
        nested = fake_chroma / "collection-a"
        nested.mkdir(parents=True, exist_ok=True)
        (nested / "data.bin").write_bytes(b"\x00\x01\x02\x03")
        (fake_chroma / "metadata.json").write_text('{"ok":true}', encoding="utf-8")
        result = create_runtime_backup(
            datetime(2026, 4, 6, 20, 0, 0, tzinfo=UTC),
            include_chroma=True,
        )
    finally:
        config.settings = saved_settings
    snapshot_chroma = tmp_path / "backups" / "snapshots" / "20260406T200000Z" / "chroma"
    # The metadata reports the chroma snapshot...
    assert result["chroma_snapshot_included"] is True
    assert result["chroma_snapshot_path"] == str(snapshot_chroma)
    assert result["chroma_snapshot_files"] >= 2
    assert result["chroma_snapshot_bytes"] > 0
    # ...and the files really landed on disk.
    assert (snapshot_chroma / "collection-a" / "data.bin").exists()
    assert (snapshot_chroma / "metadata.json").exists()
def test_list_and_validate_runtime_backups(tmp_path, monkeypatch):
    """Listing finds every snapshot; validation distinguishes good vs missing."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        first = create_runtime_backup(datetime(2026, 4, 6, 21, 0, 0, tzinfo=UTC))
        second = create_runtime_backup(datetime(2026, 4, 6, 22, 0, 0, tzinfo=UTC))
        listing = list_runtime_backups()
        first_validation = validate_backup("20260406T210000Z")
        second_validation = validate_backup("20260406T220000Z")
        missing_validation = validate_backup("20260101T000000Z")
    finally:
        config.settings = saved_settings
    assert len(listing) == 2
    assert {entry["stamp"] for entry in listing} == {
        "20260406T210000Z",
        "20260406T220000Z",
    }
    for entry in listing:
        assert entry["has_metadata"] is True
        assert entry["metadata"]["db_snapshot_path"]
    # Both real snapshots validate cleanly; the unknown stamp does not.
    assert first_validation["valid"] is True
    assert first_validation["db_ok"] is True
    assert first_validation["errors"] == []
    assert second_validation["valid"] is True
    assert missing_validation["exists"] is False
    assert "snapshot_directory_missing" in missing_validation["errors"]
    # The metadata file on disk agrees with what create_runtime_backup returned.
    metadata_file = (
        tmp_path / "backups" / "snapshots" / "20260406T210000Z" / "backup-metadata.json"
    )
    on_disk = json.loads(metadata_file.read_text(encoding="utf-8"))
    assert on_disk["db_snapshot_path"] == first["db_snapshot_path"]
    assert second["db_snapshot_path"].endswith("atocore.db")
def test_create_runtime_backup_handles_missing_registry(tmp_path, monkeypatch):
    """No registry file on disk → backup still succeeds, path reported empty."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        result = create_runtime_backup(datetime(2026, 4, 6, 19, 0, 0, tzinfo=UTC))
    finally:
        config.settings = saved_settings
    assert result["registry_snapshot_path"] == ""
def test_restore_refuses_without_confirm_service_stopped(tmp_path, monkeypatch):
    """Restore is destructive: it must demand explicit service-stop confirmation."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        create_runtime_backup(datetime(2026, 4, 9, 10, 0, 0, tzinfo=UTC))
        # Default call (no confirm flag) must be rejected outright.
        with pytest.raises(RuntimeError, match="confirm_service_stopped"):
            restore_runtime_backup("20260409T100000Z")
    finally:
        config.settings = saved_settings
def test_restore_raises_on_invalid_backup(tmp_path, monkeypatch):
    """Restoring a stamp that never existed fails validation up front."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        with pytest.raises(RuntimeError, match="failed validation"):
            restore_runtime_backup("20250101T000000Z", confirm_service_stopped=True)
    finally:
        config.settings = saved_settings
def test_restore_round_trip_reverses_post_backup_mutations(tmp_path, monkeypatch):
    """Canonical drill: snapshot -> mutate -> restore -> mutation gone.

    Fix: sqlite3 connections are closed via ``contextlib.closing`` —
    ``with sqlite3.connect(...)`` only manages the transaction, so the
    original leaked four open handles (ResourceWarning; breaks tmp_path
    teardown on Windows).
    """
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    registry_path = tmp_path / "config" / "project-registry.json"
    registry_path.parent.mkdir(parents=True)
    registry_path.write_text(
        '{"projects":[{"id":"p01-example","aliases":[],'
        '"ingest_roots":[{"source":"vault","subpath":"incoming/projects/p01-example"}]}]}\n',
        encoding="utf-8",
    )
    original_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        # 1. Seed baseline state that should SURVIVE the restore.
        with closing(sqlite3.connect(str(config.settings.db_path))) as conn:
            conn.execute(
                "INSERT INTO projects (id, name) VALUES (?, ?)",
                ("p01", "Baseline Project"),
            )
            conn.commit()
        # 2. Create the backup we're going to restore to.
        create_runtime_backup(datetime(2026, 4, 9, 11, 0, 0, tzinfo=UTC))
        stamp = "20260409T110000Z"
        # 3. Mutate live state AFTER the backup — this is what the
        #    restore should reverse.
        with closing(sqlite3.connect(str(config.settings.db_path))) as conn:
            conn.execute(
                "INSERT INTO projects (id, name) VALUES (?, ?)",
                ("p99", "Post Backup Mutation"),
            )
            conn.commit()
        # Confirm the mutation is present before restore.
        with closing(sqlite3.connect(str(config.settings.db_path))) as conn:
            row = conn.execute(
                "SELECT name FROM projects WHERE id = ?", ("p99",)
            ).fetchone()
            assert row is not None and row[0] == "Post Backup Mutation"
        # 4. Restore — the drill procedure. Explicit confirm_service_stopped.
        result = restore_runtime_backup(stamp, confirm_service_stopped=True)
        # 5. Verify restore report.
        assert result["stamp"] == stamp
        assert result["db_restored"] is True
        assert result["registry_restored"] is True
        assert result["restored_integrity_ok"] is True
        assert result["pre_restore_snapshot"] is not None
        # 6. Verify live state reflects the restore: baseline survived,
        #    post-backup mutation is gone.
        with closing(sqlite3.connect(str(config.settings.db_path))) as conn:
            baseline = conn.execute(
                "SELECT name FROM projects WHERE id = ?", ("p01",)
            ).fetchone()
            mutation = conn.execute(
                "SELECT name FROM projects WHERE id = ?", ("p99",)
            ).fetchone()
        assert baseline is not None and baseline[0] == "Baseline Project"
        assert mutation is None
        # 7. Pre-restore safety snapshot DOES contain the mutation —
        #    it captured current state before overwriting. This is the
        #    reversibility guarantee: the operator can restore back to
        #    it if the restore itself was a mistake.
        pre_stamp = result["pre_restore_snapshot"]
        pre_validation = validate_backup(pre_stamp)
        assert pre_validation["valid"] is True
        pre_db_path = pre_validation["metadata"]["db_snapshot_path"]
        with closing(sqlite3.connect(pre_db_path)) as conn:
            pre_mutation = conn.execute(
                "SELECT name FROM projects WHERE id = ?", ("p99",)
            ).fetchone()
        assert pre_mutation is not None and pre_mutation[0] == "Post Backup Mutation"
    finally:
        config.settings = original_settings
def test_restore_round_trip_with_chroma(tmp_path, monkeypatch):
    """Chroma round trip: restore re-creates the snapshotted tree exactly."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        # Seed baseline chroma state that should survive restore.
        collection = config.settings.chroma_path / "coll-a"
        collection.mkdir(parents=True, exist_ok=True)
        (collection / "baseline.bin").write_bytes(b"baseline")
        create_runtime_backup(
            datetime(2026, 4, 9, 12, 0, 0, tzinfo=UTC), include_chroma=True
        )
        # Mutate chroma after backup: add a file + remove baseline.
        (collection / "post_backup.bin").write_bytes(b"post")
        (collection / "baseline.bin").unlink()
        result = restore_runtime_backup(
            "20260409T120000Z", confirm_service_stopped=True
        )
        assert result["chroma_restored"] is True
        # Baseline is back; the post-backup addition is gone.
        assert (collection / "baseline.bin").exists()
        assert not (collection / "post_backup.bin").exists()
    finally:
        config.settings = saved_settings
def test_restore_chroma_does_not_unlink_destination_directory(tmp_path, monkeypatch):
    """Regression: restore must not rmtree the chroma dir itself.

    In a Dockerized deployment the chroma dir is a bind-mounted volume,
    and shutil.rmtree on a mount point raises ``OSError [Errno 16]
    Device or resource busy`` — which broke the first real Dalidou drill
    on 2026-04-09. The fix clears the directory's CONTENTS and then
    copytree(dirs_exist_ok=True)s into it, so the directory inode (and
    any bind mount) survives. We assert the inode is unchanged across
    the restore — exactly what a bind-mounted chroma dir would require.
    """
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        chroma_dir = config.settings.chroma_path
        collection = chroma_dir / "coll-a"
        collection.mkdir(parents=True, exist_ok=True)
        (collection / "baseline.bin").write_bytes(b"baseline")
        create_runtime_backup(
            datetime(2026, 4, 9, 15, 0, 0, tzinfo=UTC), include_chroma=True
        )
        # Capture the destination directory's inode before the restore.
        inode_before = chroma_dir.stat().st_ino
        # Add a file post-backup so restore has work to do.
        (collection / "post_backup.bin").write_bytes(b"post")
        restore_runtime_backup("20260409T150000Z", confirm_service_stopped=True)
        # The directory still exists (a mount point would have errored)
        # and keeps its inode — the mount itself was never unlinked.
        assert chroma_dir.exists()
        assert inode_before == chroma_dir.stat().st_ino, (
            "chroma directory inode changed — restore recreated the "
            "directory instead of clearing its contents; this would "
            "fail on a Docker bind-mounted volume"
        )
        # And the contents did actually get restored.
        assert (collection / "baseline.bin").exists()
        assert not (collection / "post_backup.bin").exists()
    finally:
        config.settings = saved_settings
def test_restore_skips_pre_snapshot_when_requested(tmp_path, monkeypatch):
    """pre_restore_snapshot=False leaves the snapshot count unchanged."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        create_runtime_backup(datetime(2026, 4, 9, 13, 0, 0, tzinfo=UTC))
        count_before = len(list_runtime_backups())
        result = restore_runtime_backup(
            "20260409T130000Z",
            confirm_service_stopped=True,
            pre_restore_snapshot=False,
        )
        count_after = len(list_runtime_backups())
        # No safety snapshot was reported and none appeared on disk.
        assert result["pre_restore_snapshot"] is None
        assert count_after == count_before
    finally:
        config.settings = saved_settings
def test_create_backup_includes_validation_fields(tmp_path, monkeypatch):
    """Task B: create_runtime_backup auto-validates and reports the result."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        result = create_runtime_backup(datetime(2026, 4, 11, 10, 0, 0, tzinfo=UTC))
    finally:
        config.settings = saved_settings
    # Both validation fields are present and report a clean backup.
    assert "validated" in result
    assert "validation_errors" in result
    assert result["validated"] is True
    assert result["validation_errors"] == []
def test_create_backup_validation_failure_does_not_raise(tmp_path, monkeypatch):
    """Task B: a failing post-backup validation is reported, not raised."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )

    def _broken_validate(stamp):
        # Stand-in validator that always reports a broken snapshot.
        return {"valid": False, "errors": ["db_missing", "metadata_missing"]}

    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        monkeypatch.setattr("atocore.ops.backup.validate_backup", _broken_validate)
        result = create_runtime_backup(datetime(2026, 4, 11, 11, 0, 0, tzinfo=UTC))
    finally:
        config.settings = saved_settings
    # No exception was raised — the metadata simply records the failure.
    assert result["validated"] is False
    assert result["validation_errors"] == ["db_missing", "metadata_missing"]
    # Core backup fields are still present.
    assert "db_snapshot_path" in result
    assert "created_at" in result
def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
    """Stale WAL/SHM sidecars must not carry bytes past the restore.

    After the restore, PRAGMA integrity_check reopens the restored db,
    which may legitimately recreate a fresh -wal file. So we assert that
    the STALE byte marker no longer appears in either sidecar — not that
    the files are absent.
    """
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        create_runtime_backup(datetime(2026, 4, 9, 14, 0, 0, tzinfo=UTC))
        # Plant fake stale WAL/SHM next to the live db, each holding an
        # unmistakable marker.
        live_db = config.settings.db_path
        wal = live_db.with_name(live_db.name + "-wal")
        shm = live_db.with_name(live_db.name + "-shm")
        stale_marker = b"STALE-SIDECAR-MARKER-DO-NOT-SURVIVE"
        for sidecar in (wal, shm):
            sidecar.write_bytes(stale_marker)
        assert wal.exists() and shm.exists()
        restore_runtime_backup("20260409T140000Z", confirm_service_stopped=True)
        # Integrity of the restored db is covered elsewhere; here we only
        # check that no surviving sidecar still contains the old marker.
        for sidecar in (wal, shm):
            if sidecar.exists():
                assert stale_marker not in sidecar.read_bytes(), (
                    f"{sidecar.name} still carries stale marker"
                )
    finally:
        config.settings = saved_settings
# ---------------------------------------------------------------------------
# Task C: Backup retention cleanup
# ---------------------------------------------------------------------------
def _setup_cleanup_env(tmp_path, monkeypatch):
    """Helper: configure env, swap in fresh settings, init db.

    Returns ``(original_settings, snapshots_root)``; the caller must
    restore ``config.settings = original_settings`` in a ``finally``.

    Fix: if ``Settings()`` / ``init_db()`` raises after the global has
    been swapped, the original settings are restored before the
    exception propagates — previously the caller never received the
    original object and the global stayed clobbered for later tests.
    """
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    original = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        snapshots_root = config.settings.resolved_backup_dir / "snapshots"
        snapshots_root.mkdir(parents=True, exist_ok=True)
    except BaseException:
        # Undo the global swap so a setup failure can't poison other tests.
        config.settings = original
        raise
    return original, snapshots_root
def _seed_snapshots(snapshots_root, dates):
    """Create minimal valid snapshot dirs for the given datetimes.

    Each snapshot gets a tiny but real sqlite db plus a
    backup-metadata.json shaped like create_runtime_backup's output.

    Fix: the sqlite connection is closed in a ``finally`` so a failing
    ``execute`` no longer leaks the handle (which also breaks tmp_path
    teardown on Windows).
    """
    for dt in dates:
        stamp = dt.strftime("%Y%m%dT%H%M%SZ")
        snap_dir = snapshots_root / stamp
        db_dir = snap_dir / "db"
        db_dir.mkdir(parents=True, exist_ok=True)
        db_path = db_dir / "atocore.db"
        conn = sqlite3.connect(str(db_path))
        try:
            conn.execute("CREATE TABLE IF NOT EXISTS _marker (id INTEGER)")
        finally:
            conn.close()
        metadata = {
            "created_at": dt.isoformat(),
            "backup_root": str(snap_dir),
            "db_snapshot_path": str(db_path),
            "db_size_bytes": db_path.stat().st_size,
            "registry_snapshot_path": "",
            "chroma_snapshot_path": "",
            "chroma_snapshot_bytes": 0,
            "chroma_snapshot_files": 0,
            "chroma_snapshot_included": False,
            "vector_store_note": "",
        }
        (snap_dir / "backup-metadata.json").write_text(
            json.dumps(metadata, indent=2) + "\n", encoding="utf-8"
        )
def test_cleanup_empty_dir(tmp_path, monkeypatch):
    """An empty snapshots dir yields zero kept/deleted and stays dry-run."""
    original, _ = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        report = cleanup_old_backups()
        assert report["kept"] == 0
        assert report["would_delete"] == 0
        assert report["dry_run"] is True
    finally:
        config.settings = original
def test_cleanup_dry_run_identifies_old_snapshots(tmp_path, monkeypatch):
    """Dry run reports what would be deleted but removes nothing."""
    original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        # 10 daily snapshots, Apr 2-11 2026 (Apr 1 avoided — it would be
        # retained under the monthly rule and skew the counts).
        base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
        _seed_snapshots(snapshots_root, [base + timedelta(days=i) for i in range(10)])
        report = cleanup_old_backups()
        assert report["dry_run"] is True
        # The 7 newest days (Apr 5-11) are kept as dailies; that window
        # already contains Sunday Apr 5, so the weekly rule adds nothing.
        # Apr 2-4 are neither Sundays nor firsts-of-month → 3 would go.
        assert report["kept"] == 7
        assert report["would_delete"] == 3
        # Dry run: every snapshot directory is still on disk.
        assert len(list(snapshots_root.iterdir())) == 10
    finally:
        config.settings = original
def test_cleanup_confirm_deletes(tmp_path, monkeypatch):
    """confirm=True actually removes the snapshots a dry run would flag."""
    original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
        _seed_snapshots(snapshots_root, [base + timedelta(days=i) for i in range(10)])
        report = cleanup_old_backups(confirm=True)
        assert report["dry_run"] is False
        assert report["deleted"] == 3
        assert report["kept"] == 7
        # Only the retained snapshot directories remain on disk.
        assert len(list(snapshots_root.iterdir())) == 7
    finally:
        config.settings = original
def test_cleanup_keeps_last_7_daily(tmp_path, monkeypatch):
    """Exactly 7 snapshots on 7 different days → all retained."""
    original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        start = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
        _seed_snapshots(snapshots_root, [start + timedelta(days=i) for i in range(7)])
        report = cleanup_old_backups()
        assert report["kept"] == 7
        assert report["would_delete"] == 0
    finally:
        config.settings = original
def test_cleanup_keeps_sunday_weekly(tmp_path, monkeypatch):
    """Sunday snapshots older than the daily window are kept as weeklies."""
    original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        # 7 dailies covering Apr 5-11 2026.
        start = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
        dailies = [start + timedelta(days=i) for i in range(7)]
        extras = [
            datetime(2026, 3, 29, 12, 0, 0, tzinfo=UTC),  # Sunday → weekly keep
            datetime(2026, 3, 22, 12, 0, 0, tzinfo=UTC),  # Sunday → weekly keep
            datetime(2026, 3, 25, 12, 0, 0, tzinfo=UTC),  # Wednesday → delete
        ]
        _seed_snapshots(snapshots_root, dailies + extras)
        report = cleanup_old_backups()
        # 7 dailies + 2 Sunday weeklies kept; the lone Wednesday goes.
        assert report["kept"] == 9
        assert report["would_delete"] == 1
    finally:
        config.settings = original
def test_cleanup_keeps_monthly_first(tmp_path, monkeypatch):
    """First-of-month snapshots outside daily+weekly windows are kept as monthlies."""
    original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        # 7 dailies in April 2026.
        start = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
        dailies = [start + timedelta(days=i) for i in range(7)]
        extras = [
            datetime(2026, 1, 1, 12, 0, 0, tzinfo=UTC),   # 1st → monthly keep
            datetime(2025, 12, 1, 12, 0, 0, tzinfo=UTC),  # 1st → monthly keep
            datetime(2026, 1, 15, 12, 0, 0, tzinfo=UTC),  # mid-month → delete
        ]
        _seed_snapshots(snapshots_root, dailies + extras)
        report = cleanup_old_backups()
        # 7 dailies + 2 monthlies kept, 1 mid-month straggler deleted.
        assert report["kept"] == 9
        assert report["would_delete"] == 1
    finally:
        config.settings = original
def test_cleanup_unparseable_stamp_skipped(tmp_path, monkeypatch):
    """Directories whose names aren't timestamps are ignored, never deleted."""
    original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        _seed_snapshots(snapshots_root, [datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)])
        bad_dir = snapshots_root / "not-a-timestamp"
        bad_dir.mkdir()
        report = cleanup_old_backups(confirm=True)
        # The oddly-named dir is reported and left untouched on disk.
        assert report.get("unparseable") == ["not-a-timestamp"]
        assert bad_dir.exists()
        assert report["kept"] == 1
    finally:
        config.settings = original