feat(assets): binary asset store + artifact entity + wiki evidence (Issue F)

Wires visual evidence into the knowledge graph. Images, PDFs, and CAD
exports can now be uploaded, deduped by SHA-256, thumbnailed, linked to
entities via EVIDENCED_BY, and rendered inline on wiki pages. Unblocks
AKC uploading voice-session screenshots alongside extracted entities.

- assets/ module: store_asset (hash dedup + MIME allowlist + 20 MB cap),
  get_asset_binary, get_thumbnail (Pillow, on-disk cache under
  .thumbnails/<size>/), list_orphan_assets, invalidate_asset
- models/database.py: new `assets` table + indexes
- engineering/service.py: `artifact` added to ENTITY_TYPES
- api/routes.py: POST /assets (multipart), GET /assets/{id},
  /assets/{id}/thumbnail, /assets/{id}/meta, /admin/assets/orphans,
  DELETE /assets/{id} (409 if still referenced),
  GET /entities/{id}/evidence (EVIDENCED_BY artifacts with asset meta)
- main.py: all new paths aliased under /v1
- engineering/wiki.py: entity pages render EVIDENCED_BY → artifact as a
  "Visual evidence" thumbnail strip; artifact pages render the full
  image + caption + capture_context
- deploy/dalidou/docker-compose.yml: bind-mount ${ATOCORE_ASSETS_DIR}
- config.py: assets_dir + assets_max_upload_bytes settings
- requirements.txt + pyproject.toml: python-multipart, Pillow>=10.0.0
- tests/test_assets.py: 16 tests (dedup, cap, thumbnail cache, orphan
  detection, invalidate gating, API upload/fetch, evidence, v1 aliases,
  wiki rendering)
- DEV-LEDGER.md: session log + cleanup note + test_count 478 -> 494

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-21 21:46:52 -04:00
parent b1a3dd071e
commit 069d155585
13 changed files with 1016 additions and 3 deletions

View File

@@ -2,8 +2,8 @@
from pathlib import Path
from fastapi import APIRouter, HTTPException
from fastapi.responses import HTMLResponse
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from fastapi.responses import HTMLResponse, Response
from pydantic import BaseModel
import atocore.config as _config
@@ -2377,3 +2377,177 @@ def api_debug_context() -> dict:
if pack is None:
return {"message": "No context pack built yet."}
return _pack_to_dict(pack)
# --- Issue F: binary asset store (visual evidence) ---
@router.post("/assets")
async def api_upload_asset(
    file: UploadFile = File(...),
    project: str = Form(""),
    caption: str = Form(""),
    source_refs: str = Form(""),
) -> dict:
    """Upload a binary asset (image, PDF, CAD export).

    Idempotent on SHA-256 content hash. ``source_refs`` is a JSON-encoded
    list of provenance pointers (e.g. ``["session:<id>"]``); pass an
    empty string for none. MIME type is inferred from the upload's
    Content-Type header.

    Responds 400 on malformed source_refs or a missing Content-Type,
    413 when the payload exceeds the configured cap, and 415 for a MIME
    type outside the allowlist.
    """
    from atocore.assets import (
        AssetTooLarge,
        AssetTypeNotAllowed,
        store_asset,
    )
    import json as _json
    data = await file.read()
    # json.JSONDecodeError subclasses ValueError, so a single ValueError
    # clause covers both the parse failure and the shape check below.
    try:
        refs = _json.loads(source_refs) if source_refs else []
        if not isinstance(refs, list):
            raise ValueError("source_refs must be a JSON array")
        refs = [str(r) for r in refs]
    except ValueError as e:
        # Chain the cause so the original parse error survives in logs.
        raise HTTPException(
            status_code=400,
            detail=f"source_refs must be a JSON array of strings: {e}",
        ) from e
    # Drop any "; charset=..." parameter from the Content-Type header.
    mime_type = (file.content_type or "").split(";", 1)[0].strip()
    if not mime_type:
        raise HTTPException(
            status_code=400,
            detail="Upload missing Content-Type; cannot determine mime_type",
        )
    try:
        asset = store_asset(
            data=data,
            mime_type=mime_type,
            original_filename=file.filename or "",
            project=project or "",
            caption=caption or "",
            source_refs=refs,
        )
    except AssetTooLarge as e:
        raise HTTPException(status_code=413, detail=str(e)) from e
    except AssetTypeNotAllowed as e:
        raise HTTPException(status_code=415, detail=str(e)) from e
    return asset.to_dict()
