Files
ATOCore/src/atocore/projects/registry.py

546 lines
18 KiB
Python

"""Registered project source metadata and refresh helpers."""
from __future__ import annotations
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
import atocore.config as _config
# Reserved pseudo-projects. `inbox` holds pre-project / lead / quote
# entities that don't yet belong to a real project. `""` (empty) is the
# cross-project bucket for facts that apply to every project (material
# properties, vendor capabilities). Neither may be registered, renamed,
# or deleted via the normal registry CRUD.
INBOX_PROJECT = "inbox"
GLOBAL_PROJECT = ""
# Only `inbox` is listed here, so `is_reserved_project("")` is False.
# The empty name is kept out of the registry by other means: registration
# rejects an empty id, alias normalization drops empty aliases, and
# `get_registered_project` returns None for an empty name.
_RESERVED_PROJECT_IDS = {INBOX_PROJECT}
def is_reserved_project(name: str) -> bool:
    """Tell whether *name* refers to a reserved pseudo-project.

    The comparison ignores surrounding whitespace and letter case. A falsy
    *name* (``None`` or ``""``) is treated as the empty string.
    """
    candidate = name if name else ""
    folded = candidate.strip().lower()
    return folded in _RESERVED_PROJECT_IDS
@dataclass(frozen=True)
class ProjectSourceRef:
    """Immutable pointer to one ingest root of a project."""

    # Name of the configured base directory; this module resolves
    # "vault" and "drive" and rejects anything else.
    source: str
    # Path joined onto the base directory selected by `source`.
    subpath: str
    # Optional human-readable description of the root.
    label: str = ""
@dataclass(frozen=True)
class RegisteredProject:
    """Immutable in-memory view of one entry of the project registry."""

    # Canonical project identifier (the "id" field of the registry entry).
    project_id: str
    # Alternative names; lookups match them case-insensitively.
    aliases: tuple[str, ...]
    # Free-form description of the project.
    description: str
    # Source locations that are ingested when the project is refreshed.
    ingest_roots: tuple[ProjectSourceRef, ...]
def get_project_registry_template() -> dict:
    """Return a minimal example payload for registering a new project.

    A fresh dict is built on every call, so callers may mutate the result.
    """
    example_root = {
        "source": "vault",
        "subpath": "incoming/projects/p07-example",
        "label": "Primary staged project docs",
    }
    example_project = {
        "id": "p07-example",
        "aliases": ["p07", "example-project"],
        "description": "Short description of the project and staged corpus.",
        "ingest_roots": [example_root],
    }
    return {"projects": [example_project]}
def build_project_registration_proposal(
    project_id: str,
    aliases: list[str] | tuple[str, ...] | None = None,
    description: str = "",
    ingest_roots: list[dict] | tuple[dict, ...] | None = None,
) -> dict:
    """Assemble a normalized registration proposal; nothing is written.

    The result holds the normalized project entry, the resolved location
    and readiness of every ingest root, any name collisions with projects
    already in the registry, the registry path, and a ``valid`` flag that
    is true when there are no collisions.

    Raises:
        ValueError: if the id is empty or reserved, an alias is reserved,
            or no usable ingest root remains after normalization.
    """
    clean_id = project_id.strip()
    if not clean_id:
        raise ValueError("Project id must be non-empty")
    if is_reserved_project(clean_id):
        raise ValueError(
            f"Project id {clean_id!r} is reserved and cannot be registered"
        )

    clean_aliases = _normalize_aliases(aliases or [])
    # Normalized aliases are never empty, so None is a safe "not found" marker.
    reserved_alias = next(
        (name for name in clean_aliases if is_reserved_project(name)),
        None,
    )
    if reserved_alias is not None:
        raise ValueError(
            f"Alias {reserved_alias!r} is reserved and cannot be used"
        )

    clean_roots = _normalize_ingest_roots(ingest_roots or [])
    if not clean_roots:
        raise ValueError("At least one ingest root is required")

    name_collisions = _find_name_collisions(clean_id, clean_aliases)

    # Report where each root points and whether it is usable right now.
    readiness = []
    for entry in clean_roots:
        location = _resolve_ingest_root(
            ProjectSourceRef(
                source=entry["source"],
                subpath=entry["subpath"],
                label=entry.get("label", ""),
            )
        )
        readiness.append(
            dict(
                entry,
                path=str(location),
                exists=location.exists(),
                is_dir=location.is_dir(),
            )
        )

    project_entry = {
        "id": clean_id,
        "aliases": clean_aliases,
        "description": description.strip(),
        "ingest_roots": clean_roots,
    }
    return {
        "project": project_entry,
        "resolved_ingest_roots": readiness,
        "collisions": name_collisions,
        "registry_path": str(_config.settings.resolved_project_registry_path),
        "valid": not name_collisions,
    }
