2026-04-06 08:02:13 -04:00
|
|
|
"""Registered project source metadata and refresh helpers."""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
from dataclasses import asdict, dataclass
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
import atocore.config as _config
|
|
|
|
|
from atocore.ingestion.pipeline import ingest_folder
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class ProjectSourceRef:
    """Immutable pointer to one ingestable folder of a registered project.

    Attributes:
        source: Key naming the configured base directory the folder lives
            under.
        subpath: Path of the folder relative to that base directory.
        label: Optional human-readable description; empty when omitted.
    """

    # Required: which configured source root to start from.
    source: str
    # Required: location below that source root.
    subpath: str
    # Optional free-text label.
    label: str = ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class RegisteredProject:
    """Immutable record describing a single project registry entry.

    Attributes:
        project_id: Primary identifier of the project.
        aliases: Alternative names under which the project can be looked up.
        description: Free-text summary of the project.
        ingest_roots: Source folders that make up the project's corpus.
    """

    # Primary identifier.
    project_id: str
    # Alternative lookup names.
    aliases: tuple[str, ...]
    # Free-text summary.
    description: str
    # Folders to ingest for this project.
    ingest_roots: tuple[ProjectSourceRef, ...]
|
|
|
|
|
|
|
|
|
|
|
2026-04-06 08:46:37 -04:00
|
|
|
def get_project_registry_template() -> dict:
    """Build a starter registry payload containing one example project.

    Returns:
        A freshly constructed dict with a single ``"projects"`` list holding
        one sample entry (id, aliases, description and one ingest root).
    """
    example_root = {
        "source": "vault",
        "subpath": "incoming/projects/p07-example",
        "label": "Primary staged project docs",
    }
    example_project = {
        "id": "p07-example",
        "aliases": ["p07", "example-project"],
        "description": "Short description of the project and staged corpus.",
        "ingest_roots": [example_root],
    }
    return {"projects": [example_project]}