@router.get("/assets/{asset_id}")
def api_get_asset_binary(asset_id: str):
    """Return the original binary with its stored Content-Type."""
    from atocore.assets import AssetNotFound, get_asset_binary
    try:
        asset, payload = get_asset_binary(asset_id)
    except AssetNotFound as e:
        raise HTTPException(status_code=404, detail=str(e))
    # Content-hash ETag: safe to cache for an hour per client.
    return Response(
        content=payload,
        media_type=asset.mime_type,
        headers={
            "Cache-Control": "private, max-age=3600",
            "ETag": f'"{asset.hash_sha256}"',
        },
    )
@router.get("/assets/{asset_id}/thumbnail")
def api_get_asset_thumbnail(asset_id: str, size: int = 240):
    """Return a generated thumbnail (images only). Max side ``size`` px."""
    from atocore.assets import AssetError, AssetNotFound, get_thumbnail
    try:
        asset, jpeg = get_thumbnail(asset_id, size=size)
    except AssetNotFound as e:
        raise HTTPException(status_code=404, detail=str(e))
    except AssetError as e:
        raise HTTPException(status_code=415, detail=str(e))
    # ETag varies with size so different thumbnail sizes cache separately.
    caching = {
        "Cache-Control": "private, max-age=86400",
        "ETag": f'"{asset.hash_sha256}-{size}"',
    }
    return Response(content=jpeg, media_type="image/jpeg", headers=caching)
@router.get("/assets/{asset_id}/meta")
def api_get_asset_meta(asset_id: str) -> dict:
    """Return asset metadata without the binary."""
    from atocore.assets import get_asset
    found = get_asset(asset_id)
    if found is not None:
        return found.to_dict()
    raise HTTPException(status_code=404, detail=f"Asset not found: {asset_id}")
@router.get("/admin/assets/orphans")
def api_list_asset_orphans(limit: int = 200) -> dict:
    """List assets with no referencing active entity."""
    from atocore.assets import list_orphan_assets
    found = list_orphan_assets(limit=limit)
    payload = [asset.to_dict() for asset in found]
    return {"orphans": payload, "count": len(payload)}
@router.delete("/assets/{asset_id}")
def api_invalidate_asset(asset_id: str) -> dict:
    """Tombstone an asset. No-op if still referenced by an active entity."""
    from atocore.assets import get_asset, invalidate_asset
    if get_asset(asset_id) is None:
        raise HTTPException(status_code=404, detail=f"Asset not found: {asset_id}")
    if invalidate_asset(asset_id, actor="api-http"):
        return {"status": "invalidated", "id": asset_id}
    # invalidate_asset refused: something still points at this asset.
    raise HTTPException(
        status_code=409,
        detail=f"Asset {asset_id} is still referenced; "
        "unlink EVIDENCED_BY relationships or retarget entity.properties.asset_id first",
    )
@router.get("/entities/{entity_id}/evidence")
def api_get_entity_evidence(entity_id: str) -> dict:
    """Return artifact entities linked to this one via EVIDENCED_BY.

    Each entry carries the artifact entity plus its resolved asset
    metadata so the caller can build thumbnail URLs without a second
    query. Non-artifact evidenced_by targets are skipped (the assumption
    is that visual evidence is always an artifact entity).
    """
    from atocore.assets import get_asset
    from atocore.engineering.service import (
        get_entity,
        get_relationships,
    )
    if get_entity(entity_id) is None:
        raise HTTPException(status_code=404, detail=f"Entity not found: {entity_id}")
    evidence: list[dict] = []
    for rel in get_relationships(entity_id, direction="outgoing"):
        if rel.relationship_type != "evidenced_by":
            continue
        artifact = get_entity(rel.target_entity_id)
        if artifact is None or artifact.entity_type != "artifact":
            continue
        props = artifact.properties or {}
        linked = get_asset(props["asset_id"]) if props.get("asset_id") else None
        evidence.append({
            "entity_id": artifact.id,
            "name": artifact.name,
            "kind": props.get("kind", "other"),
            "caption": props.get("caption", ""),
            "capture_context": props.get("capture_context", ""),
            "asset": linked.to_dict() if linked else None,
            "relationship_id": rel.id,
        })
    return {"entity_id": entity_id, "evidence": evidence, "count": len(evidence)}

View File