def register_project(
    project_id: str,
    aliases: list[str] | tuple[str, ...] | None = None,
    description: str = "",
    ingest_roots: list[dict] | tuple[dict, ...] | None = None,
) -> dict:
    """Persist a validated project registration to the registry file.

    Args:
        project_id: Identifier of the new project.
        aliases: Optional alternative names.
        description: Free-form description.
        ingest_roots: Root descriptors with ``source``/``subpath``/``label``.

    Returns:
        The registration proposal extended with ``"status": "registered"``.

    Raises:
        ValueError: if the proposal is invalid, a name collides with an
            existing project, or an ingest root overlaps a root owned by
            another registered project.
    """
    proposal = build_project_registration_proposal(
        project_id=project_id,
        aliases=aliases,
        description=description,
        ingest_roots=ingest_roots,
    )
    if not proposal["valid"]:
        collision_names = ", ".join(
            collision["name"] for collision in proposal["collisions"]
        )
        raise ValueError(f"Project registration has collisions: {collision_names}")

    # load_project_registry() rejects overlapping ingest roots, so writing an
    # overlapping entry would make every later registry load fail. Run the
    # same validation against the prospective project list before persisting.
    entry = proposal["project"]
    candidate = RegisteredProject(
        project_id=entry["id"],
        aliases=tuple(entry["aliases"]),
        description=entry["description"],
        ingest_roots=tuple(
            ProjectSourceRef(
                source=root["source"],
                subpath=root["subpath"],
                label=root.get("label", ""),
            )
            for root in entry["ingest_roots"]
        ),
    )
    _validate_ingest_root_overlaps([*load_project_registry(), candidate])

    registry_path = _config.settings.resolved_project_registry_path
    payload = _load_registry_payload(registry_path)
    payload.setdefault("projects", []).append(entry)
    _write_registry_payload(registry_path, payload)
    return {
        **proposal,
        "status": "registered",
    }
