"""Create safe runtime backups for the AtoCore machine store. This module is intentionally conservative: - The SQLite snapshot uses the online ``conn.backup()`` API and is safe to call while the database is in use. - The project registry snapshot is a simple file copy of the canonical registry JSON. - The Chroma snapshot is a *cold* directory copy. To stay safe it must be taken while no ingestion is running. The recommended pattern from the API layer is to acquire ``exclusive_ingestion()`` for the duration of the backup so refreshes and ingestions cannot run concurrently with the copy. The backup metadata file records what was actually included so restore tooling does not have to guess. """ from __future__ import annotations import json import shutil import sqlite3 from datetime import datetime, UTC from pathlib import Path import atocore.config as _config from atocore.models.database import init_db from atocore.observability.logger import get_logger log = get_logger("backup") def create_runtime_backup( timestamp: datetime | None = None, include_chroma: bool = False, ) -> dict: """Create a hot SQLite backup plus registry/config metadata. When ``include_chroma`` is true the Chroma persistence directory is also snapshotted as a cold directory copy. The caller is responsible for ensuring no ingestion is running concurrently. The HTTP layer enforces this by holding ``exclusive_ingestion()`` around the call. """ init_db() now = timestamp or datetime.now(UTC) stamp = now.strftime("%Y%m%dT%H%M%SZ") backup_root = _config.settings.resolved_backup_dir / "snapshots" / stamp db_backup_dir = backup_root / "db" config_backup_dir = backup_root / "config" chroma_backup_dir = backup_root / "chroma" metadata_path = backup_root / "backup-metadata.json" db_backup_dir.mkdir(parents=True, exist_ok=True) config_backup_dir.mkdir(parents=True, exist_ok=True) db_snapshot_path = db_backup_dir / _config.settings.db_path.name _backup_sqlite_db(_config.settings.db_path, db_snapshot_path) registry_snapshot = None registry_path = _config.settings.resolved_project_registry_path if registry_path.exists(): registry_snapshot = config_backup_dir / registry_path.name registry_snapshot.write_text( registry_path.read_text(encoding="utf-8"), encoding="utf-8" ) chroma_snapshot_path = "" chroma_files_copied = 0 chroma_bytes_copied = 0 if include_chroma: source_chroma = _config.settings.chroma_path if source_chroma.exists() and source_chroma.is_dir(): chroma_backup_dir.mkdir(parents=True, exist_ok=True) chroma_files_copied, chroma_bytes_copied = _copy_directory_tree( source_chroma, chroma_backup_dir ) chroma_snapshot_path = str(chroma_backup_dir) else: log.info( "chroma_snapshot_skipped_missing", path=str(source_chroma), ) metadata = { "created_at": now.isoformat(), "backup_root": str(backup_root), "db_snapshot_path": str(db_snapshot_path), "db_size_bytes": db_snapshot_path.stat().st_size, "registry_snapshot_path": str(registry_snapshot) if registry_snapshot else "", "chroma_snapshot_path": chroma_snapshot_path, "chroma_snapshot_bytes": chroma_bytes_copied, "chroma_snapshot_files": chroma_files_copied, "chroma_snapshot_included": include_chroma, "vector_store_note": ( "Chroma snapshot included as cold directory copy." if include_chroma and chroma_snapshot_path else "Chroma hot backup is not included; rerun with include_chroma=True under exclusive_ingestion()." ), } metadata_path.write_text( json.dumps(metadata, indent=2, ensure_ascii=True) + "\n", encoding="utf-8", ) log.info( "runtime_backup_created", backup_root=str(backup_root), db_snapshot=str(db_snapshot_path), chroma_included=include_chroma, chroma_bytes=chroma_bytes_copied, ) return metadata def list_runtime_backups() -> list[dict]: """List all runtime backups under the configured backup directory.""" snapshots_root = _config.settings.resolved_backup_dir / "snapshots" if not snapshots_root.exists() or not snapshots_root.is_dir(): return [] entries: list[dict] = [] for snapshot_dir in sorted(snapshots_root.iterdir()): if not snapshot_dir.is_dir(): continue metadata_path = snapshot_dir / "backup-metadata.json" entry: dict = { "stamp": snapshot_dir.name, "path": str(snapshot_dir), "has_metadata": metadata_path.exists(), } if metadata_path.exists(): try: entry["metadata"] = json.loads(metadata_path.read_text(encoding="utf-8")) except json.JSONDecodeError: entry["metadata"] = None entry["metadata_error"] = "invalid_json" entries.append(entry) return entries def validate_backup(stamp: str) -> dict: """Validate that a previously created backup is structurally usable. Checks: - the snapshot directory exists - the SQLite snapshot is openable and ``PRAGMA integrity_check`` returns ok - the registry snapshot, if recorded, parses as JSON - the chroma snapshot directory, if recorded, exists """ snapshot_dir = _config.settings.resolved_backup_dir / "snapshots" / stamp result: dict = { "stamp": stamp, "path": str(snapshot_dir), "exists": snapshot_dir.exists(), "db_ok": False, "registry_ok": None, "chroma_ok": None, "errors": [], } if not snapshot_dir.exists(): result["errors"].append("snapshot_directory_missing") return result metadata_path = snapshot_dir / "backup-metadata.json" if not metadata_path.exists(): result["errors"].append("metadata_missing") return result try: metadata = json.loads(metadata_path.read_text(encoding="utf-8")) except json.JSONDecodeError as exc: result["errors"].append(f"metadata_invalid_json: {exc}") return result result["metadata"] = metadata db_path = Path(metadata.get("db_snapshot_path", "")) if not db_path.exists(): result["errors"].append("db_snapshot_missing") else: try: with sqlite3.connect(str(db_path)) as conn: row = conn.execute("PRAGMA integrity_check").fetchone() result["db_ok"] = bool(row and row[0] == "ok") if not result["db_ok"]: result["errors"].append( f"db_integrity_check_failed: {row[0] if row else 'no_row'}" ) except sqlite3.DatabaseError as exc: result["errors"].append(f"db_open_failed: {exc}") registry_snapshot_path = metadata.get("registry_snapshot_path", "") if registry_snapshot_path: registry_path = Path(registry_snapshot_path) if not registry_path.exists(): result["registry_ok"] = False result["errors"].append("registry_snapshot_missing") else: try: json.loads(registry_path.read_text(encoding="utf-8")) result["registry_ok"] = True except json.JSONDecodeError as exc: result["registry_ok"] = False result["errors"].append(f"registry_invalid_json: {exc}") chroma_snapshot_path = metadata.get("chroma_snapshot_path", "") if chroma_snapshot_path: chroma_dir = Path(chroma_snapshot_path) if chroma_dir.exists() and chroma_dir.is_dir(): result["chroma_ok"] = True else: result["chroma_ok"] = False result["errors"].append("chroma_snapshot_missing") result["valid"] = not result["errors"] return result def restore_runtime_backup( stamp: str, *, include_chroma: bool | None = None, pre_restore_snapshot: bool = True, confirm_service_stopped: bool = False, ) -> dict: """Restore a previously captured runtime backup. CRITICAL: the AtoCore service MUST be stopped before calling this. Overwriting a live SQLite database corrupts state and can break the running container's open connections. The caller must pass ``confirm_service_stopped=True`` as an explicit acknowledgment — otherwise this function refuses to run. The restore procedure: 1. Validate the backup via ``validate_backup``; refuse on any error. 2. (default) Create a pre-restore safety snapshot of the CURRENT state so the restore itself is reversible. The snapshot stamp is returned in the result for the operator to record. 3. Remove stale SQLite WAL/SHM sidecar files next to the target db before copying — the snapshot is a self-contained main-file image from ``conn.backup()``, and leftover WAL/SHM from the old live db would desync against the restored main file. 4. Copy the snapshot db over the target db path. 5. Restore the project registry file if the snapshot captured one. 6. Restore the Chroma directory if ``include_chroma`` resolves to true. When ``include_chroma is None`` the function defers to whether the snapshot captured Chroma (the common case). 7. Run ``PRAGMA integrity_check`` on the restored db and report the result. Returns a dict describing what was restored. On refused restore (service still running, validation failed) raises ``RuntimeError``. """ if not confirm_service_stopped: raise RuntimeError( "restore_runtime_backup refuses to run without " "confirm_service_stopped=True — stop the AtoCore container " "first (e.g. `docker compose down` from deploy/dalidou) " "before calling this function" ) validation = validate_backup(stamp) if not validation.get("valid"): raise RuntimeError( f"backup {stamp} failed validation: {validation.get('errors')}" ) metadata = validation.get("metadata") or {} pre_snapshot_stamp: str | None = None if pre_restore_snapshot: pre = create_runtime_backup(include_chroma=False) pre_snapshot_stamp = Path(pre["backup_root"]).name target_db = _config.settings.db_path source_db = Path(metadata.get("db_snapshot_path", "")) if not source_db.exists(): raise RuntimeError( f"db snapshot not found at {source_db} — backup " f"metadata may be stale" ) # Force sqlite to flush any lingering WAL into the main file and # release OS-level file handles on -wal/-shm before we swap the # main file. Passing through conn.backup() in the pre-restore # snapshot can leave sidecars momentarily locked on Windows; # an explicit checkpoint(TRUNCATE) is the reliable way to flush # and release. Best-effort: if the target db can't be opened # (missing, corrupt), fall through and trust the copy step. if target_db.exists(): try: with sqlite3.connect(str(target_db)) as checkpoint_conn: checkpoint_conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") except sqlite3.DatabaseError as exc: log.warning( "restore_pre_checkpoint_failed", target_db=str(target_db), error=str(exc), ) # Remove stale WAL/SHM sidecars from the old live db so SQLite # can't read inconsistent state on next open. Tolerant to # Windows file-lock races — the subsequent copy replaces the # main file anyway, and the integrity check afterward is the # actual correctness signal. wal_path = target_db.with_name(target_db.name + "-wal") shm_path = target_db.with_name(target_db.name + "-shm") for stale in (wal_path, shm_path): if stale.exists(): try: stale.unlink() except OSError as exc: log.warning( "restore_sidecar_unlink_failed", path=str(stale), error=str(exc), ) target_db.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(source_db, target_db) registry_restored = False registry_snapshot_path = metadata.get("registry_snapshot_path", "") if registry_snapshot_path: src_reg = Path(registry_snapshot_path) if src_reg.exists(): dst_reg = _config.settings.resolved_project_registry_path dst_reg.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(src_reg, dst_reg) registry_restored = True chroma_snapshot_path = metadata.get("chroma_snapshot_path", "") if include_chroma is None: include_chroma = bool(chroma_snapshot_path) chroma_restored = False if include_chroma and chroma_snapshot_path: src_chroma = Path(chroma_snapshot_path) if src_chroma.exists() and src_chroma.is_dir(): dst_chroma = _config.settings.chroma_path if dst_chroma.exists(): shutil.rmtree(dst_chroma) shutil.copytree(src_chroma, dst_chroma) chroma_restored = True restored_integrity_ok = False integrity_error: str | None = None try: with sqlite3.connect(str(target_db)) as conn: row = conn.execute("PRAGMA integrity_check").fetchone() restored_integrity_ok = bool(row and row[0] == "ok") if not restored_integrity_ok: integrity_error = row[0] if row else "no_row" except sqlite3.DatabaseError as exc: integrity_error = f"db_open_failed: {exc}" result: dict = { "stamp": stamp, "pre_restore_snapshot": pre_snapshot_stamp, "target_db": str(target_db), "db_restored": True, "registry_restored": registry_restored, "chroma_restored": chroma_restored, "restored_integrity_ok": restored_integrity_ok, } if integrity_error: result["integrity_error"] = integrity_error log.info( "runtime_backup_restored", stamp=stamp, pre_restore_snapshot=pre_snapshot_stamp, registry_restored=registry_restored, chroma_restored=chroma_restored, integrity_ok=restored_integrity_ok, ) return result def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None: source_conn = sqlite3.connect(str(source_path)) dest_conn = sqlite3.connect(str(dest_path)) try: source_conn.backup(dest_conn) finally: dest_conn.close() source_conn.close() def _copy_directory_tree(source: Path, dest: Path) -> tuple[int, int]: """Copy a directory tree and return (file_count, total_bytes).""" if dest.exists(): shutil.rmtree(dest) shutil.copytree(source, dest) file_count = 0 total_bytes = 0 for path in dest.rglob("*"): if path.is_file(): file_count += 1 total_bytes += path.stat().st_size return file_count, total_bytes def main() -> None: """CLI entry point for the backup module. Supports four subcommands: - ``create`` run ``create_runtime_backup`` (default if none given) - ``list`` list all runtime backup snapshots - ``validate`` validate a specific snapshot by stamp - ``restore`` restore a specific snapshot by stamp The restore subcommand is the one used by the backup/restore drill and MUST be run only when the AtoCore service is stopped. It takes ``--confirm-service-stopped`` as an explicit acknowledgment. """ import argparse parser = argparse.ArgumentParser( prog="python -m atocore.ops.backup", description="AtoCore runtime backup create/list/validate/restore", ) sub = parser.add_subparsers(dest="command") p_create = sub.add_parser("create", help="create a new runtime backup") p_create.add_argument( "--chroma", action="store_true", help="also snapshot the Chroma vector store (cold copy)", ) sub.add_parser("list", help="list runtime backup snapshots") p_validate = sub.add_parser("validate", help="validate a snapshot by stamp") p_validate.add_argument("stamp", help="snapshot stamp (e.g. 20260409T010203Z)") p_restore = sub.add_parser( "restore", help="restore a snapshot by stamp (service must be stopped)", ) p_restore.add_argument("stamp", help="snapshot stamp to restore") p_restore.add_argument( "--confirm-service-stopped", action="store_true", help="explicit acknowledgment that the AtoCore container is stopped", ) p_restore.add_argument( "--no-pre-snapshot", action="store_true", help="skip the pre-restore safety snapshot of current state", ) chroma_group = p_restore.add_mutually_exclusive_group() chroma_group.add_argument( "--chroma", dest="include_chroma", action="store_true", default=None, help="force-restore the Chroma snapshot", ) chroma_group.add_argument( "--no-chroma", dest="include_chroma", action="store_false", help="skip the Chroma snapshot even if it was captured", ) args = parser.parse_args() command = args.command or "create" if command == "create": include_chroma = getattr(args, "chroma", False) result = create_runtime_backup(include_chroma=include_chroma) elif command == "list": result = {"backups": list_runtime_backups()} elif command == "validate": result = validate_backup(args.stamp) elif command == "restore": result = restore_runtime_backup( args.stamp, include_chroma=args.include_chroma, pre_restore_snapshot=not args.no_pre_snapshot, confirm_service_stopped=args.confirm_service_stopped, ) else: # pragma: no cover — argparse guards this parser.error(f"unknown command: {command}") print(json.dumps(result, indent=2, ensure_ascii=True)) if __name__ == "__main__": main()