@@ -0,0 +1,31 @@
"""Binary asset store (Issue F — visual evidence)."""
from atocore.assets.service import (
ALLOWED_MIME_TYPES,
Asset,
AssetError,
AssetNotFound,
AssetTooLarge,
AssetTypeNotAllowed,
get_asset,
get_asset_binary,
get_thumbnail,
invalidate_asset,
list_orphan_assets,
store_asset,
)
__all__ = [
"ALLOWED_MIME_TYPES",
"Asset",
"AssetError",
"AssetNotFound",
"AssetTooLarge",
"AssetTypeNotAllowed",
"get_asset",
"get_asset_binary",
"get_thumbnail",
"invalidate_asset",
"list_orphan_assets",
"store_asset",
]

View File

@@ -0,0 +1,367 @@
"""Binary asset storage with hash-dedup and on-demand thumbnails.
Issue F — visual evidence. Stores uploaded images / PDFs / CAD exports
under ``<assets_dir>/<hash[:2]>/<hash>.<ext>``. Re-uploads are idempotent
on SHA-256. Thumbnails are generated on first request and cached under
``<assets_dir>/.thumbnails/<size>/<hash>.jpg``.
Kept deliberately small: no authentication, no background jobs, no
image transformations beyond thumbnailing. Callers (API layer) own
MIME validation and size caps.
"""
from __future__ import annotations
import hashlib
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
import atocore.config as _config
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
log = get_logger("assets")
# Whitelisted mime types. Start conservative; extend when a real use
# case lands rather than speculatively.
# Maps MIME type -> on-disk file extension used for the stored blob.
ALLOWED_MIME_TYPES: dict[str, str] = {
    "image/png": "png",
    "image/jpeg": "jpg",
    "image/webp": "webp",
    "image/gif": "gif",
    "application/pdf": "pdf",
    # CAD exchange formats — stored as-is, never thumbnailed.
    "model/step": "step",
    "model/iges": "iges",
}
class AssetError(Exception):
    """Base class for asset errors."""
class AssetTooLarge(AssetError):
    """Payload exceeds the configured assets_max_upload_bytes cap."""
    pass
class AssetTypeNotAllowed(AssetError):
    """MIME type is not in the ALLOWED_MIME_TYPES allowlist."""
    pass
class AssetNotFound(AssetError):
    """No usable asset for the given id (missing, tombstoned, or blob gone)."""
    pass
