"""Create safe runtime backups for the AtoCore machine store. This module is intentionally conservative: - The SQLite snapshot uses the online ``conn.backup()`` API and is safe to call while the database is in use. - The project registry snapshot is a simple file copy of the canonical registry JSON. - The Chroma snapshot is a *cold* directory copy. To stay safe it must be taken while no ingestion is running. The recommended pattern from the API layer is to acquire ``exclusive_ingestion()`` for the duration of the backup so refreshes and ingestions cannot run concurrently with the copy. The backup metadata file records what was actually included so restore tooling does not have to guess. """ from __future__ import annotations import json import shutil import sqlite3 from datetime import datetime, UTC from pathlib import Path import atocore.config as _config from atocore.models.database import init_db from atocore.observability.logger import get_logger log = get_logger("backup") def create_runtime_backup( timestamp: datetime | None = None, include_chroma: bool = False, ) -> dict: """Create a hot SQLite backup plus registry/config metadata. When ``include_chroma`` is true the Chroma persistence directory is also snapshotted as a cold directory copy. The caller is responsible for ensuring no ingestion is running concurrently. The HTTP layer enforces this by holding ``exclusive_ingestion()`` around the call. """ init_db() now = timestamp or datetime.now(UTC) stamp = now.strftime("%Y%m%dT%H%M%SZ") backup_root = _config.settings.resolved_backup_dir / "snapshots" / stamp db_backup_dir = backup_root / "db" config_backup_dir = backup_root / "config" chroma_backup_dir = backup_root / "chroma" metadata_path = backup_root / "backup-metadata.json" db_backup_dir.mkdir(parents=True, exist_ok=True) config_backup_dir.mkdir(parents=True, exist_ok=True) db_snapshot_path = db_backup_dir / _config.settings.db_path.name _backup_sqlite_db(_config.settings.db_path, db_snapshot_path) registry_snapshot = None registry_path = _config.settings.resolved_project_registry_path if registry_path.exists(): registry_snapshot = config_backup_dir / registry_path.name registry_snapshot.write_text( registry_path.read_text(encoding="utf-8"), encoding="utf-8" ) chroma_snapshot_path = "" chroma_files_copied = 0 chroma_bytes_copied = 0 if include_chroma: source_chroma = _config.settings.chroma_path if source_chroma.exists() and source_chroma.is_dir(): chroma_backup_dir.mkdir(parents=True, exist_ok=True) chroma_files_copied, chroma_bytes_copied = _copy_directory_tree( source_chroma, chroma_backup_dir ) chroma_snapshot_path = str(chroma_backup_dir) else: log.info( "chroma_snapshot_skipped_missing", path=str(source_chroma), ) metadata = { "created_at": now.isoformat(), "backup_root": str(backup_root), "db_snapshot_path": str(db_snapshot_path), "db_size_bytes": db_snapshot_path.stat().st_size, "registry_snapshot_path": str(registry_snapshot) if registry_snapshot else "", "chroma_snapshot_path": chroma_snapshot_path, "chroma_snapshot_bytes": chroma_bytes_copied, "chroma_snapshot_files": chroma_files_copied, "chroma_snapshot_included": include_chroma, "vector_store_note": ( "Chroma snapshot included as cold directory copy." if include_chroma and chroma_snapshot_path else "Chroma hot backup is not included; rerun with include_chroma=True under exclusive_ingestion()." ), } metadata_path.write_text( json.dumps(metadata, indent=2, ensure_ascii=True) + "\n", encoding="utf-8", ) log.info( "runtime_backup_created", backup_root=str(backup_root), db_snapshot=str(db_snapshot_path), chroma_included=include_chroma, chroma_bytes=chroma_bytes_copied, ) return metadata def list_runtime_backups() -> list[dict]: """List all runtime backups under the configured backup directory.""" snapshots_root = _config.settings.resolved_backup_dir / "snapshots" if not snapshots_root.exists() or not snapshots_root.is_dir(): return [] entries: list[dict] = [] for snapshot_dir in sorted(snapshots_root.iterdir()): if not snapshot_dir.is_dir(): continue metadata_path = snapshot_dir / "backup-metadata.json" entry: dict = { "stamp": snapshot_dir.name, "path": str(snapshot_dir), "has_metadata": metadata_path.exists(), } if metadata_path.exists(): try: entry["metadata"] = json.loads(metadata_path.read_text(encoding="utf-8")) except json.JSONDecodeError: entry["metadata"] = None entry["metadata_error"] = "invalid_json" entries.append(entry) return entries def validate_backup(stamp: str) -> dict: """Validate that a previously created backup is structurally usable. Checks: - the snapshot directory exists - the SQLite snapshot is openable and ``PRAGMA integrity_check`` returns ok - the registry snapshot, if recorded, parses as JSON - the chroma snapshot directory, if recorded, exists """ snapshot_dir = _config.settings.resolved_backup_dir / "snapshots" / stamp result: dict = { "stamp": stamp, "path": str(snapshot_dir), "exists": snapshot_dir.exists(), "db_ok": False, "registry_ok": None, "chroma_ok": None, "errors": [], } if not snapshot_dir.exists(): result["errors"].append("snapshot_directory_missing") return result metadata_path = snapshot_dir / "backup-metadata.json" if not metadata_path.exists(): result["errors"].append("metadata_missing") return result try: metadata = json.loads(metadata_path.read_text(encoding="utf-8")) except json.JSONDecodeError as exc: result["errors"].append(f"metadata_invalid_json: {exc}") return result result["metadata"] = metadata db_path = Path(metadata.get("db_snapshot_path", "")) if not db_path.exists(): result["errors"].append("db_snapshot_missing") else: try: with sqlite3.connect(str(db_path)) as conn: row = conn.execute("PRAGMA integrity_check").fetchone() result["db_ok"] = bool(row and row[0] == "ok") if not result["db_ok"]: result["errors"].append( f"db_integrity_check_failed: {row[0] if row else 'no_row'}" ) except sqlite3.DatabaseError as exc: result["errors"].append(f"db_open_failed: {exc}") registry_snapshot_path = metadata.get("registry_snapshot_path", "") if registry_snapshot_path: registry_path = Path(registry_snapshot_path) if not registry_path.exists(): result["registry_ok"] = False result["errors"].append("registry_snapshot_missing") else: try: json.loads(registry_path.read_text(encoding="utf-8")) result["registry_ok"] = True except json.JSONDecodeError as exc: result["registry_ok"] = False result["errors"].append(f"registry_invalid_json: {exc}") chroma_snapshot_path = metadata.get("chroma_snapshot_path", "") if chroma_snapshot_path: chroma_dir = Path(chroma_snapshot_path) if chroma_dir.exists() and chroma_dir.is_dir(): result["chroma_ok"] = True else: result["chroma_ok"] = False result["errors"].append("chroma_snapshot_missing") result["valid"] = not result["errors"] return result def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None: source_conn = sqlite3.connect(str(source_path)) dest_conn = sqlite3.connect(str(dest_path)) try: source_conn.backup(dest_conn) finally: dest_conn.close() source_conn.close() def _copy_directory_tree(source: Path, dest: Path) -> tuple[int, int]: """Copy a directory tree and return (file_count, total_bytes).""" if dest.exists(): shutil.rmtree(dest) shutil.copytree(source, dest) file_count = 0 total_bytes = 0 for path in dest.rglob("*"): if path.is_file(): file_count += 1 total_bytes += path.stat().st_size return file_count, total_bytes def main() -> None: result = create_runtime_backup() print(json.dumps(result, indent=2, ensure_ascii=True)) if __name__ == "__main__": main()