def update_project(
    project_name: str,
    aliases: list[str] | tuple[str, ...] | None = None,
    description: str | None = None,
    ingest_roots: list[dict] | tuple[dict, ...] | None = None,
) -> dict:
    """Update an existing project registration in the registry file.

    Only the fields that are passed (not ``None``) are replaced; the
    project id itself is never changed.

    Args:
        project_name: Id or alias of the project to update.
        aliases: Replacement alias list, or ``None`` to keep the current one.
        description: Replacement description, or ``None`` to keep it.
        ingest_roots: Replacement root descriptors, or ``None`` to keep them.

    Returns:
        A dict with the updated entry, the resolved readiness of its ingest
        roots, the registry path and ``"status": "updated"``.

    Raises:
        ValueError: if the project is reserved or unknown, a new alias is
            reserved or collides with another project, no ingest root
            remains, or a new ingest root overlaps another project's root.
    """
    if is_reserved_project(project_name):
        raise ValueError(
            f"Project {project_name!r} is reserved and cannot be modified"
        )
    existing = get_registered_project(project_name)
    if existing is None:
        raise ValueError(f"Unknown project: {project_name}")

    if aliases is not None:
        final_aliases = _normalize_aliases(aliases)
        # Same rule as registration: a reserved name may not become an alias.
        # Only newly supplied aliases are checked, so entries that predate
        # this rule can still have their other fields updated.
        for alias in final_aliases:
            if is_reserved_project(alias):
                raise ValueError(
                    f"Alias {alias!r} is reserved and cannot be used"
                )
    else:
        final_aliases = list(existing.aliases)

    final_description = (
        description.strip() if description is not None else existing.description
    )
    final_roots = (
        _normalize_ingest_roots(ingest_roots)
        if ingest_roots is not None
        else [asdict(root) for root in existing.ingest_roots]
    )
    if not final_roots:
        raise ValueError("At least one ingest root is required")

    collisions = _find_name_collisions(
        existing.project_id,
        final_aliases,
        exclude_project_id=existing.project_id,
    )
    if collisions:
        collision_names = ", ".join(collision["name"] for collision in collisions)
        raise ValueError(f"Project update has collisions: {collision_names}")

    source_refs = [
        ProjectSourceRef(
            source=root["source"],
            subpath=root["subpath"],
            label=root.get("label", ""),
        )
        for root in final_roots
    ]

    if ingest_roots is not None:
        # load_project_registry() rejects overlapping ingest roots, so writing
        # overlapping roots would make every later registry load fail. Check
        # the replacement roots against all other projects before persisting.
        other_projects = [
            project
            for project in load_project_registry()
            if project.project_id != existing.project_id
        ]
        candidate = RegisteredProject(
            project_id=existing.project_id,
            aliases=tuple(final_aliases),
            description=final_description,
            ingest_roots=tuple(source_refs),
        )
        _validate_ingest_root_overlaps([*other_projects, candidate])

    updated_entry = {
        "id": existing.project_id,
        "aliases": final_aliases,
        "description": final_description,
        "ingest_roots": final_roots,
    }

    resolved_roots = []
    for root, source_ref in zip(final_roots, source_refs):
        resolved_path = _resolve_ingest_root(source_ref)
        resolved_roots.append(
            {
                **root,
                "path": str(resolved_path),
                "exists": resolved_path.exists(),
                "is_dir": resolved_path.is_dir(),
            }
        )

    registry_path = _config.settings.resolved_project_registry_path
    payload = _load_registry_payload(registry_path)
    # Swap the matching entry in place so the order of projects is preserved.
    payload["projects"] = [
        updated_entry
        if str(entry.get("id", "")).strip() == existing.project_id
        else entry
        for entry in payload.get("projects", [])
    ]
    _write_registry_payload(registry_path, payload)
    return {
        "project": updated_entry,
        "resolved_ingest_roots": resolved_roots,
        "collisions": [],
        "registry_path": str(registry_path),
        "valid": True,
        "status": "updated",
    }
def load_project_registry() -> list[RegisteredProject]:
    """Load and validate project registry entries from the JSON config.

    Returns:
        One ``RegisteredProject`` per registry entry, in file order.

    Raises:
        ValueError: if an entry has a missing, null or blank id, has no
            usable ingest root, shares a name with another project, or has
            an ingest root that overlaps another project's root.
    """
    registry_path = _config.settings.resolved_project_registry_path
    payload = _load_registry_payload(registry_path)
    entries = payload.get("projects", [])
    projects: list[RegisteredProject] = []
    for entry in entries:
        # Use .get() so a missing key raises the ValueError below instead of a
        # bare KeyError; a null id must not turn into the string "None".
        raw_id = entry.get("id")
        project_id = "" if raw_id is None else str(raw_id).strip()
        if not project_id:
            raise ValueError("Project registry entry is missing a non-empty id")
        aliases = tuple(
            alias.strip()
            for alias in entry.get("aliases", [])
            if isinstance(alias, str) and alias.strip()
        )
        description = str(entry.get("description", "")).strip()
        # Roots without both a source and a subpath are skipped.
        ingest_roots = tuple(
            ProjectSourceRef(
                source=str(root["source"]).strip(),
                subpath=str(root["subpath"]).strip(),
                label=str(root.get("label", "")).strip(),
            )
            for root in entry.get("ingest_roots", [])
            if str(root.get("source", "")).strip()
            and str(root.get("subpath", "")).strip()
        )
        if not ingest_roots:
            raise ValueError(
                f"Project registry entry '{project_id}' has no ingest_roots"
            )
        projects.append(
            RegisteredProject(
                project_id=project_id,
                aliases=aliases,
                description=description,
                ingest_roots=ingest_roots,
            )
        )
    _validate_unique_project_names(projects)
    _validate_ingest_root_overlaps(projects)
    return projects
