- create_runtime_backup() now auto-validates its output and includes validated/validation_errors fields in returned metadata - New cleanup_old_backups() with retention policy: 7 daily, 4 weekly (Sundays), 6 monthly (1st of month), dry-run by default - CLI `cleanup` subcommand added to backup module - 9 new tests (2 validation + 7 retention), 259 total passing Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
691 lines
27 KiB
Python
691 lines
27 KiB
Python
"""Tests for runtime backup creation, restore, and retention cleanup."""
|
|
|
|
import json
|
|
import sqlite3
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
import pytest
|
|
|
|
import atocore.config as config
|
|
from atocore.models.database import init_db
|
|
from atocore.ops.backup import (
|
|
cleanup_old_backups,
|
|
create_runtime_backup,
|
|
list_runtime_backups,
|
|
restore_runtime_backup,
|
|
validate_backup,
|
|
)
|
|
|
|
|
|
def test_create_runtime_backup_copies_db_and_registry(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
registry_path = tmp_path / "config" / "project-registry.json"
|
|
registry_path.parent.mkdir(parents=True)
|
|
registry_path.write_text('{"projects":[{"id":"p01-example","aliases":[],"ingest_roots":[{"source":"vault","subpath":"incoming/projects/p01-example"}]}]}\n', encoding="utf-8")
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
with sqlite3.connect(str(config.settings.db_path)) as conn:
|
|
conn.execute("INSERT INTO projects (id, name) VALUES (?, ?)", ("p01", "P01 Example"))
|
|
conn.commit()
|
|
|
|
result = create_runtime_backup(datetime(2026, 4, 6, 18, 0, 0, tzinfo=UTC))
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
db_snapshot = tmp_path / "backups" / "snapshots" / "20260406T180000Z" / "db" / "atocore.db"
|
|
registry_snapshot = (
|
|
tmp_path / "backups" / "snapshots" / "20260406T180000Z" / "config" / "project-registry.json"
|
|
)
|
|
metadata_path = (
|
|
tmp_path / "backups" / "snapshots" / "20260406T180000Z" / "backup-metadata.json"
|
|
)
|
|
|
|
assert result["db_snapshot_path"] == str(db_snapshot)
|
|
assert db_snapshot.exists()
|
|
assert registry_snapshot.exists()
|
|
assert metadata_path.exists()
|
|
|
|
with sqlite3.connect(str(db_snapshot)) as conn:
|
|
row = conn.execute("SELECT name FROM projects WHERE id = ?", ("p01",)).fetchone()
|
|
assert row[0] == "P01 Example"
|
|
|
|
metadata = json.loads(metadata_path.read_text(encoding="utf-8"))
|
|
assert metadata["registry_snapshot_path"] == str(registry_snapshot)
|
|
|
|
|
|
def test_create_runtime_backup_includes_chroma_when_requested(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
|
|
# Create a fake chroma directory tree with a couple of files.
|
|
chroma_dir = config.settings.chroma_path
|
|
(chroma_dir / "collection-a").mkdir(parents=True, exist_ok=True)
|
|
(chroma_dir / "collection-a" / "data.bin").write_bytes(b"\x00\x01\x02\x03")
|
|
(chroma_dir / "metadata.json").write_text('{"ok":true}', encoding="utf-8")
|
|
|
|
result = create_runtime_backup(
|
|
datetime(2026, 4, 6, 20, 0, 0, tzinfo=UTC),
|
|
include_chroma=True,
|
|
)
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
chroma_snapshot_root = (
|
|
tmp_path / "backups" / "snapshots" / "20260406T200000Z" / "chroma"
|
|
)
|
|
assert result["chroma_snapshot_included"] is True
|
|
assert result["chroma_snapshot_path"] == str(chroma_snapshot_root)
|
|
assert result["chroma_snapshot_files"] >= 2
|
|
assert result["chroma_snapshot_bytes"] > 0
|
|
assert (chroma_snapshot_root / "collection-a" / "data.bin").exists()
|
|
assert (chroma_snapshot_root / "metadata.json").exists()
|
|
|
|
|
|
def test_list_and_validate_runtime_backups(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
first = create_runtime_backup(datetime(2026, 4, 6, 21, 0, 0, tzinfo=UTC))
|
|
second = create_runtime_backup(datetime(2026, 4, 6, 22, 0, 0, tzinfo=UTC))
|
|
|
|
listing = list_runtime_backups()
|
|
first_validation = validate_backup("20260406T210000Z")
|
|
second_validation = validate_backup("20260406T220000Z")
|
|
missing_validation = validate_backup("20260101T000000Z")
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
assert len(listing) == 2
|
|
assert {entry["stamp"] for entry in listing} == {
|
|
"20260406T210000Z",
|
|
"20260406T220000Z",
|
|
}
|
|
for entry in listing:
|
|
assert entry["has_metadata"] is True
|
|
assert entry["metadata"]["db_snapshot_path"]
|
|
|
|
assert first_validation["valid"] is True
|
|
assert first_validation["db_ok"] is True
|
|
assert first_validation["errors"] == []
|
|
|
|
assert second_validation["valid"] is True
|
|
|
|
assert missing_validation["exists"] is False
|
|
assert "snapshot_directory_missing" in missing_validation["errors"]
|
|
|
|
# both metadata paths are reachable on disk
|
|
assert json.loads(
|
|
(tmp_path / "backups" / "snapshots" / "20260406T210000Z" / "backup-metadata.json")
|
|
.read_text(encoding="utf-8")
|
|
)["db_snapshot_path"] == first["db_snapshot_path"]
|
|
assert second["db_snapshot_path"].endswith("atocore.db")
|
|
|
|
|
|
def test_create_runtime_backup_handles_missing_registry(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
result = create_runtime_backup(datetime(2026, 4, 6, 19, 0, 0, tzinfo=UTC))
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
assert result["registry_snapshot_path"] == ""
|
|
|
|
|
|
def test_restore_refuses_without_confirm_service_stopped(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
create_runtime_backup(datetime(2026, 4, 9, 10, 0, 0, tzinfo=UTC))
|
|
|
|
with pytest.raises(RuntimeError, match="confirm_service_stopped"):
|
|
restore_runtime_backup("20260409T100000Z")
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
|
|
def test_restore_raises_on_invalid_backup(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
with pytest.raises(RuntimeError, match="failed validation"):
|
|
restore_runtime_backup(
|
|
"20250101T000000Z", confirm_service_stopped=True
|
|
)
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
|
|
def test_restore_round_trip_reverses_post_backup_mutations(tmp_path, monkeypatch):
|
|
"""Canonical drill: snapshot -> mutate -> restore -> mutation gone."""
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
registry_path = tmp_path / "config" / "project-registry.json"
|
|
registry_path.parent.mkdir(parents=True)
|
|
registry_path.write_text(
|
|
'{"projects":[{"id":"p01-example","aliases":[],'
|
|
'"ingest_roots":[{"source":"vault","subpath":"incoming/projects/p01-example"}]}]}\n',
|
|
encoding="utf-8",
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
|
|
# 1. Seed baseline state that should SURVIVE the restore.
|
|
with sqlite3.connect(str(config.settings.db_path)) as conn:
|
|
conn.execute(
|
|
"INSERT INTO projects (id, name) VALUES (?, ?)",
|
|
("p01", "Baseline Project"),
|
|
)
|
|
conn.commit()
|
|
|
|
# 2. Create the backup we're going to restore to.
|
|
create_runtime_backup(datetime(2026, 4, 9, 11, 0, 0, tzinfo=UTC))
|
|
stamp = "20260409T110000Z"
|
|
|
|
# 3. Mutate live state AFTER the backup — this is what the
|
|
# restore should reverse.
|
|
with sqlite3.connect(str(config.settings.db_path)) as conn:
|
|
conn.execute(
|
|
"INSERT INTO projects (id, name) VALUES (?, ?)",
|
|
("p99", "Post Backup Mutation"),
|
|
)
|
|
conn.commit()
|
|
|
|
# Confirm the mutation is present before restore.
|
|
with sqlite3.connect(str(config.settings.db_path)) as conn:
|
|
row = conn.execute(
|
|
"SELECT name FROM projects WHERE id = ?", ("p99",)
|
|
).fetchone()
|
|
assert row is not None and row[0] == "Post Backup Mutation"
|
|
|
|
# 4. Restore — the drill procedure. Explicit confirm_service_stopped.
|
|
result = restore_runtime_backup(
|
|
stamp, confirm_service_stopped=True
|
|
)
|
|
|
|
# 5. Verify restore report
|
|
assert result["stamp"] == stamp
|
|
assert result["db_restored"] is True
|
|
assert result["registry_restored"] is True
|
|
assert result["restored_integrity_ok"] is True
|
|
assert result["pre_restore_snapshot"] is not None
|
|
|
|
# 6. Verify live state reflects the restore: baseline survived,
|
|
# post-backup mutation is gone.
|
|
with sqlite3.connect(str(config.settings.db_path)) as conn:
|
|
baseline = conn.execute(
|
|
"SELECT name FROM projects WHERE id = ?", ("p01",)
|
|
).fetchone()
|
|
mutation = conn.execute(
|
|
"SELECT name FROM projects WHERE id = ?", ("p99",)
|
|
).fetchone()
|
|
assert baseline is not None and baseline[0] == "Baseline Project"
|
|
assert mutation is None
|
|
|
|
# 7. Pre-restore safety snapshot DOES contain the mutation —
|
|
# it captured current state before overwriting. This is the
|
|
# reversibility guarantee: the operator can restore back to
|
|
# it if the restore itself was a mistake.
|
|
pre_stamp = result["pre_restore_snapshot"]
|
|
pre_validation = validate_backup(pre_stamp)
|
|
assert pre_validation["valid"] is True
|
|
pre_db_path = pre_validation["metadata"]["db_snapshot_path"]
|
|
with sqlite3.connect(pre_db_path) as conn:
|
|
pre_mutation = conn.execute(
|
|
"SELECT name FROM projects WHERE id = ?", ("p99",)
|
|
).fetchone()
|
|
assert pre_mutation is not None and pre_mutation[0] == "Post Backup Mutation"
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
|
|
def test_restore_round_trip_with_chroma(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
|
|
# Seed baseline chroma state that should survive restore.
|
|
chroma_dir = config.settings.chroma_path
|
|
(chroma_dir / "coll-a").mkdir(parents=True, exist_ok=True)
|
|
(chroma_dir / "coll-a" / "baseline.bin").write_bytes(b"baseline")
|
|
|
|
create_runtime_backup(
|
|
datetime(2026, 4, 9, 12, 0, 0, tzinfo=UTC), include_chroma=True
|
|
)
|
|
stamp = "20260409T120000Z"
|
|
|
|
# Mutate chroma after backup: add a file + remove baseline.
|
|
(chroma_dir / "coll-a" / "post_backup.bin").write_bytes(b"post")
|
|
(chroma_dir / "coll-a" / "baseline.bin").unlink()
|
|
|
|
result = restore_runtime_backup(
|
|
stamp, confirm_service_stopped=True
|
|
)
|
|
|
|
assert result["chroma_restored"] is True
|
|
assert (chroma_dir / "coll-a" / "baseline.bin").exists()
|
|
assert not (chroma_dir / "coll-a" / "post_backup.bin").exists()
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
|
|
def test_restore_chroma_does_not_unlink_destination_directory(tmp_path, monkeypatch):
|
|
"""Regression: restore must not rmtree the chroma dir itself.
|
|
|
|
In a Dockerized deployment the chroma dir is a bind-mounted
|
|
volume. Calling shutil.rmtree on a mount point raises
|
|
``OSError [Errno 16] Device or resource busy``, which broke the
|
|
first real Dalidou drill on 2026-04-09. The fix clears the
|
|
directory's CONTENTS and copytree(dirs_exist_ok=True) into it,
|
|
keeping the directory inode (and any bind mount) intact.
|
|
|
|
This test captures the inode of the destination directory before
|
|
and after restore and asserts they match — that's what a
|
|
bind-mounted chroma dir would also see.
|
|
"""
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
|
|
chroma_dir = config.settings.chroma_path
|
|
(chroma_dir / "coll-a").mkdir(parents=True, exist_ok=True)
|
|
(chroma_dir / "coll-a" / "baseline.bin").write_bytes(b"baseline")
|
|
|
|
create_runtime_backup(
|
|
datetime(2026, 4, 9, 15, 0, 0, tzinfo=UTC), include_chroma=True
|
|
)
|
|
|
|
# Capture the destination directory's stat signature before restore.
|
|
chroma_stat_before = chroma_dir.stat()
|
|
|
|
# Add a file post-backup so restore has work to do.
|
|
(chroma_dir / "coll-a" / "post_backup.bin").write_bytes(b"post")
|
|
|
|
restore_runtime_backup(
|
|
"20260409T150000Z", confirm_service_stopped=True
|
|
)
|
|
|
|
# Directory still exists (would have failed on mount point) and
|
|
# its st_ino matches — the mount itself wasn't unlinked.
|
|
assert chroma_dir.exists()
|
|
chroma_stat_after = chroma_dir.stat()
|
|
assert chroma_stat_before.st_ino == chroma_stat_after.st_ino, (
|
|
"chroma directory inode changed — restore recreated the "
|
|
"directory instead of clearing its contents; this would "
|
|
"fail on a Docker bind-mounted volume"
|
|
)
|
|
# And the contents did actually get restored.
|
|
assert (chroma_dir / "coll-a" / "baseline.bin").exists()
|
|
assert not (chroma_dir / "coll-a" / "post_backup.bin").exists()
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
|
|
def test_restore_skips_pre_snapshot_when_requested(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
create_runtime_backup(datetime(2026, 4, 9, 13, 0, 0, tzinfo=UTC))
|
|
|
|
before_count = len(list_runtime_backups())
|
|
|
|
result = restore_runtime_backup(
|
|
"20260409T130000Z",
|
|
confirm_service_stopped=True,
|
|
pre_restore_snapshot=False,
|
|
)
|
|
|
|
after_count = len(list_runtime_backups())
|
|
assert result["pre_restore_snapshot"] is None
|
|
assert after_count == before_count
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
|
|
def test_create_backup_includes_validation_fields(tmp_path, monkeypatch):
|
|
"""Task B: create_runtime_backup auto-validates and reports result."""
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
result = create_runtime_backup(datetime(2026, 4, 11, 10, 0, 0, tzinfo=UTC))
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
assert "validated" in result
|
|
assert "validation_errors" in result
|
|
assert result["validated"] is True
|
|
assert result["validation_errors"] == []
|
|
|
|
|
|
def test_create_backup_validation_failure_does_not_raise(tmp_path, monkeypatch):
|
|
"""Task B: if post-backup validation fails, backup still returns metadata."""
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
def _broken_validate(stamp):
|
|
return {"valid": False, "errors": ["db_missing", "metadata_missing"]}
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
monkeypatch.setattr("atocore.ops.backup.validate_backup", _broken_validate)
|
|
result = create_runtime_backup(datetime(2026, 4, 11, 11, 0, 0, tzinfo=UTC))
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
# Should NOT have raised — backup still returned metadata
|
|
assert result["validated"] is False
|
|
assert result["validation_errors"] == ["db_missing", "metadata_missing"]
|
|
# Core backup fields still present
|
|
assert "db_snapshot_path" in result
|
|
assert "created_at" in result
|
|
|
|
|
|
def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
|
|
"""Stale WAL/SHM sidecars must not carry bytes past the restore.
|
|
|
|
Note: after restore runs, PRAGMA integrity_check reopens the
|
|
restored db which may legitimately recreate a fresh -wal. So we
|
|
assert that the STALE byte marker no longer appears in either
|
|
sidecar, not that the files are absent.
|
|
"""
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
|
|
original_settings = config.settings
|
|
try:
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
create_runtime_backup(datetime(2026, 4, 9, 14, 0, 0, tzinfo=UTC))
|
|
|
|
# Write fake stale WAL/SHM next to the live db with an
|
|
# unmistakable marker.
|
|
target_db = config.settings.db_path
|
|
wal = target_db.with_name(target_db.name + "-wal")
|
|
shm = target_db.with_name(target_db.name + "-shm")
|
|
stale_marker = b"STALE-SIDECAR-MARKER-DO-NOT-SURVIVE"
|
|
wal.write_bytes(stale_marker)
|
|
shm.write_bytes(stale_marker)
|
|
assert wal.exists() and shm.exists()
|
|
|
|
restore_runtime_backup(
|
|
"20260409T140000Z", confirm_service_stopped=True
|
|
)
|
|
|
|
# The restored db must pass integrity check (tested elsewhere);
|
|
# here we just confirm that no file next to it still contains
|
|
# the stale marker from the old live process.
|
|
for sidecar in (wal, shm):
|
|
if sidecar.exists():
|
|
assert stale_marker not in sidecar.read_bytes(), (
|
|
f"{sidecar.name} still carries stale marker"
|
|
)
|
|
finally:
|
|
config.settings = original_settings
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Task C: Backup retention cleanup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _setup_cleanup_env(tmp_path, monkeypatch):
|
|
"""Helper: configure env, init db, return snapshots_root."""
|
|
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
|
|
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
|
|
monkeypatch.setenv(
|
|
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
|
|
)
|
|
original = config.settings
|
|
config.settings = config.Settings()
|
|
init_db()
|
|
snapshots_root = config.settings.resolved_backup_dir / "snapshots"
|
|
snapshots_root.mkdir(parents=True, exist_ok=True)
|
|
return original, snapshots_root
|
|
|
|
|
|
def _seed_snapshots(snapshots_root, dates):
|
|
"""Create minimal valid snapshot dirs for the given datetimes."""
|
|
for dt in dates:
|
|
stamp = dt.strftime("%Y%m%dT%H%M%SZ")
|
|
snap_dir = snapshots_root / stamp
|
|
db_dir = snap_dir / "db"
|
|
db_dir.mkdir(parents=True, exist_ok=True)
|
|
db_path = db_dir / "atocore.db"
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.execute("CREATE TABLE IF NOT EXISTS _marker (id INTEGER)")
|
|
conn.close()
|
|
metadata = {
|
|
"created_at": dt.isoformat(),
|
|
"backup_root": str(snap_dir),
|
|
"db_snapshot_path": str(db_path),
|
|
"db_size_bytes": db_path.stat().st_size,
|
|
"registry_snapshot_path": "",
|
|
"chroma_snapshot_path": "",
|
|
"chroma_snapshot_bytes": 0,
|
|
"chroma_snapshot_files": 0,
|
|
"chroma_snapshot_included": False,
|
|
"vector_store_note": "",
|
|
}
|
|
(snap_dir / "backup-metadata.json").write_text(
|
|
json.dumps(metadata, indent=2) + "\n", encoding="utf-8"
|
|
)
|
|
|
|
|
|
def test_cleanup_empty_dir(tmp_path, monkeypatch):
|
|
original, _ = _setup_cleanup_env(tmp_path, monkeypatch)
|
|
try:
|
|
result = cleanup_old_backups()
|
|
assert result["kept"] == 0
|
|
assert result["would_delete"] == 0
|
|
assert result["dry_run"] is True
|
|
finally:
|
|
config.settings = original
|
|
|
|
|
|
def test_cleanup_dry_run_identifies_old_snapshots(tmp_path, monkeypatch):
|
|
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
|
|
try:
|
|
# 10 daily snapshots Apr 2-11 (avoiding Apr 1 which is monthly).
|
|
base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
|
|
dates = [base + timedelta(days=i) for i in range(10)]
|
|
_seed_snapshots(snapshots_root, dates)
|
|
|
|
result = cleanup_old_backups()
|
|
assert result["dry_run"] is True
|
|
# 7 daily kept + Apr 5 is a Sunday (weekly) but already in daily.
|
|
# Apr 2, 3, 4 are oldest. Apr 5 is Sunday → kept as weekly.
|
|
# So: 7 daily (Apr 5-11) + 1 weekly (Apr 5 already counted) = 7 daily.
|
|
# But Apr 5 is the 8th newest day from Apr 11... wait.
|
|
# Newest 7 days: Apr 11,10,9,8,7,6,5 → all kept as daily.
|
|
# Remaining: Apr 4,3,2. Apr 5 is already in daily.
|
|
# None of Apr 4,3,2 are Sunday or 1st → all 3 deleted.
|
|
assert result["kept"] == 7
|
|
assert result["would_delete"] == 3
|
|
assert len(list(snapshots_root.iterdir())) == 10
|
|
finally:
|
|
config.settings = original
|
|
|
|
|
|
def test_cleanup_confirm_deletes(tmp_path, monkeypatch):
|
|
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
|
|
try:
|
|
base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
|
|
dates = [base + timedelta(days=i) for i in range(10)]
|
|
_seed_snapshots(snapshots_root, dates)
|
|
|
|
result = cleanup_old_backups(confirm=True)
|
|
assert result["dry_run"] is False
|
|
assert result["deleted"] == 3
|
|
assert result["kept"] == 7
|
|
assert len(list(snapshots_root.iterdir())) == 7
|
|
finally:
|
|
config.settings = original
|
|
|
|
|
|
def test_cleanup_keeps_last_7_daily(tmp_path, monkeypatch):
|
|
"""Exactly 7 snapshots on different days → all kept."""
|
|
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
|
|
try:
|
|
base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
|
|
dates = [base + timedelta(days=i) for i in range(7)]
|
|
_seed_snapshots(snapshots_root, dates)
|
|
|
|
result = cleanup_old_backups()
|
|
assert result["kept"] == 7
|
|
assert result["would_delete"] == 0
|
|
finally:
|
|
config.settings = original
|
|
|
|
|
|
def test_cleanup_keeps_sunday_weekly(tmp_path, monkeypatch):
|
|
"""Snapshots on Sundays outside the 7-day window are kept as weekly."""
|
|
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
|
|
try:
|
|
# 7 daily snapshots covering Apr 5-11
|
|
base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
|
|
daily = [base + timedelta(days=i) for i in range(7)]
|
|
|
|
# 2 older Sunday snapshots
|
|
sun1 = datetime(2026, 3, 29, 12, 0, 0, tzinfo=UTC) # Sunday
|
|
sun2 = datetime(2026, 3, 22, 12, 0, 0, tzinfo=UTC) # Sunday
|
|
# A non-Sunday old snapshot that should be deleted
|
|
wed = datetime(2026, 3, 25, 12, 0, 0, tzinfo=UTC) # Wednesday
|
|
|
|
_seed_snapshots(snapshots_root, daily + [sun1, sun2, wed])
|
|
|
|
result = cleanup_old_backups()
|
|
# 7 daily + 2 Sunday weekly = 9 kept, 1 Wednesday deleted
|
|
assert result["kept"] == 9
|
|
assert result["would_delete"] == 1
|
|
finally:
|
|
config.settings = original
|
|
|
|
|
|
def test_cleanup_keeps_monthly_first(tmp_path, monkeypatch):
|
|
"""Snapshots on the 1st of a month outside daily+weekly are kept as monthly."""
|
|
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
|
|
try:
|
|
# 7 daily in April 2026
|
|
base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
|
|
daily = [base + timedelta(days=i) for i in range(7)]
|
|
|
|
# Old monthly 1st snapshots
|
|
m1 = datetime(2026, 1, 1, 12, 0, 0, tzinfo=UTC)
|
|
m2 = datetime(2025, 12, 1, 12, 0, 0, tzinfo=UTC)
|
|
# Old non-1st, non-Sunday snapshot — should be deleted
|
|
old = datetime(2026, 1, 15, 12, 0, 0, tzinfo=UTC)
|
|
|
|
_seed_snapshots(snapshots_root, daily + [m1, m2, old])
|
|
|
|
result = cleanup_old_backups()
|
|
# 7 daily + 2 monthly = 9 kept, 1 deleted
|
|
assert result["kept"] == 9
|
|
assert result["would_delete"] == 1
|
|
finally:
|
|
config.settings = original
|
|
|
|
|
|
def test_cleanup_unparseable_stamp_skipped(tmp_path, monkeypatch):
|
|
"""Directories with unparseable names are ignored, not deleted."""
|
|
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
|
|
try:
|
|
base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
|
|
_seed_snapshots(snapshots_root, [base])
|
|
|
|
bad_dir = snapshots_root / "not-a-timestamp"
|
|
bad_dir.mkdir()
|
|
|
|
result = cleanup_old_backups(confirm=True)
|
|
assert result.get("unparseable") == ["not-a-timestamp"]
|
|
assert bad_dir.exists()
|
|
assert result["kept"] == 1
|
|
finally:
|
|
config.settings = original
|