"""Registered project source metadata and refresh helpers.""" from __future__ import annotations import json from dataclasses import asdict, dataclass from pathlib import Path import atocore.config as _config from atocore.ingestion.pipeline import ingest_folder @dataclass(frozen=True) class ProjectSourceRef: source: str subpath: str label: str = "" @dataclass(frozen=True) class RegisteredProject: project_id: str aliases: tuple[str, ...] description: str ingest_roots: tuple[ProjectSourceRef, ...] def get_project_registry_template() -> dict: """Return a minimal template for registering a new project.""" return { "projects": [ { "id": "p07-example", "aliases": ["p07", "example-project"], "description": "Short description of the project and staged corpus.", "ingest_roots": [ { "source": "vault", "subpath": "incoming/projects/p07-example", "label": "Primary staged project docs", } ], } ] } def build_project_registration_proposal( project_id: str, aliases: list[str] | tuple[str, ...] | None = None, description: str = "", ingest_roots: list[dict] | tuple[dict, ...] | None = None, ) -> dict: """Build a normalized project registration proposal without mutating state.""" normalized_id = project_id.strip() if not normalized_id: raise ValueError("Project id must be non-empty") normalized_aliases = _normalize_aliases(aliases or []) normalized_roots = _normalize_ingest_roots(ingest_roots or []) if not normalized_roots: raise ValueError("At least one ingest root is required") collisions = _find_name_collisions(normalized_id, normalized_aliases) resolved_roots = [] for root in normalized_roots: source_ref = ProjectSourceRef( source=root["source"], subpath=root["subpath"], label=root.get("label", ""), ) resolved_path = _resolve_ingest_root(source_ref) resolved_roots.append( { **root, "path": str(resolved_path), "exists": resolved_path.exists(), "is_dir": resolved_path.is_dir(), } ) return { "project": { "id": normalized_id, "aliases": normalized_aliases, "description": description.strip(), "ingest_roots": normalized_roots, }, "resolved_ingest_roots": resolved_roots, "collisions": collisions, "registry_path": str(_config.settings.resolved_project_registry_path), "valid": not collisions, } def register_project( project_id: str, aliases: list[str] | tuple[str, ...] | None = None, description: str = "", ingest_roots: list[dict] | tuple[dict, ...] | None = None, ) -> dict: """Persist a validated project registration to the registry file.""" proposal = build_project_registration_proposal( project_id=project_id, aliases=aliases, description=description, ingest_roots=ingest_roots, ) if not proposal["valid"]: collision_names = ", ".join(collision["name"] for collision in proposal["collisions"]) raise ValueError(f"Project registration has collisions: {collision_names}") registry_path = _config.settings.resolved_project_registry_path payload = _load_registry_payload(registry_path) payload.setdefault("projects", []).append(proposal["project"]) _write_registry_payload(registry_path, payload) return { **proposal, "status": "registered", } def load_project_registry() -> list[RegisteredProject]: """Load project registry entries from JSON config.""" registry_path = _config.settings.resolved_project_registry_path payload = _load_registry_payload(registry_path) entries = payload.get("projects", []) projects: list[RegisteredProject] = [] for entry in entries: project_id = str(entry["id"]).strip() if not project_id: raise ValueError("Project registry entry is missing a non-empty id") aliases = tuple( alias.strip() for alias in entry.get("aliases", []) if isinstance(alias, str) and alias.strip() ) description = str(entry.get("description", "")).strip() ingest_roots = tuple( ProjectSourceRef( source=str(root["source"]).strip(), subpath=str(root["subpath"]).strip(), label=str(root.get("label", "")).strip(), ) for root in entry.get("ingest_roots", []) if str(root.get("source", "")).strip() and str(root.get("subpath", "")).strip() ) if not ingest_roots: raise ValueError(f"Project registry entry '{project_id}' has no ingest_roots") projects.append( RegisteredProject( project_id=project_id, aliases=aliases, description=description, ingest_roots=ingest_roots, ) ) _validate_unique_project_names(projects) return projects def list_registered_projects() -> list[dict]: """Return registry entries with resolved source readiness.""" return [_project_to_dict(project) for project in load_project_registry()] def get_registered_project(project_name: str) -> RegisteredProject | None: """Resolve a registry entry by id or alias.""" needle = project_name.strip().lower() if not needle: return None for project in load_project_registry(): candidates = {project.project_id.lower(), *(alias.lower() for alias in project.aliases)} if needle in candidates: return project return None def refresh_registered_project(project_name: str, purge_deleted: bool = False) -> dict: """Ingest all configured source roots for a registered project.""" project = get_registered_project(project_name) if project is None: raise ValueError(f"Unknown project: {project_name}") roots = [] for source_ref in project.ingest_roots: resolved = _resolve_ingest_root(source_ref) root_result = { "source": source_ref.source, "subpath": source_ref.subpath, "label": source_ref.label, "path": str(resolved), } if not resolved.exists(): roots.append({**root_result, "status": "missing"}) continue if not resolved.is_dir(): roots.append({**root_result, "status": "not_directory"}) continue roots.append( { **root_result, "status": "ingested", "results": ingest_folder(resolved, purge_deleted=purge_deleted), } ) return { "project": project.project_id, "aliases": list(project.aliases), "description": project.description, "purge_deleted": purge_deleted, "roots": roots, } def _normalize_aliases(aliases: list[str] | tuple[str, ...]) -> list[str]: deduped: list[str] = [] seen: set[str] = set() for alias in aliases: candidate = alias.strip() if not candidate: continue key = candidate.lower() if key in seen: continue seen.add(key) deduped.append(candidate) return deduped def _normalize_ingest_roots(ingest_roots: list[dict] | tuple[dict, ...]) -> list[dict]: normalized: list[dict] = [] for root in ingest_roots: source = str(root.get("source", "")).strip() subpath = str(root.get("subpath", "")).strip() label = str(root.get("label", "")).strip() if not source or not subpath: continue if source not in {"vault", "drive"}: raise ValueError(f"Unsupported source root: {source}") normalized.append({"source": source, "subpath": subpath, "label": label}) return normalized def _project_to_dict(project: RegisteredProject) -> dict: return { "id": project.project_id, "aliases": list(project.aliases), "description": project.description, "ingest_roots": [ { **asdict(source_ref), "path": str(_resolve_ingest_root(source_ref)), "exists": _resolve_ingest_root(source_ref).exists(), "is_dir": _resolve_ingest_root(source_ref).is_dir(), } for source_ref in project.ingest_roots ], } def _resolve_ingest_root(source_ref: ProjectSourceRef) -> Path: base_map = { "vault": _config.settings.resolved_vault_source_dir, "drive": _config.settings.resolved_drive_source_dir, } try: base_dir = base_map[source_ref.source] except KeyError as exc: raise ValueError(f"Unsupported source root: {source_ref.source}") from exc return (base_dir / source_ref.subpath).resolve(strict=False) def _validate_unique_project_names(projects: list[RegisteredProject]) -> None: seen: dict[str, str] = {} for project in projects: names = [project.project_id, *project.aliases] for name in names: key = name.lower() if key in seen and seen[key] != project.project_id: raise ValueError( f"Project registry name collision: '{name}' is used by both " f"'{seen[key]}' and '{project.project_id}'" ) seen[key] = project.project_id def _find_name_collisions(project_id: str, aliases: list[str]) -> list[dict]: collisions: list[dict] = [] existing = load_project_registry() requested_names = [project_id, *aliases] for requested in requested_names: requested_key = requested.lower() for project in existing: project_names = [project.project_id, *project.aliases] if requested_key in {name.lower() for name in project_names}: collisions.append( { "name": requested, "existing_project": project.project_id, } ) break return collisions def _load_registry_payload(registry_path: Path) -> dict: if not registry_path.exists(): return {"projects": []} return json.loads(registry_path.read_text(encoding="utf-8")) def _write_registry_payload(registry_path: Path, payload: dict) -> None: registry_path.parent.mkdir(parents=True, exist_ok=True) registry_path.write_text( json.dumps(payload, indent=2, ensure_ascii=True) + "\n", encoding="utf-8", )