def list_registered_projects() -> list[dict]:
    """List every registry entry as a dict that includes root readiness."""
    registered = load_project_registry()
    return list(map(_project_to_dict, registered))
def get_registered_project(project_name: str) -> RegisteredProject | None:
    """Look up a registry entry by id or alias.

    Matching ignores surrounding whitespace and letter case. Returns
    ``None`` for a blank name or when nothing matches.
    """
    wanted = project_name.strip().lower()
    if wanted == "":
        return None
    for candidate in load_project_registry():
        if candidate.project_id.lower() == wanted:
            return candidate
        if any(alias.lower() == wanted for alias in candidate.aliases):
            return candidate
    return None
def resolve_project_name(name: str | None) -> str:
    """Map a project name to its canonical form via the registry.

    - An empty or ``None`` name yields ``""``.
    - A reserved name yields its stripped, lower-cased form.
    - A name matching a registered id or alias yields that project's
      canonical ``project_id``.
    - Any other name is returned unchanged. This keeps hand-curated
      state, memories and interactions that predate the registry working,
      as well as projects that are intentionally not registered.

    This helper is the single canonicalization boundary for project names
    across the trust hierarchy: every read/write that takes a project name
    should pass it through ``resolve_project_name`` before storing or
    querying. The contract is documented in
    ``docs/architecture/representation-authority.md``.
    """
    if not name:
        return ""
    if is_reserved_project(name):
        return name.strip().lower()
    match = get_registered_project(name)
    return name if match is None else match.project_id
def derive_project_id_for_path(file_path: str | Path) -> str:
    """Return the id of the registered project owning *file_path*, else ``""``.

    A project owns the path when the resolved path lies under one of its
    resolved ingest roots. When several roots contain the path, the most
    specific one wins: more path components first, then the longer path
    string, then the greater project id.
    """
    if not file_path:
        return ""
    target = Path(file_path).resolve(strict=False)
    ranked: list[tuple[int, int, str]] = []
    for project in load_project_registry():
        for ref in project.ingest_roots:
            root = _resolve_ingest_root(ref)
            try:
                target.relative_to(root)
            except ValueError:
                # The path is not under this root.
                continue
            ranked.append((len(root.parts), len(str(root)), project.project_id))
    if not ranked:
        return ""
    _, _, owner = max(ranked)
    return owner
def refresh_registered_project(project_name: str, purge_deleted: bool = False) -> dict:
    """Ingest every configured source root of a registered project.

    Each root is reported individually under ``roots`` with a status of
    ``missing``, ``not_directory`` or ``ingested``. The overall ``status``
    summarizes the run:

    - ``ingested``: every root was a real directory and was ingested
    - ``partial``: at least one root ingested and at least one was unusable
    - ``nothing_to_ingest``: no roots were usable

    Raises:
        ValueError: if *project_name* matches no registered project.
    """
    project = get_registered_project(project_name)
    if project is None:
        raise ValueError(f"Unknown project: {project_name}")
    from atocore.ingestion.pipeline import ingest_project_folder

    root_reports = []
    n_ingested = 0
    n_skipped = 0
    for ref in project.ingest_roots:
        location = _resolve_ingest_root(ref)
        report = {
            "source": ref.source,
            "subpath": ref.subpath,
            "label": ref.label,
            "path": str(location),
        }
        # Decide first whether the root is usable, then act on the decision.
        if not location.exists():
            skip_reason = "missing"
        elif not location.is_dir():
            skip_reason = "not_directory"
        else:
            skip_reason = None

        if skip_reason is None:
            report["status"] = "ingested"
            report["results"] = ingest_project_folder(
                location,
                purge_deleted=purge_deleted,
                project_id=project.project_id,
            )
            n_ingested += 1
        else:
            report["status"] = skip_reason
            n_skipped += 1
        root_reports.append(report)

    if not n_ingested:
        overall_status = "nothing_to_ingest"
    elif n_skipped:
        overall_status = "partial"
    else:
        overall_status = "ingested"

    return {
        "project": project.project_id,
        "aliases": list(project.aliases),
        "description": project.description,
        "purge_deleted": purge_deleted,
        "status": overall_status,
        "roots_ingested": n_ingested,
        "roots_skipped": n_skipped,
        "roots": root_reports,
    }