@dataclass
class Asset:
    """Catalog row for one stored blob, content-addressed by SHA-256."""
    id: str  # uuid4 primary key
    hash_sha256: str  # content hash; unique across the catalog
    mime_type: str
    size_bytes: int
    stored_path: str  # filesystem location of the blob
    width: int | None = None  # populated for image mime types only
    height: int | None = None  # populated for image mime types only
    original_filename: str = ""
    project: str = ""
    caption: str = ""
    source_refs: list[str] = field(default_factory=list)  # provenance pointers, e.g. "session:<id>"
    status: str = "active"  # "active" or "invalid" (tombstoned)
    created_at: str = ""
    updated_at: str = ""
    def to_dict(self) -> dict:
        """JSON-serializable view of the row (used by the API layer)."""
        return {
            "id": self.id,
            "hash_sha256": self.hash_sha256,
            "mime_type": self.mime_type,
            "size_bytes": self.size_bytes,
            "width": self.width,
            "height": self.height,
            "stored_path": self.stored_path,
            "original_filename": self.original_filename,
            "project": self.project,
            "caption": self.caption,
            "source_refs": self.source_refs,
            "status": self.status,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
def _assets_root() -> Path:
    """Root directory for stored blobs; created on first access."""
    base = _config.settings.resolved_assets_dir
    base.mkdir(parents=True, exist_ok=True)
    return base
def _blob_path(hash_sha256: str, ext: str) -> Path:
    """Content-addressed blob location: <root>/<hash[:2]>/<hash>.<ext>."""
    shard = hash_sha256[:2]
    return _assets_root() / shard / f"{hash_sha256}.{ext}"
def _thumbnails_root() -> Path:
    """Directory holding cached thumbnails, nested under the assets root."""
    return _assets_root() / ".thumbnails"
def _thumbnail_path(hash_sha256: str, size: int) -> Path:
    """Cache location for a ``size``-px thumbnail of the given blob."""
    return _thumbnails_root() / f"{size}" / f"{hash_sha256}.jpg"
def _image_dimensions(data: bytes, mime_type: str) -> tuple[int | None, int | None]:
if not mime_type.startswith("image/"):
return None, None
try:
from PIL import Image
except Exception:
return None, None
try:
with Image.open(BytesIO(data)) as img:
return img.width, img.height
except Exception as e:
log.warning("asset_dimension_probe_failed", error=str(e))
return None, None
def store_asset(
    data: bytes,
    mime_type: str,
    original_filename: str = "",
    project: str = "",
    caption: str = "",
    source_refs: list[str] | None = None,
) -> Asset:
    """Persist a binary blob and return the catalog row.

    Idempotent on SHA-256 — a re-upload returns the existing asset row
    without rewriting the blob or creating a duplicate catalog entry.
    Caption / project / source_refs on re-upload are ignored; update
    those via the owning entity's properties instead.

    Raises:
        AssetTooLarge: payload exceeds assets_max_upload_bytes.
        AssetTypeNotAllowed: mime_type outside ALLOWED_MIME_TYPES.
    """
    max_bytes = _config.settings.assets_max_upload_bytes
    if len(data) > max_bytes:
        raise AssetTooLarge(
            f"Upload is {len(data)} bytes; limit is {max_bytes} bytes"
        )
    if mime_type not in ALLOWED_MIME_TYPES:
        raise AssetTypeNotAllowed(
            f"mime_type {mime_type!r} not in allowlist. "
            f"Allowed: {sorted(ALLOWED_MIME_TYPES)}"
        )
    hash_sha256 = hashlib.sha256(data).hexdigest()
    ext = ALLOWED_MIME_TYPES[mime_type]
    # Idempotency — if we already have this hash active, return that row.
    existing = _fetch_by_hash(hash_sha256)
    if existing is not None:
        log.info("asset_dedup_hit", asset_id=existing.id, hash=hash_sha256[:12])
        return existing
    width, height = _image_dimensions(data, mime_type)
    blob_path = _blob_path(hash_sha256, ext)
    blob_path.parent.mkdir(parents=True, exist_ok=True)
    blob_path.write_bytes(data)
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    # Bug fix: hash_sha256 carries a UNIQUE constraint, but _fetch_by_hash
    # deliberately skips tombstoned (status='invalid') rows — so
    # re-uploading previously invalidated content used to crash the
    # INSERT below with an IntegrityError. Revive the tombstone instead;
    # the blob was just (re)written above, so it's safe to serve again.
    with get_connection() as conn:
        tombstone = conn.execute(
            "SELECT id FROM assets WHERE hash_sha256 = ?", (hash_sha256,)
        ).fetchone()
        if tombstone is not None:
            conn.execute(
                "UPDATE assets SET status = 'active', updated_at = ? WHERE id = ?",
                (now, tombstone["id"]),
            )
    if tombstone is not None:
        log.info("asset_revived", asset_id=tombstone["id"], hash=hash_sha256[:12])
        revived = get_asset(tombstone["id"])
        if revived is not None:
            return revived
    asset_id = str(uuid.uuid4())
    refs = source_refs or []
    with get_connection() as conn:
        conn.execute(
            """INSERT INTO assets
               (id, hash_sha256, mime_type, size_bytes, width, height,
                stored_path, original_filename, project, caption,
                source_refs, status, created_at, updated_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'active', ?, ?)""",
            (
                asset_id, hash_sha256, mime_type, len(data), width, height,
                str(blob_path), original_filename, project, caption,
                json.dumps(refs), now, now,
            ),
        )
    log.info(
        "asset_stored", asset_id=asset_id, hash=hash_sha256[:12],
        mime_type=mime_type, size_bytes=len(data),
    )
    return Asset(
        id=asset_id, hash_sha256=hash_sha256, mime_type=mime_type,
        size_bytes=len(data), width=width, height=height,
        stored_path=str(blob_path), original_filename=original_filename,
        project=project, caption=caption, source_refs=refs,
        status="active", created_at=now, updated_at=now,
    )
def _fetch_by_hash(hash_sha256: str) -> Asset | None:
    """Look up a non-tombstoned catalog row by content hash, or None."""
    with get_connection() as conn:
        row = conn.execute(
            "SELECT * FROM assets WHERE hash_sha256 = ? AND status != 'invalid'",
            (hash_sha256,),
        ).fetchone()
    if row is None:
        return None
    return _row_to_asset(row)
def get_asset(asset_id: str) -> Asset | None:
    """Fetch a catalog row by id (including tombstoned rows), or None."""
    with get_connection() as conn:
        row = conn.execute(
            "SELECT * FROM assets WHERE id = ?", (asset_id,)
        ).fetchone()
    if row is None:
        return None
    return _row_to_asset(row)
def get_asset_binary(asset_id: str) -> tuple[Asset, bytes]:
    """Return (metadata, raw bytes). Raises AssetNotFound."""
    asset = get_asset(asset_id)
    if asset is None or asset.status == "invalid":
        raise AssetNotFound(f"Asset not found: {asset_id}")
    blob = Path(asset.stored_path)
    if blob.exists():
        return asset, blob.read_bytes()
    # Catalog/disk drift: row survived but the blob did not.
    raise AssetNotFound(
        f"Asset {asset_id} row exists but blob is missing at {blob}"
    )
def get_thumbnail(asset_id: str, size: int = 240) -> tuple[Asset, bytes]:
    """Return (metadata, thumbnail JPEG bytes).

    Thumbnails are only generated for image mime types. For non-images
    the caller should render a placeholder instead. Generated thumbs
    are cached on disk at ``<assets_dir>/.thumbnails/<size>/<hash>.jpg``.

    Raises:
        AssetNotFound: missing/tombstoned asset, or the blob is gone.
        AssetError: non-image asset, or Pillow is unavailable.
    """
    asset = get_asset(asset_id)
    if asset is None or asset.status == "invalid":
        raise AssetNotFound(f"Asset not found: {asset_id}")
    if not asset.mime_type.startswith("image/"):
        raise AssetError(
            f"Thumbnails are only supported for images; "
            f"{asset.mime_type!r} is not an image"
        )
    # Clamp so callers can't request absurd (or zero/negative) sizes.
    size = max(16, min(int(size), 2048))
    thumb_path = _thumbnail_path(asset.hash_sha256, size)
    if thumb_path.exists():
        return asset, thumb_path.read_bytes()
    try:
        from PIL import Image
    except Exception as e:
        # Fix: chain the ImportError so the root cause survives in logs.
        raise AssetError(f"Pillow not available for thumbnailing: {e}") from e
    src_path = Path(asset.stored_path)
    if not src_path.exists():
        raise AssetNotFound(
            f"Asset {asset_id} row exists but blob is missing at {src_path}"
        )
    thumb_path.parent.mkdir(parents=True, exist_ok=True)
    with Image.open(src_path) as img:
        # JPEG can't encode palette/alpha modes; normalize to RGB first.
        img = img.convert("RGB") if img.mode not in ("RGB", "L") else img
        img.thumbnail((size, size))
        buf = BytesIO()
        img.save(buf, format="JPEG", quality=85, optimize=True)
    jpeg_bytes = buf.getvalue()
    thumb_path.write_bytes(jpeg_bytes)
    return asset, jpeg_bytes
def list_orphan_assets(limit: int = 200) -> list[Asset]:
    """Assets not referenced by any active entity or memory.

    "Referenced" means: an active entity has ``properties.asset_id``
    pointing at this asset, OR any active entity / memory's
    source_refs contains ``asset:<id>``.
    """
    capped = min(limit, 1000)
    with get_connection() as conn:
        candidates = conn.execute(
            "SELECT * FROM assets WHERE status = 'active' "
            "ORDER BY created_at DESC LIMIT ?",
            (capped,),
        ).fetchall()
        entity_rows = conn.execute(
            "SELECT properties, source_refs FROM entities "
            "WHERE status = 'active'"
        ).fetchall()
    referenced: set = set()
    for row in entity_rows:
        try:
            aid = json.loads(row["properties"] or "{}").get("asset_id")
            if aid:
                referenced.add(aid)
        except Exception:
            pass  # malformed properties JSON — best-effort scan
        try:
            for ref in json.loads(row["source_refs"] or "[]"):
                if isinstance(ref, str) and ref.startswith("asset:"):
                    referenced.add(ref.split(":", 1)[1])
        except Exception:
            pass  # malformed source_refs JSON — best-effort scan
    # Memories don't have a properties dict, but source_refs may carry
    # asset:<id> once memory-level evidence lands. The memories table
    # has no source_refs column today — only entities are scanned here.
    return [
        _row_to_asset(row)
        for row in candidates
        if row["id"] not in referenced
    ]
def invalidate_asset(asset_id: str, actor: str = "api", note: str = "") -> bool:
    """Tombstone an asset. No-op if still referenced.

    Returns True on success, False if the asset is missing or still
    referenced by an active entity (caller should get a 409 in that
    case). The blob file stays on disk until a future gc pass sweeps
    orphaned blobs — this function only flips the catalog status.
    """
    asset = get_asset(asset_id)
    if asset is None:
        return False
    # Bug fix: the previous implementation asked list_orphan_assets
    # (internally capped at the 1000 newest active assets) whether this
    # asset was orphaned, so an orphaned asset outside that window was
    # wrongly reported as still referenced. Check this asset directly.
    if asset.status == "active" and _asset_is_referenced(asset_id):
        log.info("asset_invalidate_blocked_referenced", asset_id=asset_id)
        return False
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    with get_connection() as conn:
        conn.execute(
            "UPDATE assets SET status = 'invalid', updated_at = ? WHERE id = ?",
            (now, asset_id),
        )
    log.info("asset_invalidated", asset_id=asset_id, actor=actor, note=note[:80])
    return True


def _asset_is_referenced(asset_id: str) -> bool:
    """True if any active entity points at the asset via properties.asset_id
    or carries ``asset:<id>`` in its source_refs (same rule as orphan scan)."""
    marker = f"asset:{asset_id}"
    with get_connection() as conn:
        rows = conn.execute(
            "SELECT properties, source_refs FROM entities "
            "WHERE status = 'active'"
        ).fetchall()
    for row in rows:
        try:
            if json.loads(row["properties"] or "{}").get("asset_id") == asset_id:
                return True
        except Exception:
            pass  # malformed properties JSON — best-effort scan
        try:
            if marker in json.loads(row["source_refs"] or "[]"):
                return True
        except Exception:
            pass  # malformed source_refs JSON — best-effort scan
    return False
def _row_to_asset(row) -> Asset:
    """Hydrate an Asset dataclass from a catalog row.

    Malformed source_refs JSON degrades to an empty list rather than
    failing the whole read.
    """
    try:
        refs = json.loads(row["source_refs"] or "[]")
    except Exception:
        refs = []

    def _text(col: str) -> str:
        # NULL-able text columns normalize to "".
        return row[col] or ""

    return Asset(
        id=row["id"],
        hash_sha256=row["hash_sha256"],
        mime_type=row["mime_type"],
        size_bytes=row["size_bytes"],
        width=row["width"],
        height=row["height"],
        stored_path=row["stored_path"],
        original_filename=_text("original_filename"),
        project=_text("project"),
        caption=_text("caption"),
        source_refs=refs,
        status=row["status"],
        created_at=_text("created_at"),
        updated_at=_text("updated_at"),
    )

View File

@@ -22,6 +22,8 @@ class Settings(BaseSettings):
backup_dir: Path = Path("./backups")
run_dir: Path = Path("./run")
project_registry_path: Path = Path("./config/project-registry.json")
assets_dir: Path | None = None
assets_max_upload_bytes: int = 20 * 1024 * 1024 # 20 MB per upload
host: str = "127.0.0.1"
port: int = 8100
db_busy_timeout_ms: int = 5000
@@ -76,6 +78,10 @@ class Settings(BaseSettings):
def resolved_data_dir(self) -> Path:
return self._resolve_path(self.data_dir)
@property
def resolved_assets_dir(self) -> Path:
    """Asset storage root: explicit ``assets_dir`` or ``<data_dir>/assets``."""
    return self._resolve_path(self.assets_dir or (self.resolved_data_dir / "assets"))
@property
def resolved_db_dir(self) -> Path:
return self._resolve_path(self.db_dir or (self.resolved_data_dir / "db"))
@@ -132,6 +138,7 @@ class Settings(BaseSettings):
self.resolved_backup_dir,
self.resolved_run_dir,
self.resolved_project_registry_path.parent,
self.resolved_assets_dir,
]
@property

