"""Binary asset storage with hash-dedup and on-demand thumbnails. Issue F — visual evidence. Stores uploaded images / PDFs / CAD exports under ``//.``. Re-uploads are idempotent on SHA-256. Thumbnails are generated on first request and cached under ``/.thumbnails//.jpg``. Kept deliberately small: no authentication, no background jobs, no image transformations beyond thumbnailing. Callers (API layer) own MIME validation and size caps. """ from __future__ import annotations import hashlib import json import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from io import BytesIO from pathlib import Path import atocore.config as _config from atocore.models.database import get_connection from atocore.observability.logger import get_logger log = get_logger("assets") # Whitelisted mime types. Start conservative; extend when a real use # case lands rather than speculatively. ALLOWED_MIME_TYPES: dict[str, str] = { "image/png": "png", "image/jpeg": "jpg", "image/webp": "webp", "image/gif": "gif", "application/pdf": "pdf", "model/step": "step", "model/iges": "iges", } class AssetError(Exception): """Base class for asset errors.""" class AssetTooLarge(AssetError): pass class AssetTypeNotAllowed(AssetError): pass class AssetNotFound(AssetError): pass @dataclass class Asset: id: str hash_sha256: str mime_type: str size_bytes: int stored_path: str width: int | None = None height: int | None = None original_filename: str = "" project: str = "" caption: str = "" source_refs: list[str] = field(default_factory=list) status: str = "active" created_at: str = "" updated_at: str = "" def to_dict(self) -> dict: return { "id": self.id, "hash_sha256": self.hash_sha256, "mime_type": self.mime_type, "size_bytes": self.size_bytes, "width": self.width, "height": self.height, "stored_path": self.stored_path, "original_filename": self.original_filename, "project": self.project, "caption": self.caption, "source_refs": self.source_refs, "status": self.status, "created_at": self.created_at, "updated_at": self.updated_at, } def _assets_root() -> Path: root = _config.settings.resolved_assets_dir root.mkdir(parents=True, exist_ok=True) return root def _blob_path(hash_sha256: str, ext: str) -> Path: root = _assets_root() return root / hash_sha256[:2] / f"{hash_sha256}.{ext}" def _thumbnails_root() -> Path: return _assets_root() / ".thumbnails" def _thumbnail_path(hash_sha256: str, size: int) -> Path: return _thumbnails_root() / str(size) / f"{hash_sha256}.jpg" def _image_dimensions(data: bytes, mime_type: str) -> tuple[int | None, int | None]: if not mime_type.startswith("image/"): return None, None try: from PIL import Image except Exception: return None, None try: with Image.open(BytesIO(data)) as img: return img.width, img.height except Exception as e: log.warning("asset_dimension_probe_failed", error=str(e)) return None, None def store_asset( data: bytes, mime_type: str, original_filename: str = "", project: str = "", caption: str = "", source_refs: list[str] | None = None, ) -> Asset: """Persist a binary blob and return the catalog row. Idempotent on SHA-256 — a re-upload returns the existing asset row without rewriting the blob or creating a duplicate catalog entry. Caption / project / source_refs on re-upload are ignored; update those via the owning entity's properties instead. """ max_bytes = _config.settings.assets_max_upload_bytes if len(data) > max_bytes: raise AssetTooLarge( f"Upload is {len(data)} bytes; limit is {max_bytes} bytes" ) if mime_type not in ALLOWED_MIME_TYPES: raise AssetTypeNotAllowed( f"mime_type {mime_type!r} not in allowlist. " f"Allowed: {sorted(ALLOWED_MIME_TYPES)}" ) hash_sha256 = hashlib.sha256(data).hexdigest() ext = ALLOWED_MIME_TYPES[mime_type] # Idempotency — if we already have this hash, return the existing row. existing = _fetch_by_hash(hash_sha256) if existing is not None: log.info("asset_dedup_hit", asset_id=existing.id, hash=hash_sha256[:12]) return existing width, height = _image_dimensions(data, mime_type) blob_path = _blob_path(hash_sha256, ext) blob_path.parent.mkdir(parents=True, exist_ok=True) blob_path.write_bytes(data) asset_id = str(uuid.uuid4()) now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") refs = source_refs or [] with get_connection() as conn: conn.execute( """INSERT INTO assets (id, hash_sha256, mime_type, size_bytes, width, height, stored_path, original_filename, project, caption, source_refs, status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'active', ?, ?)""", ( asset_id, hash_sha256, mime_type, len(data), width, height, str(blob_path), original_filename, project, caption, json.dumps(refs), now, now, ), ) log.info( "asset_stored", asset_id=asset_id, hash=hash_sha256[:12], mime_type=mime_type, size_bytes=len(data), ) return Asset( id=asset_id, hash_sha256=hash_sha256, mime_type=mime_type, size_bytes=len(data), width=width, height=height, stored_path=str(blob_path), original_filename=original_filename, project=project, caption=caption, source_refs=refs, status="active", created_at=now, updated_at=now, ) def _fetch_by_hash(hash_sha256: str) -> Asset | None: with get_connection() as conn: row = conn.execute( "SELECT * FROM assets WHERE hash_sha256 = ? AND status != 'invalid'", (hash_sha256,), ).fetchone() return _row_to_asset(row) if row else None def get_asset(asset_id: str) -> Asset | None: with get_connection() as conn: row = conn.execute( "SELECT * FROM assets WHERE id = ?", (asset_id,) ).fetchone() return _row_to_asset(row) if row else None def get_asset_binary(asset_id: str) -> tuple[Asset, bytes]: """Return (metadata, raw bytes). Raises AssetNotFound.""" asset = get_asset(asset_id) if asset is None or asset.status == "invalid": raise AssetNotFound(f"Asset not found: {asset_id}") path = Path(asset.stored_path) if not path.exists(): raise AssetNotFound( f"Asset {asset_id} row exists but blob is missing at {path}" ) return asset, path.read_bytes() def get_thumbnail(asset_id: str, size: int = 240) -> tuple[Asset, bytes]: """Return (metadata, thumbnail JPEG bytes). Thumbnails are only generated for image mime types. For non-images the caller should render a placeholder instead. Generated thumbs are cached on disk at ``/.thumbnails//.jpg``. """ asset = get_asset(asset_id) if asset is None or asset.status == "invalid": raise AssetNotFound(f"Asset not found: {asset_id}") if not asset.mime_type.startswith("image/"): raise AssetError( f"Thumbnails are only supported for images; " f"{asset.mime_type!r} is not an image" ) size = max(16, min(int(size), 2048)) thumb_path = _thumbnail_path(asset.hash_sha256, size) if thumb_path.exists(): return asset, thumb_path.read_bytes() try: from PIL import Image except Exception as e: raise AssetError(f"Pillow not available for thumbnailing: {e}") src_path = Path(asset.stored_path) if not src_path.exists(): raise AssetNotFound( f"Asset {asset_id} row exists but blob is missing at {src_path}" ) thumb_path.parent.mkdir(parents=True, exist_ok=True) with Image.open(src_path) as img: img = img.convert("RGB") if img.mode not in ("RGB", "L") else img img.thumbnail((size, size)) buf = BytesIO() img.save(buf, format="JPEG", quality=85, optimize=True) jpeg_bytes = buf.getvalue() thumb_path.write_bytes(jpeg_bytes) return asset, jpeg_bytes def list_orphan_assets(limit: int = 200) -> list[Asset]: """Assets not referenced by any active entity or memory. "Referenced" means: an active entity has ``properties.asset_id`` pointing at this asset, OR any active entity / memory's source_refs contains ``asset:``. """ with get_connection() as conn: asset_rows = conn.execute( "SELECT * FROM assets WHERE status = 'active' " "ORDER BY created_at DESC LIMIT ?", (min(limit, 1000),), ).fetchall() entities_with_asset = set() rows = conn.execute( "SELECT properties, source_refs FROM entities " "WHERE status = 'active'" ).fetchall() for r in rows: try: props = json.loads(r["properties"] or "{}") aid = props.get("asset_id") if aid: entities_with_asset.add(aid) except Exception: pass try: refs = json.loads(r["source_refs"] or "[]") for ref in refs: if isinstance(ref, str) and ref.startswith("asset:"): entities_with_asset.add(ref.split(":", 1)[1]) except Exception: pass # Memories don't have a properties dict, but source_refs may carry # asset: after Issue F lands for memory-level evidence. # The memories table has no source_refs column today — skip here # and extend once that lands. return [ _row_to_asset(r) for r in asset_rows if r["id"] not in entities_with_asset ] def invalidate_asset(asset_id: str, actor: str = "api", note: str = "") -> bool: """Tombstone an asset. No-op if still referenced. Returns True on success, False if the asset is missing or still referenced by an active entity (caller should get a 409 in that case). The blob file stays on disk until a future gc pass sweeps orphaned blobs — this function only flips the catalog status. """ asset = get_asset(asset_id) if asset is None: return False orphans = list_orphan_assets(limit=1000) if asset.id not in {o.id for o in orphans} and asset.status == "active": log.info("asset_invalidate_blocked_referenced", asset_id=asset_id) return False now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") with get_connection() as conn: conn.execute( "UPDATE assets SET status = 'invalid', updated_at = ? WHERE id = ?", (now, asset_id), ) log.info("asset_invalidated", asset_id=asset_id, actor=actor, note=note[:80]) return True def _row_to_asset(row) -> Asset: try: refs = json.loads(row["source_refs"] or "[]") except Exception: refs = [] return Asset( id=row["id"], hash_sha256=row["hash_sha256"], mime_type=row["mime_type"], size_bytes=row["size_bytes"], width=row["width"], height=row["height"], stored_path=row["stored_path"], original_filename=row["original_filename"] or "", project=row["project"] or "", caption=row["caption"] or "", source_refs=refs, status=row["status"], created_at=row["created_at"] or "", updated_at=row["updated_at"] or "", )