"""Registered project source metadata and refresh helpers.""" from __future__ import annotations import json from dataclasses import asdict, dataclass from pathlib import Path import atocore.config as _config from atocore.ingestion.pipeline import ingest_folder @dataclass(frozen=True) class ProjectSourceRef: source: str subpath: str label: str = "" @dataclass(frozen=True) class RegisteredProject: project_id: str aliases: tuple[str, ...] description: str ingest_roots: tuple[ProjectSourceRef, ...] def load_project_registry() -> list[RegisteredProject]: """Load project registry entries from JSON config.""" registry_path = _config.settings.resolved_project_registry_path if not registry_path.exists(): return [] payload = json.loads(registry_path.read_text(encoding="utf-8")) entries = payload.get("projects", []) projects: list[RegisteredProject] = [] for entry in entries: project_id = str(entry["id"]).strip() aliases = tuple( alias.strip() for alias in entry.get("aliases", []) if isinstance(alias, str) and alias.strip() ) description = str(entry.get("description", "")).strip() ingest_roots = tuple( ProjectSourceRef( source=str(root["source"]).strip(), subpath=str(root["subpath"]).strip(), label=str(root.get("label", "")).strip(), ) for root in entry.get("ingest_roots", []) if str(root.get("source", "")).strip() and str(root.get("subpath", "")).strip() ) projects.append( RegisteredProject( project_id=project_id, aliases=aliases, description=description, ingest_roots=ingest_roots, ) ) return projects def list_registered_projects() -> list[dict]: """Return registry entries with resolved source readiness.""" return [_project_to_dict(project) for project in load_project_registry()] def get_registered_project(project_name: str) -> RegisteredProject | None: """Resolve a registry entry by id or alias.""" needle = project_name.strip().lower() if not needle: return None for project in load_project_registry(): candidates = {project.project_id.lower(), *(alias.lower() for alias in project.aliases)} if needle in candidates: return project return None def refresh_registered_project(project_name: str, purge_deleted: bool = False) -> dict: """Ingest all configured source roots for a registered project.""" project = get_registered_project(project_name) if project is None: raise ValueError(f"Unknown project: {project_name}") roots = [] for source_ref in project.ingest_roots: resolved = _resolve_ingest_root(source_ref) root_result = { "source": source_ref.source, "subpath": source_ref.subpath, "label": source_ref.label, "path": str(resolved), } if not resolved.exists(): roots.append({**root_result, "status": "missing"}) continue if not resolved.is_dir(): roots.append({**root_result, "status": "not_directory"}) continue roots.append( { **root_result, "status": "ingested", "results": ingest_folder(resolved, purge_deleted=purge_deleted), } ) return { "project": project.project_id, "aliases": list(project.aliases), "description": project.description, "purge_deleted": purge_deleted, "roots": roots, } def _project_to_dict(project: RegisteredProject) -> dict: return { "id": project.project_id, "aliases": list(project.aliases), "description": project.description, "ingest_roots": [ { **asdict(source_ref), "path": str(_resolve_ingest_root(source_ref)), "exists": _resolve_ingest_root(source_ref).exists(), "is_dir": _resolve_ingest_root(source_ref).is_dir(), } for source_ref in project.ingest_roots ], } def _resolve_ingest_root(source_ref: ProjectSourceRef) -> Path: base_map = { "vault": _config.settings.resolved_vault_source_dir, "drive": _config.settings.resolved_drive_source_dir, } try: base_dir = base_map[source_ref.source] except KeyError as exc: raise ValueError(f"Unsupported source root: {source_ref.source}") from exc return (base_dir / source_ref.subpath).resolve(strict=False)