Files
ATOCore/src/atocore/ops/backup.py
Anto01 58c744fd2f feat: post-backup validation + retention cleanup (Tasks B & C)
- create_runtime_backup() now auto-validates its output and includes
  validated/validation_errors fields in returned metadata
- New cleanup_old_backups() with retention policy: 7 daily, 4 weekly
  (Sundays), 6 monthly (1st of month), dry-run by default
- CLI `cleanup` subcommand added to backup module
- 9 new tests (2 validation + 7 retention), 259 total passing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-11 09:46:46 -04:00

637 lines
23 KiB
Python

"""Create safe runtime backups for the AtoCore machine store.
This module is intentionally conservative:
- The SQLite snapshot uses the online ``conn.backup()`` API and is safe to
call while the database is in use.
- The project registry snapshot is a simple file copy of the canonical
registry JSON.
- The Chroma snapshot is a *cold* directory copy. To stay safe it must be
taken while no ingestion is running. The recommended pattern from the API
layer is to acquire ``exclusive_ingestion()`` for the duration of the
backup so refreshes and ingestions cannot run concurrently with the copy.
The backup metadata file records what was actually included so restore
tooling does not have to guess.
"""
from __future__ import annotations
import json
import shutil
import sqlite3
from datetime import datetime, UTC
from pathlib import Path
import atocore.config as _config
from atocore.models.database import init_db
from atocore.observability.logger import get_logger
log = get_logger("backup")
def create_runtime_backup(
timestamp: datetime | None = None,
include_chroma: bool = False,
) -> dict:
"""Create a hot SQLite backup plus registry/config metadata.
When ``include_chroma`` is true the Chroma persistence directory is also
snapshotted as a cold directory copy. The caller is responsible for
ensuring no ingestion is running concurrently. The HTTP layer enforces
this by holding ``exclusive_ingestion()`` around the call.
"""
init_db()
now = timestamp or datetime.now(UTC)
stamp = now.strftime("%Y%m%dT%H%M%SZ")
backup_root = _config.settings.resolved_backup_dir / "snapshots" / stamp
db_backup_dir = backup_root / "db"
config_backup_dir = backup_root / "config"
chroma_backup_dir = backup_root / "chroma"
metadata_path = backup_root / "backup-metadata.json"
db_backup_dir.mkdir(parents=True, exist_ok=True)
config_backup_dir.mkdir(parents=True, exist_ok=True)
db_snapshot_path = db_backup_dir / _config.settings.db_path.name
_backup_sqlite_db(_config.settings.db_path, db_snapshot_path)
registry_snapshot = None
registry_path = _config.settings.resolved_project_registry_path
if registry_path.exists():
registry_snapshot = config_backup_dir / registry_path.name
registry_snapshot.write_text(
registry_path.read_text(encoding="utf-8"), encoding="utf-8"
)
chroma_snapshot_path = ""
chroma_files_copied = 0
chroma_bytes_copied = 0
if include_chroma:
source_chroma = _config.settings.chroma_path
if source_chroma.exists() and source_chroma.is_dir():
chroma_backup_dir.mkdir(parents=True, exist_ok=True)
chroma_files_copied, chroma_bytes_copied = _copy_directory_tree(
source_chroma, chroma_backup_dir
)
chroma_snapshot_path = str(chroma_backup_dir)
else:
log.info(
"chroma_snapshot_skipped_missing",
path=str(source_chroma),
)
metadata = {
"created_at": now.isoformat(),
"backup_root": str(backup_root),
"db_snapshot_path": str(db_snapshot_path),
"db_size_bytes": db_snapshot_path.stat().st_size,
"registry_snapshot_path": str(registry_snapshot) if registry_snapshot else "",
"chroma_snapshot_path": chroma_snapshot_path,
"chroma_snapshot_bytes": chroma_bytes_copied,
"chroma_snapshot_files": chroma_files_copied,
"chroma_snapshot_included": include_chroma,
"vector_store_note": (
"Chroma snapshot included as cold directory copy."
if include_chroma and chroma_snapshot_path
else "Chroma hot backup is not included; rerun with include_chroma=True under exclusive_ingestion()."
),
}
metadata_path.write_text(
json.dumps(metadata, indent=2, ensure_ascii=True) + "\n",
encoding="utf-8",
)
# Automatic post-backup validation. Failures log a warning but do
# not raise — the backup files are still on disk and may be useful.
validation = validate_backup(stamp)
validated = validation.get("valid", False)
validation_errors = validation.get("errors", [])
if not validated:
log.warning(
"post_backup_validation_failed",
backup_root=str(backup_root),
errors=validation_errors,
)
metadata["validated"] = validated
metadata["validation_errors"] = validation_errors
log.info(
"runtime_backup_created",
backup_root=str(backup_root),
db_snapshot=str(db_snapshot_path),
chroma_included=include_chroma,
chroma_bytes=chroma_bytes_copied,
validated=validated,
)
return metadata
def list_runtime_backups() -> list[dict]:
"""List all runtime backups under the configured backup directory."""
snapshots_root = _config.settings.resolved_backup_dir / "snapshots"
if not snapshots_root.exists() or not snapshots_root.is_dir():
return []
entries: list[dict] = []
for snapshot_dir in sorted(snapshots_root.iterdir()):
if not snapshot_dir.is_dir():
continue
metadata_path = snapshot_dir / "backup-metadata.json"
entry: dict = {
"stamp": snapshot_dir.name,
"path": str(snapshot_dir),
"has_metadata": metadata_path.exists(),
}
if metadata_path.exists():
try:
entry["metadata"] = json.loads(metadata_path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
entry["metadata"] = None
entry["metadata_error"] = "invalid_json"
entries.append(entry)
return entries
def validate_backup(stamp: str) -> dict:
"""Validate that a previously created backup is structurally usable.
Checks:
- the snapshot directory exists
- the SQLite snapshot is openable and ``PRAGMA integrity_check`` returns ok
- the registry snapshot, if recorded, parses as JSON
- the chroma snapshot directory, if recorded, exists
"""
snapshot_dir = _config.settings.resolved_backup_dir / "snapshots" / stamp
result: dict = {
"stamp": stamp,
"path": str(snapshot_dir),
"exists": snapshot_dir.exists(),
"db_ok": False,
"registry_ok": None,
"chroma_ok": None,
"errors": [],
}
if not snapshot_dir.exists():
result["errors"].append("snapshot_directory_missing")
return result
metadata_path = snapshot_dir / "backup-metadata.json"
if not metadata_path.exists():
result["errors"].append("metadata_missing")
return result
try:
metadata = json.loads(metadata_path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
result["errors"].append(f"metadata_invalid_json: {exc}")
return result
result["metadata"] = metadata
db_path = Path(metadata.get("db_snapshot_path", ""))
if not db_path.exists():
result["errors"].append("db_snapshot_missing")
else:
try:
with sqlite3.connect(str(db_path)) as conn:
row = conn.execute("PRAGMA integrity_check").fetchone()
result["db_ok"] = bool(row and row[0] == "ok")
if not result["db_ok"]:
result["errors"].append(
f"db_integrity_check_failed: {row[0] if row else 'no_row'}"
)
except sqlite3.DatabaseError as exc:
result["errors"].append(f"db_open_failed: {exc}")
registry_snapshot_path = metadata.get("registry_snapshot_path", "")
if registry_snapshot_path:
registry_path = Path(registry_snapshot_path)
if not registry_path.exists():
result["registry_ok"] = False
result["errors"].append("registry_snapshot_missing")
else:
try:
json.loads(registry_path.read_text(encoding="utf-8"))
result["registry_ok"] = True
except json.JSONDecodeError as exc:
result["registry_ok"] = False
result["errors"].append(f"registry_invalid_json: {exc}")
chroma_snapshot_path = metadata.get("chroma_snapshot_path", "")
if chroma_snapshot_path:
chroma_dir = Path(chroma_snapshot_path)
if chroma_dir.exists() and chroma_dir.is_dir():
result["chroma_ok"] = True
else:
result["chroma_ok"] = False
result["errors"].append("chroma_snapshot_missing")
result["valid"] = not result["errors"]
return result
def restore_runtime_backup(
stamp: str,
*,
include_chroma: bool | None = None,
pre_restore_snapshot: bool = True,
confirm_service_stopped: bool = False,
) -> dict:
"""Restore a previously captured runtime backup.
CRITICAL: the AtoCore service MUST be stopped before calling this.
Overwriting a live SQLite database corrupts state and can break
the running container's open connections. The caller must pass
``confirm_service_stopped=True`` as an explicit acknowledgment —
otherwise this function refuses to run.
The restore procedure:
1. Validate the backup via ``validate_backup``; refuse on any error.
2. (default) Create a pre-restore safety snapshot of the CURRENT
state so the restore itself is reversible. The snapshot stamp
is returned in the result for the operator to record.
3. Remove stale SQLite WAL/SHM sidecar files next to the target db
before copying — the snapshot is a self-contained main-file
image from ``conn.backup()``, and leftover WAL/SHM from the old
live db would desync against the restored main file.
4. Copy the snapshot db over the target db path.
5. Restore the project registry file if the snapshot captured one.
6. Restore the Chroma directory if ``include_chroma`` resolves to
true. When ``include_chroma is None`` the function defers to
whether the snapshot captured Chroma (the common case).
7. Run ``PRAGMA integrity_check`` on the restored db and report
the result.
Returns a dict describing what was restored. On refused restore
(service still running, validation failed) raises ``RuntimeError``.
"""
if not confirm_service_stopped:
raise RuntimeError(
"restore_runtime_backup refuses to run without "
"confirm_service_stopped=True — stop the AtoCore container "
"first (e.g. `docker compose down` from deploy/dalidou) "
"before calling this function"
)
validation = validate_backup(stamp)
if not validation.get("valid"):
raise RuntimeError(
f"backup {stamp} failed validation: {validation.get('errors')}"
)
metadata = validation.get("metadata") or {}
pre_snapshot_stamp: str | None = None
if pre_restore_snapshot:
pre = create_runtime_backup(include_chroma=False)
pre_snapshot_stamp = Path(pre["backup_root"]).name
target_db = _config.settings.db_path
source_db = Path(metadata.get("db_snapshot_path", ""))
if not source_db.exists():
raise RuntimeError(
f"db snapshot not found at {source_db} — backup "
f"metadata may be stale"
)
# Force sqlite to flush any lingering WAL into the main file and
# release OS-level file handles on -wal/-shm before we swap the
# main file. Passing through conn.backup() in the pre-restore
# snapshot can leave sidecars momentarily locked on Windows;
# an explicit checkpoint(TRUNCATE) is the reliable way to flush
# and release. Best-effort: if the target db can't be opened
# (missing, corrupt), fall through and trust the copy step.
if target_db.exists():
try:
with sqlite3.connect(str(target_db)) as checkpoint_conn:
checkpoint_conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
except sqlite3.DatabaseError as exc:
log.warning(
"restore_pre_checkpoint_failed",
target_db=str(target_db),
error=str(exc),
)
# Remove stale WAL/SHM sidecars from the old live db so SQLite
# can't read inconsistent state on next open. Tolerant to
# Windows file-lock races — the subsequent copy replaces the
# main file anyway, and the integrity check afterward is the
# actual correctness signal.
wal_path = target_db.with_name(target_db.name + "-wal")
shm_path = target_db.with_name(target_db.name + "-shm")
for stale in (wal_path, shm_path):
if stale.exists():
try:
stale.unlink()
except OSError as exc:
log.warning(
"restore_sidecar_unlink_failed",
path=str(stale),
error=str(exc),
)
target_db.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source_db, target_db)
registry_restored = False
registry_snapshot_path = metadata.get("registry_snapshot_path", "")
if registry_snapshot_path:
src_reg = Path(registry_snapshot_path)
if src_reg.exists():
dst_reg = _config.settings.resolved_project_registry_path
dst_reg.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src_reg, dst_reg)
registry_restored = True
chroma_snapshot_path = metadata.get("chroma_snapshot_path", "")
if include_chroma is None:
include_chroma = bool(chroma_snapshot_path)
chroma_restored = False
if include_chroma and chroma_snapshot_path:
src_chroma = Path(chroma_snapshot_path)
if src_chroma.exists() and src_chroma.is_dir():
dst_chroma = _config.settings.chroma_path
# Do NOT rmtree the destination itself: in a Dockerized
# deployment the chroma dir is a bind-mounted volume, and
# unlinking a mount point raises
# OSError [Errno 16] Device or resource busy.
# Instead, clear the directory's CONTENTS and copytree into
# it with dirs_exist_ok=True. This is equivalent to an
# rmtree+copytree for restore purposes but stays inside the
# mount boundary. Discovered during the first real restore
# drill on Dalidou (2026-04-09).
dst_chroma.mkdir(parents=True, exist_ok=True)
for item in dst_chroma.iterdir():
if item.is_dir() and not item.is_symlink():
shutil.rmtree(item)
else:
item.unlink()
shutil.copytree(src_chroma, dst_chroma, dirs_exist_ok=True)
chroma_restored = True
restored_integrity_ok = False
integrity_error: str | None = None
try:
with sqlite3.connect(str(target_db)) as conn:
row = conn.execute("PRAGMA integrity_check").fetchone()
restored_integrity_ok = bool(row and row[0] == "ok")
if not restored_integrity_ok:
integrity_error = row[0] if row else "no_row"
except sqlite3.DatabaseError as exc:
integrity_error = f"db_open_failed: {exc}"
result: dict = {
"stamp": stamp,
"pre_restore_snapshot": pre_snapshot_stamp,
"target_db": str(target_db),
"db_restored": True,
"registry_restored": registry_restored,
"chroma_restored": chroma_restored,
"restored_integrity_ok": restored_integrity_ok,
}
if integrity_error:
result["integrity_error"] = integrity_error
log.info(
"runtime_backup_restored",
stamp=stamp,
pre_restore_snapshot=pre_snapshot_stamp,
registry_restored=registry_restored,
chroma_restored=chroma_restored,
integrity_ok=restored_integrity_ok,
)
return result
def cleanup_old_backups(*, confirm: bool = False) -> dict:
"""Apply retention policy and remove old snapshots.
Retention keeps:
- Last 7 daily snapshots (most recent per calendar day)
- Last 4 weekly snapshots (most recent on each Sunday)
- Last 6 monthly snapshots (most recent on the 1st of each month)
All other snapshots are candidates for deletion. Runs as dry-run by
default; pass ``confirm=True`` to actually delete.
Returns a dict with kept/deleted counts and any errors.
"""
snapshots_root = _config.settings.resolved_backup_dir / "snapshots"
if not snapshots_root.exists() or not snapshots_root.is_dir():
return {"kept": 0, "deleted": 0, "would_delete": 0, "dry_run": not confirm, "errors": []}
# Parse all stamp directories into (datetime, dir_path) pairs.
stamps: list[tuple[datetime, Path]] = []
unparseable: list[str] = []
for entry in sorted(snapshots_root.iterdir()):
if not entry.is_dir():
continue
try:
dt = datetime.strptime(entry.name, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC)
stamps.append((dt, entry))
except ValueError:
unparseable.append(entry.name)
if not stamps:
return {
"kept": 0, "deleted": 0, "would_delete": 0,
"dry_run": not confirm, "errors": [],
"unparseable": unparseable,
}
# Sort newest first so "most recent per bucket" is a simple first-seen.
stamps.sort(key=lambda t: t[0], reverse=True)
keep_set: set[Path] = set()
# Last 7 daily: most recent snapshot per calendar day.
seen_days: set[str] = set()
for dt, path in stamps:
day_key = dt.strftime("%Y-%m-%d")
if day_key not in seen_days:
seen_days.add(day_key)
keep_set.add(path)
if len(seen_days) >= 7:
break
# Last 4 weekly: most recent snapshot that falls on a Sunday.
seen_weeks: set[str] = set()
for dt, path in stamps:
if dt.weekday() == 6: # Sunday
week_key = dt.strftime("%Y-W%W")
if week_key not in seen_weeks:
seen_weeks.add(week_key)
keep_set.add(path)
if len(seen_weeks) >= 4:
break
# Last 6 monthly: most recent snapshot on the 1st of a month.
seen_months: set[str] = set()
for dt, path in stamps:
if dt.day == 1:
month_key = dt.strftime("%Y-%m")
if month_key not in seen_months:
seen_months.add(month_key)
keep_set.add(path)
if len(seen_months) >= 6:
break
to_delete = [path for _, path in stamps if path not in keep_set]
errors: list[str] = []
deleted_count = 0
if confirm:
for path in to_delete:
try:
shutil.rmtree(path)
deleted_count += 1
except OSError as exc:
errors.append(f"{path.name}: {exc}")
result: dict = {
"kept": len(keep_set),
"dry_run": not confirm,
"errors": errors,
}
if confirm:
result["deleted"] = deleted_count
else:
result["would_delete"] = len(to_delete)
if unparseable:
result["unparseable"] = unparseable
log.info(
"cleanup_old_backups",
kept=len(keep_set),
deleted=deleted_count if confirm else 0,
would_delete=len(to_delete) if not confirm else 0,
dry_run=not confirm,
)
return result
def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
source_conn = sqlite3.connect(str(source_path))
dest_conn = sqlite3.connect(str(dest_path))
try:
source_conn.backup(dest_conn)
finally:
dest_conn.close()
source_conn.close()
def _copy_directory_tree(source: Path, dest: Path) -> tuple[int, int]:
"""Copy a directory tree and return (file_count, total_bytes)."""
if dest.exists():
shutil.rmtree(dest)
shutil.copytree(source, dest)
file_count = 0
total_bytes = 0
for path in dest.rglob("*"):
if path.is_file():
file_count += 1
total_bytes += path.stat().st_size
return file_count, total_bytes
def main() -> None:
"""CLI entry point for the backup module.
Supports four subcommands:
- ``create`` run ``create_runtime_backup`` (default if none given)
- ``list`` list all runtime backup snapshots
- ``validate`` validate a specific snapshot by stamp
- ``restore`` restore a specific snapshot by stamp
The restore subcommand is the one used by the backup/restore drill
and MUST be run only when the AtoCore service is stopped. It takes
``--confirm-service-stopped`` as an explicit acknowledgment.
"""
import argparse
parser = argparse.ArgumentParser(
prog="python -m atocore.ops.backup",
description="AtoCore runtime backup create/list/validate/restore",
)
sub = parser.add_subparsers(dest="command")
p_create = sub.add_parser("create", help="create a new runtime backup")
p_create.add_argument(
"--chroma",
action="store_true",
help="also snapshot the Chroma vector store (cold copy)",
)
sub.add_parser("list", help="list runtime backup snapshots")
p_validate = sub.add_parser("validate", help="validate a snapshot by stamp")
p_validate.add_argument("stamp", help="snapshot stamp (e.g. 20260409T010203Z)")
p_cleanup = sub.add_parser("cleanup", help="remove old snapshots per retention policy")
p_cleanup.add_argument(
"--confirm",
action="store_true",
help="actually delete (default is dry-run)",
)
p_restore = sub.add_parser(
"restore",
help="restore a snapshot by stamp (service must be stopped)",
)
p_restore.add_argument("stamp", help="snapshot stamp to restore")
p_restore.add_argument(
"--confirm-service-stopped",
action="store_true",
help="explicit acknowledgment that the AtoCore container is stopped",
)
p_restore.add_argument(
"--no-pre-snapshot",
action="store_true",
help="skip the pre-restore safety snapshot of current state",
)
chroma_group = p_restore.add_mutually_exclusive_group()
chroma_group.add_argument(
"--chroma",
dest="include_chroma",
action="store_true",
default=None,
help="force-restore the Chroma snapshot",
)
chroma_group.add_argument(
"--no-chroma",
dest="include_chroma",
action="store_false",
help="skip the Chroma snapshot even if it was captured",
)
args = parser.parse_args()
command = args.command or "create"
if command == "create":
include_chroma = getattr(args, "chroma", False)
result = create_runtime_backup(include_chroma=include_chroma)
elif command == "list":
result = {"backups": list_runtime_backups()}
elif command == "validate":
result = validate_backup(args.stamp)
elif command == "cleanup":
result = cleanup_old_backups(confirm=getattr(args, "confirm", False))
elif command == "restore":
result = restore_runtime_backup(
args.stamp,
include_chroma=args.include_chroma,
pre_restore_snapshot=not args.no_pre_snapshot,
confirm_service_stopped=args.confirm_service_stopped,
)
else: # pragma: no cover — argparse guards this
parser.error(f"unknown command: {command}")
print(json.dumps(result, indent=2, ensure_ascii=True))
if __name__ == "__main__":
main()