|
|
|
|
|
|
|
|
|
|
|
2026-04-06 08:02:13 -04:00
|
|
|
def load_project_registry() -> list[RegisteredProject]:
    """Load project registry entries from JSON config.

    Reads the JSON file at ``settings.resolved_project_registry_path`` and
    turns every item of its top-level ``"projects"`` list into a
    ``RegisteredProject``.

    Returns:
        The parsed projects in file order, or an empty list when the
        registry file does not exist.

    Raises:
        ValueError: If an entry has a missing, null or blank ``id``, if an
            entry ends up with no usable ingest root, or if an id/alias is
            claimed by more than one entry.
    """

    def _text(value: object) -> str:
        # JSON ``null`` must read as "not provided"; a plain ``str(None)``
        # would turn it into the literal text "None".
        return "" if value is None else str(value).strip()

    registry_path = _config.settings.resolved_project_registry_path
    if not registry_path.exists():
        return []

    payload = json.loads(registry_path.read_text(encoding="utf-8"))
    # ``or []`` also covers an explicit ``null`` value for the key.
    entries = payload.get("projects") or []
    projects: list[RegisteredProject] = []

    for entry in entries:
        # ``.get`` so a missing key reports the same ValueError as a blank
        # id instead of surfacing as a bare KeyError.
        project_id = _text(entry.get("id"))
        if not project_id:
            raise ValueError("Project registry entry is missing a non-empty id")

        # Non-string and blank aliases are dropped silently.
        aliases = tuple(
            alias.strip()
            for alias in entry.get("aliases") or []
            if isinstance(alias, str) and alias.strip()
        )
        description = _text(entry.get("description"))

        # Roots lacking either a source or a subpath are skipped.
        ingest_roots = tuple(
            ProjectSourceRef(
                source=_text(root.get("source")),
                subpath=_text(root.get("subpath")),
                label=_text(root.get("label")),
            )
            for root in entry.get("ingest_roots") or []
            if _text(root.get("source")) and _text(root.get("subpath"))
        )
        if not ingest_roots:
            raise ValueError(f"Project registry entry '{project_id}' has no ingest_roots")

        projects.append(
            RegisteredProject(
                project_id=project_id,
                aliases=aliases,
                description=description,
                ingest_roots=ingest_roots,
            )
        )

    _validate_unique_project_names(projects)
    return projects
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_registered_projects() -> list[dict]:
    """List every registered project as a plain dict.

    Returns:
        One dict per registry entry, produced by ``_project_to_dict``, in
        the order the registry loader yields them.
    """
    return list(map(_project_to_dict, load_project_registry()))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_registered_project(project_name: str) -> RegisteredProject | None:
    """Look up a registered project by its id or one of its aliases.

    Matching ignores surrounding whitespace and letter case.

    Args:
        project_name: Id or alias to search for.

    Returns:
        The first registry entry whose id or alias matches, or ``None``
        when the name is blank or nothing matches.
    """
    wanted = project_name.strip().lower()
    if wanted:
        for candidate in load_project_registry():
            known_names = (candidate.project_id, *candidate.aliases)
            if any(wanted == known.lower() for known in known_names):
                return candidate
    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def refresh_registered_project(project_name: str, purge_deleted: bool = False) -> dict:
    """Run ingestion over every source root of one registered project.

    Args:
        project_name: Id or alias of the project to refresh.
        purge_deleted: Forwarded unchanged to ``ingest_folder`` for each
            root that gets ingested.

    Returns:
        A summary dict with the project's id, aliases and description, the
        ``purge_deleted`` flag, and a ``"roots"`` list holding one report
        per configured root. Each report carries a ``"status"`` of
        ``"missing"``, ``"not_directory"`` or ``"ingested"``; ingested
        roots also include the ``ingest_folder`` output under
        ``"results"``.

    Raises:
        ValueError: If no registered project matches ``project_name``.
    """
    project = get_registered_project(project_name)
    if project is None:
        raise ValueError(f"Unknown project: {project_name}")

    reports = []
    for ref in project.ingest_roots:
        target = _resolve_ingest_root(ref)
        report = {
            "source": ref.source,
            "subpath": ref.subpath,
            "label": ref.label,
            "path": str(target),
        }
        if not target.exists():
            report["status"] = "missing"
        elif not target.is_dir():
            report["status"] = "not_directory"
        else:
            # Only existing directories are handed to the ingestion pipeline.
            report["status"] = "ingested"
            report["results"] = ingest_folder(target, purge_deleted=purge_deleted)
        reports.append(report)

    return {
        "project": project.project_id,
        "aliases": list(project.aliases),
        "description": project.description,
        "purge_deleted": purge_deleted,
        "roots": reports,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _project_to_dict(project: RegisteredProject) -> dict:
    """Convert a registry entry into a plain dict with per-root readiness.

    Args:
        project: The registry entry to convert.

    Returns:
        A dict with ``id``, ``aliases``, ``description`` and
        ``ingest_roots``. Each ingest root holds the dataclass fields of
        its source reference plus the resolved ``path`` (as a string) and
        the ``exists`` / ``is_dir`` flags of that path.
    """
    ingest_roots = []
    for source_ref in project.ingest_roots:
        # Resolve once per root so path, exists and is_dir all describe the
        # same resolved location and the lookup is not repeated.
        resolved = _resolve_ingest_root(source_ref)
        ingest_roots.append(
            {
                **asdict(source_ref),
                "path": str(resolved),
                "exists": resolved.exists(),
                "is_dir": resolved.is_dir(),
            }
        )

    return {
        "id": project.project_id,
        "aliases": list(project.aliases),
        "description": project.description,
        "ingest_roots": ingest_roots,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _resolve_ingest_root(source_ref: ProjectSourceRef) -> Path:
    """Turn a source reference into a resolved filesystem path.

    Args:
        source_ref: Reference whose ``source`` selects a configured base
            directory and whose ``subpath`` is joined onto it.

    Returns:
        The joined path after ``Path.resolve(strict=False)``; the path is
        not required to exist.

    Raises:
        ValueError: If ``source_ref.source`` is neither ``"vault"`` nor
            ``"drive"``.
    """
    settings = _config.settings
    roots_by_source = {
        "vault": settings.resolved_vault_source_dir,
        "drive": settings.resolved_drive_source_dir,
    }
    try:
        root_dir = roots_by_source[source_ref.source]
    except KeyError as exc:
        raise ValueError(f"Unsupported source root: {source_ref.source}") from exc

    candidate = root_dir / source_ref.subpath
    return candidate.resolve(strict=False)
|
2026-04-06 08:46:37 -04:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def _validate_unique_project_names(projects: list[RegisteredProject]) -> None:
    """Ensure no id or alias is claimed by more than one registry entry.

    Names are compared case-insensitively. An entry may repeat one of its
    own names (for example an alias equal to its id), but two different
    entries may not share a name, including two entries with the same id.

    Args:
        projects: Registry entries in file order.

    Raises:
        ValueError: On the first name already claimed by an earlier entry.
    """
    # Maps lowercased name -> (position of the owning entry, its project id).
    # Ownership is tracked by position, not by id, so that two entries with
    # an identical id are reported instead of being treated as one project.
    owners: dict[str, tuple[int, str]] = {}
    for position, project in enumerate(projects):
        for name in (project.project_id, *project.aliases):
            key = name.lower()
            owner = owners.get(key)
            if owner is not None and owner[0] != position:
                raise ValueError(
                    f"Project registry name collision: '{name}' is used by both "
                    f"'{owner[1]}' and '{project.project_id}'"
                )
            owners[key] = (position, project.project_id)
|