View File

@@ -29,6 +29,10 @@ ENTITY_TYPES = [
"validation_claim",
"vendor",
"process",
# Issue F (visual evidence): images, PDFs, CAD exports attached to
# other entities via EVIDENCED_BY. properties carries kind +
# asset_id + caption + capture_context.
"artifact",
]
RELATIONSHIP_TYPES = [

View File

@@ -277,6 +277,115 @@ def render_project(project: str) -> str:
)
def _render_visual_evidence(entity_id: str, ctx: dict) -> str:
    """Render EVIDENCED_BY → artifact links as an inline thumbnail strip.

    Returns an HTML fragment, or "" when the entity has no artifact
    evidence. ``ctx`` is the entity-context dict: ``relationships`` is
    scanned for outgoing ``evidenced_by`` edges and targets are resolved
    through ``related_entities``.
    """
    from atocore.assets import get_asset
    # Collect outgoing evidenced_by targets that are artifact entities;
    # unresolved targets and non-artifacts are skipped silently.
    artifacts = []
    for rel in ctx["relationships"]:
        if rel.source_entity_id != entity_id or rel.relationship_type != "evidenced_by":
            continue
        target = ctx["related_entities"].get(rel.target_entity_id)
        if target is None or target.entity_type != "artifact":
            continue
        artifacts.append(target)
    if not artifacts:
        return ""
    tiles = []
    for art in artifacts:
        props = art.properties or {}
        kind = props.get("kind", "other")
        # Fall back to the entity name when no explicit caption is set.
        caption = props.get("caption", art.name)
        asset_id = props.get("asset_id")
        asset = get_asset(asset_id) if asset_id else None
        detail_href = f"/wiki/entities/{art.id}"
        # Three tile flavors: inline <img> for images, a PDF link row,
        # and a generic paperclip link for everything else.
        if kind == "image" and asset and asset.mime_type.startswith("image/"):
            full_href = f"/assets/{asset.id}"
            thumb = f"/assets/{asset.id}/thumbnail?size=240"
            tiles.append(
                f'<figure class="evidence-tile">'
                f'<a href="{full_href}" target="_blank" rel="noopener">'
                f'<img src="{thumb}" alt="{_escape_attr(caption)}" loading="lazy">'
                f'</a>'
                f'<figcaption><a href="{detail_href}">{_escape_html(caption)}</a></figcaption>'
                f'</figure>'
            )
        elif kind == "pdf" and asset:
            full_href = f"/assets/{asset.id}"
            tiles.append(
                f'<div class="evidence-tile evidence-pdf">'
                f'<a href="{full_href}" target="_blank" rel="noopener">'
                f'📄 PDF: {_escape_html(caption)}</a>'
                f' · <a href="{detail_href}">details</a>'
                f'</div>'
            )
        else:
            tiles.append(
                f'<div class="evidence-tile evidence-other">'
                f'<a href="{detail_href}">📎 {_escape_html(caption)}</a>'
                f' <span class="tag">{kind}</span>'
                f'</div>'
            )
    return (
        '<h2>Visual evidence</h2>'
        f'<div class="evidence-strip">{"".join(tiles)}</div>'
    )