def _normalize_aliases(aliases: list[str] | tuple[str, ...]) -> list[str]:
    """Strip aliases, drop blank ones and remove case-insensitive duplicates.

    The first spelling of each alias is kept and input order is preserved.
    """
    first_spelling: dict[str, str] = {}
    for raw in aliases:
        trimmed = raw.strip()
        if trimmed:
            # setdefault keeps the earliest spelling for a given folded key.
            first_spelling.setdefault(trimmed.lower(), trimmed)
    return list(first_spelling.values())
def _clean_root_field(value: object) -> str:
    """Return *value* as stripped text, mapping ``None`` to ``""``."""
    # str(None) would produce the literal text "None", which is never intended.
    return "" if value is None else str(value).strip()


def _normalize_ingest_roots(ingest_roots: list[dict] | tuple[dict, ...]) -> list[dict]:
    """Normalize ingest-root descriptors into ``source``/``subpath``/``label`` dicts.

    Every field is stripped. A root whose source or subpath is missing,
    null or blank is skipped; a null label becomes ``""``.

    Raises:
        ValueError: if a root names a source other than "vault" or "drive".
    """
    normalized: list[dict] = []
    for root in ingest_roots:
        source = _clean_root_field(root.get("source"))
        subpath = _clean_root_field(root.get("subpath"))
        label = _clean_root_field(root.get("label"))
        if not source or not subpath:
            continue
        if source not in {"vault", "drive"}:
            raise ValueError(f"Unsupported source root: {source}")
        normalized.append({"source": source, "subpath": subpath, "label": label})
    return normalized
def _project_to_dict(project: RegisteredProject) -> dict:
    """Convert a project into a plain dict, adding readiness for each root.

    Each ingest root carries its dataclass fields plus the resolved
    ``path`` and whether that path currently ``exists`` and ``is_dir``.
    """
    described_roots = []
    for source_ref in project.ingest_roots:
        # Resolve once per root; the result is reused for all three fields.
        resolved = _resolve_ingest_root(source_ref)
        described_roots.append(
            {
                **asdict(source_ref),
                "path": str(resolved),
                "exists": resolved.exists(),
                "is_dir": resolved.is_dir(),
            }
        )
    return {
        "id": project.project_id,
        "aliases": list(project.aliases),
        "description": project.description,
        "ingest_roots": described_roots,
    }
def _resolve_ingest_root(source_ref: ProjectSourceRef) -> Path:
    """Turn a source reference into a resolved filesystem path.

    The subpath is joined onto the configured base directory for the
    reference's source; the target does not have to exist.

    Raises:
        ValueError: if the source is neither "vault" nor "drive".
    """
    # NOTE(review): an absolute subpath, or one containing "..", can resolve
    # outside the base directory - confirm that callers validate subpaths.
    settings = _config.settings
    base_dirs = {
        "vault": settings.resolved_vault_source_dir,
        "drive": settings.resolved_drive_source_dir,
    }
    source_name = source_ref.source
    try:
        base_dir = base_dirs[source_name]
    except KeyError as exc:
        raise ValueError(f"Unsupported source root: {source_ref.source}") from exc
    joined = base_dir / source_ref.subpath
    return joined.resolve(strict=False)
def _validate_unique_project_names(projects: list[RegisteredProject]) -> None:
    """Ensure no id or alias is claimed by two different projects.

    Names are compared case-insensitively. A project may repeat one of its
    own names without error.

    Raises:
        ValueError: naming the clashing name and both projects involved.
    """
    owner_by_name: dict[str, str] = {}
    for project in projects:
        owner = project.project_id
        for name in (owner, *project.aliases):
            # setdefault records the first owner and returns the current one.
            first_owner = owner_by_name.setdefault(name.lower(), owner)
            if first_owner != owner:
                raise ValueError(
                    f"Project registry name collision: '{name}' is used by both "
                    f"'{first_owner}' and '{project.project_id}'"
                )
