"""Create safe runtime backups for the AtoCore machine store. This module is intentionally conservative: - The SQLite snapshot uses the online ``conn.backup()`` API and is safe to call while the database is in use. - The project registry snapshot is a simple file copy of the canonical registry JSON. - The Chroma snapshot is a *cold* directory copy. To stay safe it must be taken while no ingestion is running. The recommended pattern from the API layer is to acquire ``exclusive_ingestion()`` for the duration of the backup so refreshes and ingestions cannot run concurrently with the copy. The backup metadata file records what was actually included so restore tooling does not have to guess. """ from __future__ import annotations import json import shutil import sqlite3 from datetime import datetime, UTC from pathlib import Path import atocore.config as _config from atocore.models.database import init_db from atocore.observability.logger import get_logger log = get_logger("backup") def create_runtime_backup( timestamp: datetime | None = None, include_chroma: bool = False, ) -> dict: """Create a hot SQLite backup plus registry/config metadata. When ``include_chroma`` is true the Chroma persistence directory is also snapshotted as a cold directory copy. The caller is responsible for ensuring no ingestion is running concurrently. The HTTP layer enforces this by holding ``exclusive_ingestion()`` around the call. """ init_db() now = timestamp or datetime.now(UTC) stamp = now.strftime("%Y%m%dT%H%M%SZ") backup_root = _config.settings.resolved_backup_dir / "snapshots" / stamp db_backup_dir = backup_root / "db" config_backup_dir = backup_root / "config" chroma_backup_dir = backup_root / "chroma" metadata_path = backup_root / "backup-metadata.json" db_backup_dir.mkdir(parents=True, exist_ok=True) config_backup_dir.mkdir(parents=True, exist_ok=True) db_snapshot_path = db_backup_dir / _config.settings.db_path.name _backup_sqlite_db(_config.settings.db_path, db_snapshot_path) registry_snapshot = None registry_path = _config.settings.resolved_project_registry_path if registry_path.exists(): registry_snapshot = config_backup_dir / registry_path.name registry_snapshot.write_text( registry_path.read_text(encoding="utf-8"), encoding="utf-8" ) chroma_snapshot_path = "" chroma_files_copied = 0 chroma_bytes_copied = 0 if include_chroma: source_chroma = _config.settings.chroma_path if source_chroma.exists() and source_chroma.is_dir(): chroma_backup_dir.mkdir(parents=True, exist_ok=True) chroma_files_copied, chroma_bytes_copied = _copy_directory_tree( source_chroma, chroma_backup_dir ) chroma_snapshot_path = str(chroma_backup_dir) else: log.info( "chroma_snapshot_skipped_missing", path=str(source_chroma), ) metadata = { "created_at": now.isoformat(), "backup_root": str(backup_root), "db_snapshot_path": str(db_snapshot_path), "db_size_bytes": db_snapshot_path.stat().st_size, "registry_snapshot_path": str(registry_snapshot) if registry_snapshot else "", "chroma_snapshot_path": chroma_snapshot_path, "chroma_snapshot_bytes": chroma_bytes_copied, "chroma_snapshot_files": chroma_files_copied, "chroma_snapshot_included": include_chroma, "vector_store_note": ( "Chroma snapshot included as cold directory copy." if include_chroma and chroma_snapshot_path else "Chroma hot backup is not included; rerun with include_chroma=True under exclusive_ingestion()." ), } metadata_path.write_text( json.dumps(metadata, indent=2, ensure_ascii=True) + "\n", encoding="utf-8", ) # Automatic post-backup validation. Failures log a warning but do # not raise — the backup files are still on disk and may be useful. validation = validate_backup(stamp) validated = validation.get("valid", False) validation_errors = validation.get("errors", []) if not validated: log.warning( "post_backup_validation_failed", backup_root=str(backup_root), errors=validation_errors, ) metadata["validated"] = validated metadata["validation_errors"] = validation_errors log.info( "runtime_backup_created", backup_root=str(backup_root), db_snapshot=str(db_snapshot_path), chroma_included=include_chroma, chroma_bytes=chroma_bytes_copied, validated=validated, ) return metadata def list_runtime_backups() -> list[dict]: """List all runtime backups under the configured backup directory.""" snapshots_root = _config.settings.resolved_backup_dir / "snapshots" if not snapshots_root.exists() or not snapshots_root.is_dir(): return [] entries: list[dict] = [] for snapshot_dir in sorted(snapshots_root.iterdir()): if not snapshot_dir.is_dir(): continue metadata_path = snapshot_dir / "backup-metadata.json" entry: dict = { "stamp": snapshot_dir.name, "path": str(snapshot_dir), "has_metadata": metadata_path.exists(), } if metadata_path.exists(): try: entry["metadata"] = json.loads(metadata_path.read_text(encoding="utf-8")) except json.JSONDecodeError: entry["metadata"] = None entry["metadata_error"] = "invalid_json" entries.append(entry) return entries def validate_backup(stamp: str) -> dict: """Validate that a previously created backup is structurally usable. Checks: - the snapshot directory exists - the SQLite snapshot is openable and ``PRAGMA integrity_check`` returns ok - the registry snapshot, if recorded, parses as JSON - the chroma snapshot directory, if recorded, exists """ snapshot_dir = _config.settings.resolved_backup_dir / "snapshots" / stamp result: dict = { "stamp": stamp, "path": str(snapshot_dir), "exists": snapshot_dir.exists(), "db_ok": False, "registry_ok": None, "chroma_ok": None, "errors": [], } if not snapshot_dir.exists(): result["errors"].append("snapshot_directory_missing") return result metadata_path = snapshot_dir / "backup-metadata.json" if not metadata_path.exists(): result["errors"].append("metadata_missing") return result try: metadata = json.loads(metadata_path.read_text(encoding="utf-8")) except json.JSONDecodeError as exc: result["errors"].append(f"metadata_invalid_json: {exc}") return result result["metadata"] = metadata db_path = Path(metadata.get("db_snapshot_path", "")) if not db_path.exists(): result["errors"].append("db_snapshot_missing") else: try: with sqlite3.connect(str(db_path)) as conn: row = conn.execute("PRAGMA integrity_check").fetchone() result["db_ok"] = bool(row and row[0] == "ok") if not result["db_ok"]: result["errors"].append( f"db_integrity_check_failed: {row[0] if row else 'no_row'}" ) except sqlite3.DatabaseError as exc: result["errors"].append(f"db_open_failed: {exc}") registry_snapshot_path = metadata.get("registry_snapshot_path", "") if registry_snapshot_path: registry_path = Path(registry_snapshot_path) if not registry_path.exists(): result["registry_ok"] = False result["errors"].append("registry_snapshot_missing") else: try: json.loads(registry_path.read_text(encoding="utf-8")) result["registry_ok"] = True except json.JSONDecodeError as exc: result["registry_ok"] = False result["errors"].append(f"registry_invalid_json: {exc}") chroma_snapshot_path = metadata.get("chroma_snapshot_path", "") if chroma_snapshot_path: chroma_dir = Path(chroma_snapshot_path) if chroma_dir.exists() and chroma_dir.is_dir(): result["chroma_ok"] = True else: result["chroma_ok"] = False result["errors"].append("chroma_snapshot_missing") result["valid"] = not result["errors"] return result def restore_runtime_backup( stamp: str, *, include_chroma: bool | None = None, pre_restore_snapshot: bool = True, confirm_service_stopped: bool = False, ) -> dict: """Restore a previously captured runtime backup. CRITICAL: the AtoCore service MUST be stopped before calling this. Overwriting a live SQLite database corrupts state and can break the running container's open connections. The caller must pass ``confirm_service_stopped=True`` as an explicit acknowledgment — otherwise this function refuses to run. The restore procedure: 1. Validate the backup via ``validate_backup``; refuse on any error. 2. (default) Create a pre-restore safety snapshot of the CURRENT state so the restore itself is reversible. The snapshot stamp is returned in the result for the operator to record. 3. Remove stale SQLite WAL/SHM sidecar files next to the target db before copying — the snapshot is a self-contained main-file image from ``conn.backup()``, and leftover WAL/SHM from the old live db would desync against the restored main file. 4. Copy the snapshot db over the target db path. 5. Restore the project registry file if the snapshot captured one. 6. Restore the Chroma directory if ``include_chroma`` resolves to true. When ``include_chroma is None`` the function defers to whether the snapshot captured Chroma (the common case). 7. Run ``PRAGMA integrity_check`` on the restored db and report the result. Returns a dict describing what was restored. On refused restore (service still running, validation failed) raises ``RuntimeError``. """ if not confirm_service_stopped: raise RuntimeError( "restore_runtime_backup refuses to run without " "confirm_service_stopped=True — stop the AtoCore container " "first (e.g. `docker compose down` from deploy/dalidou) " "before calling this function" ) validation = validate_backup(stamp) if not validation.get("valid"): raise RuntimeError( f"backup {stamp} failed validation: {validation.get('errors')}" ) metadata = validation.get("metadata") or {} pre_snapshot_stamp: str | None = None if pre_restore_snapshot: pre = create_runtime_backup(include_chroma=False) pre_snapshot_stamp = Path(pre["backup_root"]).name target_db = _config.settings.db_path source_db = Path(metadata.get("db_snapshot_path", "")) if not source_db.exists(): raise RuntimeError( f"db snapshot not found at {source_db} — backup " f"metadata may be stale" ) # Force sqlite to flush any lingering WAL into the main file and # release OS-level file handles on -wal/-shm before we swap the # main file. Passing through conn.backup() in the pre-restore # snapshot can leave sidecars momentarily locked on Windows; # an explicit checkpoint(TRUNCATE) is the reliable way to flush # and release. Best-effort: if the target db can't be opened # (missing, corrupt), fall through and trust the copy step. if target_db.exists(): try: with sqlite3.connect(str(target_db)) as checkpoint_conn: checkpoint_conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") except sqlite3.DatabaseError as exc: log.warning( "restore_pre_checkpoint_failed", target_db=str(target_db), error=str(exc), ) # Remove stale WAL/SHM sidecars from the old live db so SQLite # can't read inconsistent state on next open. Tolerant to # Windows file-lock races — the subsequent copy replaces the # main file anyway, and the integrity check afterward is the # actual correctness signal. wal_path = target_db.with_name(target_db.name + "-wal") shm_path = target_db.with_name(target_db.name + "-shm") for stale in (wal_path, shm_path): if stale.exists(): try: stale.unlink() except OSError as exc: log.warning( "restore_sidecar_unlink_failed", path=str(stale), error=str(exc), ) target_db.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(source_db, target_db) registry_restored = False registry_snapshot_path = metadata.get("registry_snapshot_path", "") if registry_snapshot_path: src_reg = Path(registry_snapshot_path) if src_reg.exists(): dst_reg = _config.settings.resolved_project_registry_path dst_reg.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(src_reg, dst_reg) registry_restored = True chroma_snapshot_path = metadata.get("chroma_snapshot_path", "") if include_chroma is None: include_chroma = bool(chroma_snapshot_path) chroma_restored = False if include_chroma and chroma_snapshot_path: src_chroma = Path(chroma_snapshot_path) if src_chroma.exists() and src_chroma.is_dir(): dst_chroma = _config.settings.chroma_path # Do NOT rmtree the destination itself: in a Dockerized # deployment the chroma dir is a bind-mounted volume, and # unlinking a mount point raises # OSError [Errno 16] Device or resource busy. # Instead, clear the directory's CONTENTS and copytree into # it with dirs_exist_ok=True. This is equivalent to an # rmtree+copytree for restore purposes but stays inside the # mount boundary. Discovered during the first real restore # drill on Dalidou (2026-04-09). dst_chroma.mkdir(parents=True, exist_ok=True) for item in dst_chroma.iterdir(): if item.is_dir() and not item.is_symlink(): shutil.rmtree(item) else: item.unlink() shutil.copytree(src_chroma, dst_chroma, dirs_exist_ok=True) chroma_restored = True restored_integrity_ok = False integrity_error: str | None = None try: with sqlite3.connect(str(target_db)) as conn: row = conn.execute("PRAGMA integrity_check").fetchone() restored_integrity_ok = bool(row and row[0] == "ok") if not restored_integrity_ok: integrity_error = row[0] if row else "no_row" except sqlite3.DatabaseError as exc: integrity_error = f"db_open_failed: {exc}" result: dict = { "stamp": stamp, "pre_restore_snapshot": pre_snapshot_stamp, "target_db": str(target_db), "db_restored": True, "registry_restored": registry_restored, "chroma_restored": chroma_restored, "restored_integrity_ok": restored_integrity_ok, } if integrity_error: result["integrity_error"] = integrity_error log.info( "runtime_backup_restored", stamp=stamp, pre_restore_snapshot=pre_snapshot_stamp, registry_restored=registry_restored, chroma_restored=chroma_restored, integrity_ok=restored_integrity_ok, ) return result def cleanup_old_backups(*, confirm: bool = False) -> dict: """Apply retention policy and remove old snapshots. Retention keeps: - Last 7 daily snapshots (most recent per calendar day) - Last 4 weekly snapshots (most recent on each Sunday) - Last 6 monthly snapshots (most recent on the 1st of each month) All other snapshots are candidates for deletion. Runs as dry-run by default; pass ``confirm=True`` to actually delete. Returns a dict with kept/deleted counts and any errors. """ snapshots_root = _config.settings.resolved_backup_dir / "snapshots" if not snapshots_root.exists() or not snapshots_root.is_dir(): return {"kept": 0, "deleted": 0, "would_delete": 0, "dry_run": not confirm, "errors": []} # Parse all stamp directories into (datetime, dir_path) pairs. stamps: list[tuple[datetime, Path]] = [] unparseable: list[str] = [] for entry in sorted(snapshots_root.iterdir()): if not entry.is_dir(): continue try: dt = datetime.strptime(entry.name, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC) stamps.append((dt, entry)) except ValueError: unparseable.append(entry.name) if not stamps: return { "kept": 0, "deleted": 0, "would_delete": 0, "dry_run": not confirm, "errors": [], "unparseable": unparseable, } # Sort newest first so "most recent per bucket" is a simple first-seen. stamps.sort(key=lambda t: t[0], reverse=True) keep_set: set[Path] = set() # Last 7 daily: most recent snapshot per calendar day. seen_days: set[str] = set() for dt, path in stamps: day_key = dt.strftime("%Y-%m-%d") if day_key not in seen_days: seen_days.add(day_key) keep_set.add(path) if len(seen_days) >= 7: break # Last 4 weekly: most recent snapshot that falls on a Sunday. seen_weeks: set[str] = set() for dt, path in stamps: if dt.weekday() == 6: # Sunday week_key = dt.strftime("%Y-W%W") if week_key not in seen_weeks: seen_weeks.add(week_key) keep_set.add(path) if len(seen_weeks) >= 4: break # Last 6 monthly: most recent snapshot on the 1st of a month. seen_months: set[str] = set() for dt, path in stamps: if dt.day == 1: month_key = dt.strftime("%Y-%m") if month_key not in seen_months: seen_months.add(month_key) keep_set.add(path) if len(seen_months) >= 6: break to_delete = [path for _, path in stamps if path not in keep_set] errors: list[str] = [] deleted_count = 0 if confirm: for path in to_delete: try: shutil.rmtree(path) deleted_count += 1 except OSError as exc: errors.append(f"{path.name}: {exc}") result: dict = { "kept": len(keep_set), "dry_run": not confirm, "errors": errors, } if confirm: result["deleted"] = deleted_count else: result["would_delete"] = len(to_delete) if unparseable: result["unparseable"] = unparseable log.info( "cleanup_old_backups", kept=len(keep_set), deleted=deleted_count if confirm else 0, would_delete=len(to_delete) if not confirm else 0, dry_run=not confirm, ) return result def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None: source_conn = sqlite3.connect(str(source_path)) dest_conn = sqlite3.connect(str(dest_path)) try: source_conn.backup(dest_conn) finally: dest_conn.close() source_conn.close() def _copy_directory_tree(source: Path, dest: Path) -> tuple[int, int]: """Copy a directory tree and return (file_count, total_bytes).""" if dest.exists(): shutil.rmtree(dest) shutil.copytree(source, dest) file_count = 0 total_bytes = 0 for path in dest.rglob("*"): if path.is_file(): file_count += 1 total_bytes += path.stat().st_size return file_count, total_bytes def main() -> None: """CLI entry point for the backup module. Supports four subcommands: - ``create`` run ``create_runtime_backup`` (default if none given) - ``list`` list all runtime backup snapshots - ``validate`` validate a specific snapshot by stamp - ``restore`` restore a specific snapshot by stamp The restore subcommand is the one used by the backup/restore drill and MUST be run only when the AtoCore service is stopped. It takes ``--confirm-service-stopped`` as an explicit acknowledgment. """ import argparse parser = argparse.ArgumentParser( prog="python -m atocore.ops.backup", description="AtoCore runtime backup create/list/validate/restore", ) sub = parser.add_subparsers(dest="command") p_create = sub.add_parser("create", help="create a new runtime backup") p_create.add_argument( "--chroma", action="store_true", help="also snapshot the Chroma vector store (cold copy)", ) sub.add_parser("list", help="list runtime backup snapshots") p_validate = sub.add_parser("validate", help="validate a snapshot by stamp") p_validate.add_argument("stamp", help="snapshot stamp (e.g. 20260409T010203Z)") p_cleanup = sub.add_parser("cleanup", help="remove old snapshots per retention policy") p_cleanup.add_argument( "--confirm", action="store_true", help="actually delete (default is dry-run)", ) p_restore = sub.add_parser( "restore", help="restore a snapshot by stamp (service must be stopped)", ) p_restore.add_argument("stamp", help="snapshot stamp to restore") p_restore.add_argument( "--confirm-service-stopped", action="store_true", help="explicit acknowledgment that the AtoCore container is stopped", ) p_restore.add_argument( "--no-pre-snapshot", action="store_true", help="skip the pre-restore safety snapshot of current state", ) chroma_group = p_restore.add_mutually_exclusive_group() chroma_group.add_argument( "--chroma", dest="include_chroma", action="store_true", default=None, help="force-restore the Chroma snapshot", ) chroma_group.add_argument( "--no-chroma", dest="include_chroma", action="store_false", help="skip the Chroma snapshot even if it was captured", ) args = parser.parse_args() command = args.command or "create" if command == "create": include_chroma = getattr(args, "chroma", False) result = create_runtime_backup(include_chroma=include_chroma) elif command == "list": result = {"backups": list_runtime_backups()} elif command == "validate": result = validate_backup(args.stamp) elif command == "cleanup": result = cleanup_old_backups(confirm=getattr(args, "confirm", False)) elif command == "restore": result = restore_runtime_backup( args.stamp, include_chroma=args.include_chroma, pre_restore_snapshot=not args.no_pre_snapshot, confirm_service_stopped=args.confirm_service_stopped, ) else: # pragma: no cover — argparse guards this parser.error(f"unknown command: {command}") print(json.dumps(result, indent=2, ensure_ascii=True)) if __name__ == "__main__": main()