def _render_artifact_body(ent) -> list[str]:
    """Render an artifact entity's own image/pdf/caption.

    Returns a list of HTML fragments for the artifact's page body.
    When properties carry an asset_id that no longer resolves, a
    "blob missing" note is emitted instead of silently rendering nothing.
    """
    from atocore.assets import get_asset
    props = ent.properties or {}
    kind = props.get("kind", "other")
    caption = props.get("caption", "")
    capture_context = props.get("capture_context", "")
    asset_id = props.get("asset_id")
    asset = get_asset(asset_id) if asset_id else None
    out: list[str] = []
    if kind == "image" and asset and asset.mime_type.startswith("image/"):
        # Full-width image (served as a 1024px thumbnail) linking to the
        # original binary.
        out.append(
            f'<figure class="artifact-full">'
            f'<a href="/assets/{asset.id}" target="_blank" rel="noopener">'
            f'<img src="/assets/{asset.id}/thumbnail?size=1024" '
            f'alt="{_escape_attr(caption or ent.name)}">'
            f'</a>'
            f'<figcaption>{_escape_html(caption)}</figcaption>'
            f'</figure>'
        )
    elif kind == "pdf" and asset:
        out.append(
            f'<p>📄 <a href="/assets/{asset.id}" target="_blank" rel="noopener">'
            f'Open PDF ({asset.size_bytes // 1024} KB)</a></p>'
        )
    elif asset_id:
        # asset_id present but unresolvable — surface the drift.
        out.append(f'<p class="meta">asset_id: <code>{asset_id}</code> — blob missing</p>')
    if capture_context:
        out.append('<h2>Capture context</h2>')
        out.append(f'<blockquote>{_escape_html(capture_context)}</blockquote>')
    return out