def _validate_ingest_root_overlaps(projects: list[RegisteredProject]) -> None:
    """Ensure ingest roots of different projects never contain one another.

    Two roots overlap when they are equal or one lies under the other.
    Roots belonging to the same project are not compared.

    Raises:
        ValueError: naming the first overlapping pair of roots found.
    """

    def _lies_within(inner: Path, outer: Path) -> bool:
        # relative_to() raises ValueError when `inner` is not under `outer`.
        try:
            inner.relative_to(outer)
        except ValueError:
            return False
        return True

    owned_roots: list[tuple[str, Path]] = [
        (project.project_id, _resolve_ingest_root(ref))
        for project in projects
        for ref in project.ingest_roots
    ]
    for index, (left_project, left_root) in enumerate(owned_roots):
        for right_project, right_root in owned_roots[index + 1:]:
            if left_project == right_project:
                continue
            if _lies_within(left_root, right_root) or _lies_within(right_root, left_root):
                raise ValueError(
                    "Project registry ingest root overlap: "
                    f"'{left_root}' ({left_project}) and "
                    f"'{right_root}' ({right_project})"
                )
def _find_name_collisions(
    project_id: str,
    aliases: list[str],
    exclude_project_id: str | None = None,
) -> list[dict]:
    """Report requested names that are already used by a registered project.

    The id and each alias are compared case-insensitively against the ids
    and aliases of every registered project except *exclude_project_id*.
    Each clashing name yields one ``{"name", "existing_project"}`` record
    for the first project that uses it.
    """
    # Fold every candidate project's names once, keeping registry order.
    taken = [
        (
            project.project_id,
            {project.project_id.lower(), *(alias.lower() for alias in project.aliases)},
        )
        for project in load_project_registry()
        if exclude_project_id is None or project.project_id != exclude_project_id
    ]
    found: list[dict] = []
    for requested in (project_id, *aliases):
        folded = requested.lower()
        owner = next((pid for pid, names in taken if folded in names), None)
        if owner is not None:
            found.append({"name": requested, "existing_project": owner})
    return found
def _load_registry_payload(registry_path: Path) -> dict:
    """Read the raw registry JSON, treating a missing file as an empty registry.

    Returns:
        The decoded JSON object, or ``{"projects": []}`` when the file does
        not exist.

    Raises:
        ValueError: if the file is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError``) or its top level is not a JSON object.
    """
    # Read first and handle absence afterwards, so there is no window between
    # an existence check and the read in which the file could disappear.
    try:
        raw = registry_path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return {"projects": []}
    payload = json.loads(raw)
    if not isinstance(payload, dict):
        # Callers use dict methods on the payload; fail here with a clear
        # message rather than with an AttributeError further down.
        raise ValueError(
            f"Project registry {registry_path} must contain a JSON object, "
            f"got {type(payload).__name__}"
        )
    return payload
def _write_registry_payload(registry_path: Path, payload: dict) -> None:
    """Write *payload* to *registry_path* as indented JSON via a temp file.

    The JSON is written to a temporary file in the destination directory
    and then moved over the target with ``Path.replace``. If writing or
    moving fails, the temporary file is removed before the error propagates.
    """
    registry_path.parent.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(payload, indent=2, ensure_ascii=True) + "\n"
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            dir=registry_path.parent,
            prefix=f"{registry_path.stem}.",
            suffix=".tmp",
            delete=False,
        ) as tmp_file:
            temp_path = Path(tmp_file.name)
            tmp_file.write(rendered)
        temp_path.replace(registry_path)
    except BaseException:
        # delete=False means nothing cleans up for us; do not leave a stray
        # "*.tmp" file next to the registry when the write or rename fails.
        if temp_path is not None:
            try:
                temp_path.unlink()
            except FileNotFoundError:
                pass
        raise