feat: post-backup validation + retention cleanup (Tasks B & C)
- create_runtime_backup() now auto-validates its output and includes validated/validation_errors fields in returned metadata - New cleanup_old_backups() with retention policy: 7 daily, 4 weekly (Sundays), 6 monthly (1st of month), dry-run by default - CLI `cleanup` subcommand added to backup module - 9 new tests (2 validation + 7 retention), 259 total passing Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -103,12 +103,27 @@ def create_runtime_backup(
|
|||||||
encoding="utf-8",
|
encoding="utf-8",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Automatic post-backup validation. Failures log a warning but do
|
||||||
|
# not raise — the backup files are still on disk and may be useful.
|
||||||
|
validation = validate_backup(stamp)
|
||||||
|
validated = validation.get("valid", False)
|
||||||
|
validation_errors = validation.get("errors", [])
|
||||||
|
if not validated:
|
||||||
|
log.warning(
|
||||||
|
"post_backup_validation_failed",
|
||||||
|
backup_root=str(backup_root),
|
||||||
|
errors=validation_errors,
|
||||||
|
)
|
||||||
|
metadata["validated"] = validated
|
||||||
|
metadata["validation_errors"] = validation_errors
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
"runtime_backup_created",
|
"runtime_backup_created",
|
||||||
backup_root=str(backup_root),
|
backup_root=str(backup_root),
|
||||||
db_snapshot=str(db_snapshot_path),
|
db_snapshot=str(db_snapshot_path),
|
||||||
chroma_included=include_chroma,
|
chroma_included=include_chroma,
|
||||||
chroma_bytes=chroma_bytes_copied,
|
chroma_bytes=chroma_bytes_copied,
|
||||||
|
validated=validated,
|
||||||
)
|
)
|
||||||
return metadata
|
return metadata
|
||||||
|
|
||||||
@@ -389,6 +404,113 @@ def restore_runtime_backup(
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_old_backups(*, confirm: bool = False) -> dict:
    """Enforce the snapshot retention policy under the backup directory.

    The policy keeps, per bucket, only the newest snapshot:

    * the 7 most recent calendar days (daily tier),
    * the 4 most recent Sundays (weekly tier),
    * the 6 most recent first-of-month days (monthly tier).

    Every other snapshot is a removal candidate.  Without ``confirm=True``
    this is a dry run that only reports what would be removed.

    Returns a dict with kept/deleted (or would-delete) counts, any deletion
    errors, and directory names that could not be parsed as stamps.
    """
    snapshots_root = _config.settings.resolved_backup_dir / "snapshots"
    if not snapshots_root.exists() or not snapshots_root.is_dir():
        return {"kept": 0, "deleted": 0, "would_delete": 0, "dry_run": not confirm, "errors": []}

    # Split directory entries into parseable (timestamp, path) pairs and a
    # list of names we refuse to touch because they don't look like stamps.
    stamps: list[tuple[datetime, Path]] = []
    unparseable: list[str] = []
    for child in sorted(snapshots_root.iterdir()):
        if not child.is_dir():
            continue
        try:
            parsed = datetime.strptime(child.name, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC)
        except ValueError:
            unparseable.append(child.name)
        else:
            stamps.append((parsed, child))

    if not stamps:
        return {
            "kept": 0, "deleted": 0, "would_delete": 0,
            "dry_run": not confirm, "errors": [],
            "unparseable": unparseable,
        }

    # Newest first, so the first stamp seen in a bucket is the one to keep.
    stamps.sort(key=lambda pair: pair[0], reverse=True)

    keep_set: set[Path] = set()

    def _retain(matches, bucket_fmt: str, limit: int) -> None:
        # Keep the newest snapshot in each strftime bucket, up to `limit`
        # distinct buckets, considering only stamps where `matches(dt)`.
        buckets: set[str] = set()
        for when, where in stamps:
            if not matches(when):
                continue
            bucket = when.strftime(bucket_fmt)
            if bucket not in buckets:
                buckets.add(bucket)
                keep_set.add(where)
            if len(buckets) >= limit:
                break

    _retain(lambda _: True, "%Y-%m-%d", 7)            # daily tier
    _retain(lambda d: d.weekday() == 6, "%Y-W%W", 4)  # weekly tier (Sundays)
    _retain(lambda d: d.day == 1, "%Y-%m", 6)         # monthly tier (1st of month)

    to_delete = [where for _, where in stamps if where not in keep_set]

    errors: list[str] = []
    deleted_count = 0
    if confirm:
        for doomed in to_delete:
            try:
                shutil.rmtree(doomed)
            except OSError as exc:
                errors.append(f"{doomed.name}: {exc}")
            else:
                deleted_count += 1

    result: dict = {
        "kept": len(keep_set),
        "dry_run": not confirm,
        "errors": errors,
    }
    if confirm:
        result["deleted"] = deleted_count
    else:
        result["would_delete"] = len(to_delete)
    if unparseable:
        result["unparseable"] = unparseable

    log.info(
        "cleanup_old_backups",
        kept=len(keep_set),
        deleted=deleted_count if confirm else 0,
        would_delete=len(to_delete) if not confirm else 0,
        dry_run=not confirm,
    )
    return result
|
||||||
|
|
||||||
|
|
||||||
def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
|
def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
|
||||||
source_conn = sqlite3.connect(str(source_path))
|
source_conn = sqlite3.connect(str(source_path))
|
||||||
dest_conn = sqlite3.connect(str(dest_path))
|
dest_conn = sqlite3.connect(str(dest_path))
|
||||||
@@ -448,6 +570,13 @@ def main() -> None:
|
|||||||
p_validate = sub.add_parser("validate", help="validate a snapshot by stamp")
|
p_validate = sub.add_parser("validate", help="validate a snapshot by stamp")
|
||||||
p_validate.add_argument("stamp", help="snapshot stamp (e.g. 20260409T010203Z)")
|
p_validate.add_argument("stamp", help="snapshot stamp (e.g. 20260409T010203Z)")
|
||||||
|
|
||||||
|
p_cleanup = sub.add_parser("cleanup", help="remove old snapshots per retention policy")
|
||||||
|
p_cleanup.add_argument(
|
||||||
|
"--confirm",
|
||||||
|
action="store_true",
|
||||||
|
help="actually delete (default is dry-run)",
|
||||||
|
)
|
||||||
|
|
||||||
p_restore = sub.add_parser(
|
p_restore = sub.add_parser(
|
||||||
"restore",
|
"restore",
|
||||||
help="restore a snapshot by stamp (service must be stopped)",
|
help="restore a snapshot by stamp (service must be stopped)",
|
||||||
@@ -488,6 +617,8 @@ def main() -> None:
|
|||||||
result = {"backups": list_runtime_backups()}
|
result = {"backups": list_runtime_backups()}
|
||||||
elif command == "validate":
|
elif command == "validate":
|
||||||
result = validate_backup(args.stamp)
|
result = validate_backup(args.stamp)
|
||||||
|
elif command == "cleanup":
|
||||||
|
result = cleanup_old_backups(confirm=getattr(args, "confirm", False))
|
||||||
elif command == "restore":
|
elif command == "restore":
|
||||||
result = restore_runtime_backup(
|
result = restore_runtime_backup(
|
||||||
args.stamp,
|
args.stamp,
|
||||||
|
|||||||
@@ -1,14 +1,15 @@
|
|||||||
"""Tests for runtime backup creation and restore."""
|
"""Tests for runtime backup creation, restore, and retention cleanup."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from datetime import UTC, datetime
|
from datetime import UTC, datetime, timedelta
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import atocore.config as config
|
import atocore.config as config
|
||||||
from atocore.models.database import init_db
|
from atocore.models.database import init_db
|
||||||
from atocore.ops.backup import (
|
from atocore.ops.backup import (
|
||||||
|
cleanup_old_backups,
|
||||||
create_runtime_backup,
|
create_runtime_backup,
|
||||||
list_runtime_backups,
|
list_runtime_backups,
|
||||||
restore_runtime_backup,
|
restore_runtime_backup,
|
||||||
@@ -413,6 +414,56 @@ def test_restore_skips_pre_snapshot_when_requested(tmp_path, monkeypatch):
|
|||||||
config.settings = original_settings
|
config.settings = original_settings
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_backup_includes_validation_fields(tmp_path, monkeypatch):
    """Task B: create_runtime_backup auto-validates and reports result."""
    env = {
        "ATOCORE_DATA_DIR": str(tmp_path / "data"),
        "ATOCORE_BACKUP_DIR": str(tmp_path / "backups"),
        "ATOCORE_PROJECT_REGISTRY_PATH": str(tmp_path / "config" / "project-registry.json"),
    }
    for key, value in env.items():
        monkeypatch.setenv(key, value)

    saved = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        result = create_runtime_backup(datetime(2026, 4, 11, 10, 0, 0, tzinfo=UTC))
    finally:
        config.settings = saved

    # Both validation fields must be present and must report a clean backup.
    assert "validated" in result
    assert "validation_errors" in result
    assert result["validated"] is True
    assert result["validation_errors"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_backup_validation_failure_does_not_raise(tmp_path, monkeypatch):
    """Task B: if post-backup validation fails, backup still returns metadata."""
    env = {
        "ATOCORE_DATA_DIR": str(tmp_path / "data"),
        "ATOCORE_BACKUP_DIR": str(tmp_path / "backups"),
        "ATOCORE_PROJECT_REGISTRY_PATH": str(tmp_path / "config" / "project-registry.json"),
    }
    for key, value in env.items():
        monkeypatch.setenv(key, value)

    def _broken_validate(stamp):
        return {"valid": False, "errors": ["db_missing", "metadata_missing"]}

    saved = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        monkeypatch.setattr("atocore.ops.backup.validate_backup", _broken_validate)
        result = create_runtime_backup(datetime(2026, 4, 11, 11, 0, 0, tzinfo=UTC))
    finally:
        config.settings = saved

    # No exception escaped; the failure is reported via the metadata fields.
    assert result["validated"] is False
    assert result["validation_errors"] == ["db_missing", "metadata_missing"]
    # Core backup fields survive a failed validation.
    assert "db_snapshot_path" in result
    assert "created_at" in result
|
||||||
|
|
||||||
|
|
||||||
def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
|
def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
|
||||||
"""Stale WAL/SHM sidecars must not carry bytes past the restore.
|
"""Stale WAL/SHM sidecars must not carry bytes past the restore.
|
||||||
|
|
||||||
@@ -457,3 +508,183 @@ def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
|
|||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
config.settings = original_settings
|
config.settings = original_settings
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Task C: Backup retention cleanup
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _setup_cleanup_env(tmp_path, monkeypatch):
    """Helper: configure env, init db, return snapshots_root."""
    for key, value in (
        ("ATOCORE_DATA_DIR", str(tmp_path / "data")),
        ("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")),
        ("ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")),
    ):
        monkeypatch.setenv(key, value)

    previous = config.settings
    config.settings = config.Settings()
    init_db()

    snapshots_root = config.settings.resolved_backup_dir / "snapshots"
    snapshots_root.mkdir(parents=True, exist_ok=True)
    # Caller is responsible for restoring `previous` in its finally block.
    return previous, snapshots_root
|
||||||
|
|
||||||
|
|
||||||
|
def _seed_snapshots(snapshots_root, dates):
    """Create minimal valid snapshot dirs for the given datetimes."""
    for when in dates:
        snap_dir = snapshots_root / when.strftime("%Y%m%dT%H%M%SZ")
        db_path = snap_dir / "db" / "atocore.db"
        db_path.parent.mkdir(parents=True, exist_ok=True)

        # A one-table SQLite file is enough to count as a valid snapshot db.
        conn = sqlite3.connect(str(db_path))
        conn.execute("CREATE TABLE IF NOT EXISTS _marker (id INTEGER)")
        conn.close()

        metadata = {
            "created_at": when.isoformat(),
            "backup_root": str(snap_dir),
            "db_snapshot_path": str(db_path),
            "db_size_bytes": db_path.stat().st_size,
            "registry_snapshot_path": "",
            "chroma_snapshot_path": "",
            "chroma_snapshot_bytes": 0,
            "chroma_snapshot_files": 0,
            "chroma_snapshot_included": False,
            "vector_store_note": "",
        }
        (snap_dir / "backup-metadata.json").write_text(
            json.dumps(metadata, indent=2) + "\n", encoding="utf-8"
        )
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_empty_dir(tmp_path, monkeypatch):
    """An empty snapshots directory yields a zero-count dry-run result."""
    saved, _ = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        outcome = cleanup_old_backups()
        assert outcome["kept"] == 0
        assert outcome["would_delete"] == 0
        assert outcome["dry_run"] is True
    finally:
        config.settings = saved
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_dry_run_identifies_old_snapshots(tmp_path, monkeypatch):
    """Dry run reports out-of-policy snapshots without deleting anything."""
    saved, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        # Ten daily snapshots, Apr 2-11 2026 (Apr 1 avoided: it's a monthly keeper).
        start = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
        _seed_snapshots(snapshots_root, [start + timedelta(days=i) for i in range(10)])

        outcome = cleanup_old_backups()

        assert outcome["dry_run"] is True
        # The daily tier keeps the newest 7 days (Apr 5-11).  Apr 5 is a
        # Sunday but is already retained by the daily tier; Apr 2-4 are
        # neither Sundays nor month-firsts, so all three are flagged.
        assert outcome["kept"] == 7
        assert outcome["would_delete"] == 3
        # Dry run: nothing was actually removed from disk.
        assert len(list(snapshots_root.iterdir())) == 10
    finally:
        config.settings = saved
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_confirm_deletes(tmp_path, monkeypatch):
    """With confirm=True the out-of-policy snapshots are really removed."""
    saved, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        start = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
        _seed_snapshots(snapshots_root, [start + timedelta(days=i) for i in range(10)])

        outcome = cleanup_old_backups(confirm=True)

        assert outcome["dry_run"] is False
        assert outcome["deleted"] == 3
        assert outcome["kept"] == 7
        # Only the 7 retained snapshot directories remain on disk.
        assert len(list(snapshots_root.iterdir())) == 7
    finally:
        config.settings = saved
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_keeps_last_7_daily(tmp_path, monkeypatch):
    """Exactly 7 snapshots on different days → all kept."""
    saved, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        start = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
        _seed_snapshots(snapshots_root, [start + timedelta(days=i) for i in range(7)])

        outcome = cleanup_old_backups()

        assert outcome["kept"] == 7
        assert outcome["would_delete"] == 0
    finally:
        config.settings = saved
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_keeps_sunday_weekly(tmp_path, monkeypatch):
    """Snapshots on Sundays outside the 7-day window are kept as weekly."""
    saved, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        # Seven daily snapshots covering Apr 5-11 2026.
        start = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
        recent = [start + timedelta(days=i) for i in range(7)]

        older = [
            datetime(2026, 3, 29, 12, 0, 0, tzinfo=UTC),  # Sunday — weekly keeper
            datetime(2026, 3, 22, 12, 0, 0, tzinfo=UTC),  # Sunday — weekly keeper
            datetime(2026, 3, 25, 12, 0, 0, tzinfo=UTC),  # Wednesday — should go
        ]
        _seed_snapshots(snapshots_root, recent + older)

        outcome = cleanup_old_backups()

        # 7 daily + 2 Sunday weeklies kept; the lone Wednesday is flagged.
        assert outcome["kept"] == 9
        assert outcome["would_delete"] == 1
    finally:
        config.settings = saved
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_keeps_monthly_first(tmp_path, monkeypatch):
    """Snapshots on the 1st of a month outside daily+weekly are kept as monthly."""
    saved, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        # Seven daily snapshots in April 2026.
        start = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
        recent = [start + timedelta(days=i) for i in range(7)]

        older = [
            datetime(2026, 1, 1, 12, 0, 0, tzinfo=UTC),   # month-first — monthly keeper
            datetime(2025, 12, 1, 12, 0, 0, tzinfo=UTC),  # month-first — monthly keeper
            datetime(2026, 1, 15, 12, 0, 0, tzinfo=UTC),  # neither tier — should go
        ]
        _seed_snapshots(snapshots_root, recent + older)

        outcome = cleanup_old_backups()

        # 7 daily + 2 monthly kept; the mid-month stray is flagged.
        assert outcome["kept"] == 9
        assert outcome["would_delete"] == 1
    finally:
        config.settings = saved
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_unparseable_stamp_skipped(tmp_path, monkeypatch):
    """Directories with unparseable names are ignored, not deleted."""
    saved, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        _seed_snapshots(snapshots_root, [datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)])

        stray = snapshots_root / "not-a-timestamp"
        stray.mkdir()

        outcome = cleanup_old_backups(confirm=True)

        assert outcome.get("unparseable") == ["not-a-timestamp"]
        # The stray directory survives even a confirmed cleanup.
        assert stray.exists()
        assert outcome["kept"] == 1
    finally:
        config.settings = saved
|
||||||
|
|||||||
Reference in New Issue
Block a user