def _escape_html(s: str) -> str:
if s is None:
return ""
return (str(s)
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;"))
def _escape_attr(s: str) -> str:
    """Escape for use inside a double-quoted HTML attribute value."""
    escaped = _escape_html(s)
    return escaped.replace('"', "&quot;")
def render_entity(entity_id: str) -> str | None:
ctx = get_entity_with_context(entity_id)
if ctx is None:
@@ -297,6 +406,15 @@ def render_entity(entity_id: str) -> str | None:
lines.append(f'<p class="meta">confidence: {ent.confidence} · status: {ent.status} · created: {ent.created_at}</p>')
# Issue F: artifact entities render their own image inline; other
# entities render their EVIDENCED_BY artifacts as a visual strip.
if ent.entity_type == "artifact":
lines.extend(_render_artifact_body(ent))
else:
evidence_html = _render_visual_evidence(ent.id, ctx)
if evidence_html:
lines.append(evidence_html)
if ctx["relationships"]:
lines.append('<h2>Relationships</h2><ul>')
for rel in ctx["relationships"]:
@@ -799,6 +917,14 @@ _TEMPLATE = """<!DOCTYPE html>
.stat-row { display: flex; gap: 1rem; flex-wrap: wrap; font-size: 0.9rem; margin: 0.4rem 0; }
.stat-row span { padding: 0.1rem 0.4rem; background: var(--hover); border-radius: 4px; }
.meta { font-size: 0.8em; opacity: 0.5; margin-top: 0.5rem; }
.evidence-strip { display: flex; flex-wrap: wrap; gap: 0.75rem; margin: 0.75rem 0 1.25rem; }
.evidence-tile { margin: 0; background: var(--card); border: 1px solid var(--border); border-radius: 6px; padding: 0.4rem; max-width: 270px; }
.evidence-tile img { display: block; max-width: 100%; height: auto; border-radius: 3px; }
.evidence-tile figcaption { font-size: 0.8rem; margin-top: 0.35rem; opacity: 0.85; }
.evidence-pdf, .evidence-other { padding: 0.6rem 0.8rem; font-size: 0.9rem; }
.artifact-full figure, .artifact-full { margin: 0 0 1rem; }
.artifact-full img { display: block; max-width: 100%; height: auto; border: 1px solid var(--border); border-radius: 4px; }
.artifact-full figcaption { font-size: 0.9rem; margin-top: 0.5rem; opacity: 0.85; }
.tag { background: var(--accent); color: var(--bg); padding: 0.1rem 0.4rem; border-radius: 3px; font-size: 0.75em; margin-left: 0.3rem; }
.search-box { display: flex; gap: 0.5rem; margin: 1.5rem 0; }
.search-box input {

View File

@@ -88,6 +88,12 @@ _V1_PUBLIC_PATHS = {
"/health",
"/sources",
"/stats",
# Issue F: asset store + evidence query
"/assets",
"/assets/{asset_id}",
"/assets/{asset_id}/thumbnail",
"/assets/{asset_id}/meta",
"/entities/{entity_id}/evidence",
}
_v1_router = APIRouter(prefix="/v1", tags=["v1"])

View File

@@ -314,6 +314,38 @@ def _apply_migrations(conn: sqlite3.Connection) -> None:
conn.execute("CREATE INDEX IF NOT EXISTS idx_tag_aliases_status ON tag_aliases(status)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tag_aliases_alias ON tag_aliases(alias)")
# Issue F (visual evidence): binary asset store. One row per unique
# content hash — re-uploading the same file is idempotent. The blob
# itself lives on disk under stored_path; this table is the catalog.
# width/height are populated for image mime types (NULL otherwise).
# source_refs is a JSON array of free-form provenance pointers
# (e.g. "session:<id>", "interaction:<id>") that survive independent
# of the EVIDENCED_BY graph. status=invalid tombstones an asset
# without dropping the row so audit trails stay intact.
conn.execute(
"""
CREATE TABLE IF NOT EXISTS assets (
id TEXT PRIMARY KEY,
hash_sha256 TEXT UNIQUE NOT NULL,
mime_type TEXT NOT NULL,
size_bytes INTEGER NOT NULL,
width INTEGER,
height INTEGER,
stored_path TEXT NOT NULL,
original_filename TEXT DEFAULT '',
project TEXT DEFAULT '',
caption TEXT DEFAULT '',
source_refs TEXT DEFAULT '[]',
status TEXT DEFAULT 'active',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_assets_hash ON assets(hash_sha256)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_assets_project ON assets(project)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_assets_status ON assets(status)")
def _column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool:
rows = conn.execute(f"PRAGMA table_info({table})").fetchall()