"""Create safe runtime backups for the AtoCore machine store."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import json
|
||
|
|
import sqlite3
|
||
|
|
from datetime import datetime, UTC
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
import atocore.config as _config
|
||
|
|
from atocore.models.database import init_db
|
||
|
|
from atocore.observability.logger import get_logger
|
||
|
|
|
||
|
|
log = get_logger("backup")
|
||
|
|
|
||
|
|
|
||
|
|
def create_runtime_backup(timestamp: datetime | None = None) -> dict:
    """Create a hot backup of the SQLite DB plus registry/config metadata.

    The snapshot is written under ``<resolved_backup_dir>/snapshots/<stamp>/``
    with the database copy in ``db/``, the project registry (only when that
    file exists) in ``config/``, and a ``backup-metadata.json`` summary at the
    snapshot root.

    Args:
        timestamp: Moment to record for the backup; defaults to the current
            UTC time. Timezone-aware values are converted to UTC before the
            directory stamp is formatted, because the stamp carries a literal
            ``Z`` suffix. Naive values are formatted as given.

    Returns:
        The metadata dictionary that was also written to
        ``backup-metadata.json``.
    """
    # NOTE(review): presumably ensures the database exists before it is
    # copied -- confirm against atocore.models.database.init_db.
    init_db()

    now = datetime.now(UTC) if timestamp is None else timestamp
    # The stamp ends in a hard-coded "Z", so an aware datetime in another zone
    # must be shifted to UTC first or the directory name would lie about the
    # instant. Naive datetimes carry no offset and are kept unchanged.
    stamp_time = now if now.utcoffset() is None else now.astimezone(UTC)
    stamp = stamp_time.strftime("%Y%m%dT%H%M%SZ")

    backup_root = _config.settings.resolved_backup_dir / "snapshots" / stamp
    db_backup_dir = backup_root / "db"
    config_backup_dir = backup_root / "config"
    metadata_path = backup_root / "backup-metadata.json"

    db_backup_dir.mkdir(parents=True, exist_ok=True)
    config_backup_dir.mkdir(parents=True, exist_ok=True)

    db_snapshot_path = db_backup_dir / _config.settings.db_path.name
    _backup_sqlite_db(_config.settings.db_path, db_snapshot_path)

    registry_snapshot = None
    registry_path = _config.settings.resolved_project_registry_path
    if registry_path.exists():
        registry_snapshot = config_backup_dir / registry_path.name
        # Copy raw bytes: a text round-trip would rewrite line endings and
        # fail outright on content that is not valid UTF-8.
        registry_snapshot.write_bytes(registry_path.read_bytes())

    metadata = {
        "created_at": now.isoformat(),
        "backup_root": str(backup_root),
        "db_snapshot_path": str(db_snapshot_path),
        "db_size_bytes": db_snapshot_path.stat().st_size,
        # Empty string (not null) when no registry file was present.
        "registry_snapshot_path": str(registry_snapshot) if registry_snapshot else "",
        "vector_store_note": "Chroma hot backup is not included in this script; use a cold snapshot or rebuild/export workflow.",
    }
    metadata_path.write_text(json.dumps(metadata, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")

    log.info("runtime_backup_created", backup_root=str(backup_root), db_snapshot=str(db_snapshot_path))
    return metadata
|
||
|
|
|
||
|
|
|
||
|
|
def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
    """Copy a SQLite database to ``dest_path`` via ``Connection.backup``.

    Args:
        source_path: Path of the database to copy.
        dest_path: Path of the snapshot database file. Its parent directory
            must already exist.

    Raises:
        sqlite3.Error: If either database cannot be opened or the copy fails.
    """
    source_conn = sqlite3.connect(str(source_path))
    try:
        # Open the destination inside the source's try block so the source
        # connection is still closed when the destination cannot be opened.
        dest_conn = sqlite3.connect(str(dest_path))
        try:
            source_conn.backup(dest_conn)
        finally:
            dest_conn.close()
    finally:
        source_conn.close()
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> None:
    """Run one runtime backup and print its metadata as indented JSON."""
    metadata = create_runtime_backup()
    rendered = json.dumps(metadata, indent=2, ensure_ascii=True)
    print(rendered)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|