Files
ATOCore/src/atocore/projects/registry.py

438 lines
14 KiB
Python
Raw Normal View History

"""Registered project source metadata and refresh helpers."""
from __future__ import annotations
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
import atocore.config as _config
from atocore.ingestion.pipeline import ingest_folder
@dataclass(frozen=True)
class ProjectSourceRef:
source: str
subpath: str
label: str = ""
@dataclass(frozen=True)
class RegisteredProject:
project_id: str
aliases: tuple[str, ...]
description: str
ingest_roots: tuple[ProjectSourceRef, ...]
def get_project_registry_template() -> dict:
"""Return a minimal template for registering a new project."""
return {
"projects": [
{
"id": "p07-example",
"aliases": ["p07", "example-project"],
"description": "Short description of the project and staged corpus.",
"ingest_roots": [
{
"source": "vault",
"subpath": "incoming/projects/p07-example",
"label": "Primary staged project docs",
}
],
}
]
}
def build_project_registration_proposal(
project_id: str,
aliases: list[str] | tuple[str, ...] | None = None,
description: str = "",
ingest_roots: list[dict] | tuple[dict, ...] | None = None,
) -> dict:
"""Build a normalized project registration proposal without mutating state."""
normalized_id = project_id.strip()
if not normalized_id:
raise ValueError("Project id must be non-empty")
normalized_aliases = _normalize_aliases(aliases or [])
normalized_roots = _normalize_ingest_roots(ingest_roots or [])
if not normalized_roots:
raise ValueError("At least one ingest root is required")
collisions = _find_name_collisions(normalized_id, normalized_aliases)
resolved_roots = []
for root in normalized_roots:
source_ref = ProjectSourceRef(
source=root["source"],
subpath=root["subpath"],
label=root.get("label", ""),
)
resolved_path = _resolve_ingest_root(source_ref)
resolved_roots.append(
{
**root,
"path": str(resolved_path),
"exists": resolved_path.exists(),
"is_dir": resolved_path.is_dir(),
}
)
return {
"project": {
"id": normalized_id,
"aliases": normalized_aliases,
"description": description.strip(),
"ingest_roots": normalized_roots,
},
"resolved_ingest_roots": resolved_roots,
"collisions": collisions,
"registry_path": str(_config.settings.resolved_project_registry_path),
"valid": not collisions,
}
2026-04-06 09:52:19 -04:00
def register_project(
project_id: str,
aliases: list[str] | tuple[str, ...] | None = None,
description: str = "",
ingest_roots: list[dict] | tuple[dict, ...] | None = None,
) -> dict:
"""Persist a validated project registration to the registry file."""
proposal = build_project_registration_proposal(
project_id=project_id,
aliases=aliases,
description=description,
ingest_roots=ingest_roots,
)
if not proposal["valid"]:
collision_names = ", ".join(collision["name"] for collision in proposal["collisions"])
raise ValueError(f"Project registration has collisions: {collision_names}")
registry_path = _config.settings.resolved_project_registry_path
payload = _load_registry_payload(registry_path)
payload.setdefault("projects", []).append(proposal["project"])
_write_registry_payload(registry_path, payload)
return {
**proposal,
"status": "registered",
}
2026-04-06 12:31:24 -04:00
def update_project(
project_name: str,
aliases: list[str] | tuple[str, ...] | None = None,
description: str | None = None,
ingest_roots: list[dict] | tuple[dict, ...] | None = None,
) -> dict:
"""Update an existing project registration in the registry file."""
existing = get_registered_project(project_name)
if existing is None:
raise ValueError(f"Unknown project: {project_name}")
final_aliases = _normalize_aliases(aliases) if aliases is not None else list(existing.aliases)
final_description = description.strip() if description is not None else existing.description
final_roots = (
_normalize_ingest_roots(ingest_roots)
if ingest_roots is not None
else [asdict(root) for root in existing.ingest_roots]
)
if not final_roots:
raise ValueError("At least one ingest root is required")
collisions = _find_name_collisions(
existing.project_id,
final_aliases,
exclude_project_id=existing.project_id,
)
if collisions:
collision_names = ", ".join(collision["name"] for collision in collisions)
raise ValueError(f"Project update has collisions: {collision_names}")
updated_entry = {
"id": existing.project_id,
"aliases": final_aliases,
"description": final_description,
"ingest_roots": final_roots,
}
resolved_roots = []
for root in final_roots:
source_ref = ProjectSourceRef(
source=root["source"],
subpath=root["subpath"],
label=root.get("label", ""),
)
resolved_path = _resolve_ingest_root(source_ref)
resolved_roots.append(
{
**root,
"path": str(resolved_path),
"exists": resolved_path.exists(),
"is_dir": resolved_path.is_dir(),
}
)
registry_path = _config.settings.resolved_project_registry_path
payload = _load_registry_payload(registry_path)
payload["projects"] = [
updated_entry if str(entry.get("id", "")).strip() == existing.project_id else entry
for entry in payload.get("projects", [])
]
_write_registry_payload(registry_path, payload)
return {
"project": updated_entry,
"resolved_ingest_roots": resolved_roots,
"collisions": [],
"registry_path": str(registry_path),
"valid": True,
"status": "updated",
}
def load_project_registry() -> list[RegisteredProject]:
"""Load project registry entries from JSON config."""
registry_path = _config.settings.resolved_project_registry_path
2026-04-06 09:52:19 -04:00
payload = _load_registry_payload(registry_path)
entries = payload.get("projects", [])
projects: list[RegisteredProject] = []
for entry in entries:
project_id = str(entry["id"]).strip()
if not project_id:
raise ValueError("Project registry entry is missing a non-empty id")
aliases = tuple(
alias.strip()
for alias in entry.get("aliases", [])
if isinstance(alias, str) and alias.strip()
)
description = str(entry.get("description", "")).strip()
ingest_roots = tuple(
ProjectSourceRef(
source=str(root["source"]).strip(),
subpath=str(root["subpath"]).strip(),
label=str(root.get("label", "")).strip(),
)
for root in entry.get("ingest_roots", [])
if str(root.get("source", "")).strip()
and str(root.get("subpath", "")).strip()
)
if not ingest_roots:
raise ValueError(f"Project registry entry '{project_id}' has no ingest_roots")
projects.append(
RegisteredProject(
project_id=project_id,
aliases=aliases,
description=description,
ingest_roots=ingest_roots,
)
)
_validate_unique_project_names(projects)
return projects
def list_registered_projects() -> list[dict]:
"""Return registry entries with resolved source readiness."""
return [_project_to_dict(project) for project in load_project_registry()]
def get_registered_project(project_name: str) -> RegisteredProject | None:
"""Resolve a registry entry by id or alias."""
needle = project_name.strip().lower()
if not needle:
return None
for project in load_project_registry():
candidates = {project.project_id.lower(), *(alias.lower() for alias in project.aliases)}
if needle in candidates:
return project
return None
def refresh_registered_project(project_name: str, purge_deleted: bool = False) -> dict:
feat: tunable ranking, refresh status, chroma backup + admin endpoints Three small improvements that move the operational baseline forward without changing the existing trust model. 1. Tunable retrieval ranking weights - rank_project_match_boost, rank_query_token_step, rank_query_token_cap, rank_path_high_signal_boost, rank_path_low_signal_penalty are now Settings fields - all overridable via ATOCORE_* env vars - retriever no longer hard-codes 2.0 / 1.18 / 0.72 / 0.08 / 1.32 - lets ranking be tuned per environment as Wave 1 is exercised without code changes 2. /projects/{name}/refresh status - refresh_registered_project now returns an overall status field ("ingested", "partial", "nothing_to_ingest") plus roots_ingested and roots_skipped counters - ProjectRefreshResponse advertises the new fields so callers can rely on them - covers the case where every configured root is missing on disk 3. Chroma cold snapshot + admin backup endpoints - create_runtime_backup now accepts include_chroma and writes a cold directory copy of the chroma persistence path - new list_runtime_backups() and validate_backup() helpers - new endpoints: - POST /admin/backup create snapshot (optional chroma) - GET /admin/backup list snapshots - GET /admin/backup/{stamp}/validate structural validation - chroma snapshots are taken under exclusive_ingestion() so a refresh or ingest cannot race with the cold copy - backup metadata records what was actually included and how big Tests: - 8 new tests covering tunable weights, refresh status branches (ingested / partial / nothing_to_ingest), chroma snapshot, list, validate, and the API endpoints (including the lock-acquisition path) - existing fake refresh stubs in test_api_storage.py updated for the expanded ProjectRefreshResponse model - full suite: 105 passing (was 97) next-steps doc updated to reflect that the chroma snapshot + restore validation gap from current-state.md is now closed in code; only the operational retention policy remains.
2026-04-06 18:42:19 -04:00
"""Ingest all configured source roots for a registered project.
The returned dict carries an overall ``status`` so callers can tell at a
glance whether the refresh was fully successful, partial, or did nothing
at all because every configured root was missing or not a directory:
- ``ingested``: every root was a real directory and was ingested
- ``partial``: at least one root ingested and at least one was unusable
- ``nothing_to_ingest``: no roots were usable
"""
project = get_registered_project(project_name)
if project is None:
raise ValueError(f"Unknown project: {project_name}")
roots = []
feat: tunable ranking, refresh status, chroma backup + admin endpoints Three small improvements that move the operational baseline forward without changing the existing trust model. 1. Tunable retrieval ranking weights - rank_project_match_boost, rank_query_token_step, rank_query_token_cap, rank_path_high_signal_boost, rank_path_low_signal_penalty are now Settings fields - all overridable via ATOCORE_* env vars - retriever no longer hard-codes 2.0 / 1.18 / 0.72 / 0.08 / 1.32 - lets ranking be tuned per environment as Wave 1 is exercised without code changes 2. /projects/{name}/refresh status - refresh_registered_project now returns an overall status field ("ingested", "partial", "nothing_to_ingest") plus roots_ingested and roots_skipped counters - ProjectRefreshResponse advertises the new fields so callers can rely on them - covers the case where every configured root is missing on disk 3. Chroma cold snapshot + admin backup endpoints - create_runtime_backup now accepts include_chroma and writes a cold directory copy of the chroma persistence path - new list_runtime_backups() and validate_backup() helpers - new endpoints: - POST /admin/backup create snapshot (optional chroma) - GET /admin/backup list snapshots - GET /admin/backup/{stamp}/validate structural validation - chroma snapshots are taken under exclusive_ingestion() so a refresh or ingest cannot race with the cold copy - backup metadata records what was actually included and how big Tests: - 8 new tests covering tunable weights, refresh status branches (ingested / partial / nothing_to_ingest), chroma snapshot, list, validate, and the API endpoints (including the lock-acquisition path) - existing fake refresh stubs in test_api_storage.py updated for the expanded ProjectRefreshResponse model - full suite: 105 passing (was 97) next-steps doc updated to reflect that the chroma snapshot + restore validation gap from current-state.md is now closed in code; only the operational retention policy remains.
2026-04-06 18:42:19 -04:00
ingested_count = 0
skipped_count = 0
for source_ref in project.ingest_roots:
resolved = _resolve_ingest_root(source_ref)
root_result = {
"source": source_ref.source,
"subpath": source_ref.subpath,
"label": source_ref.label,
"path": str(resolved),
}
if not resolved.exists():
roots.append({**root_result, "status": "missing"})
feat: tunable ranking, refresh status, chroma backup + admin endpoints Three small improvements that move the operational baseline forward without changing the existing trust model. 1. Tunable retrieval ranking weights - rank_project_match_boost, rank_query_token_step, rank_query_token_cap, rank_path_high_signal_boost, rank_path_low_signal_penalty are now Settings fields - all overridable via ATOCORE_* env vars - retriever no longer hard-codes 2.0 / 1.18 / 0.72 / 0.08 / 1.32 - lets ranking be tuned per environment as Wave 1 is exercised without code changes 2. /projects/{name}/refresh status - refresh_registered_project now returns an overall status field ("ingested", "partial", "nothing_to_ingest") plus roots_ingested and roots_skipped counters - ProjectRefreshResponse advertises the new fields so callers can rely on them - covers the case where every configured root is missing on disk 3. Chroma cold snapshot + admin backup endpoints - create_runtime_backup now accepts include_chroma and writes a cold directory copy of the chroma persistence path - new list_runtime_backups() and validate_backup() helpers - new endpoints: - POST /admin/backup create snapshot (optional chroma) - GET /admin/backup list snapshots - GET /admin/backup/{stamp}/validate structural validation - chroma snapshots are taken under exclusive_ingestion() so a refresh or ingest cannot race with the cold copy - backup metadata records what was actually included and how big Tests: - 8 new tests covering tunable weights, refresh status branches (ingested / partial / nothing_to_ingest), chroma snapshot, list, validate, and the API endpoints (including the lock-acquisition path) - existing fake refresh stubs in test_api_storage.py updated for the expanded ProjectRefreshResponse model - full suite: 105 passing (was 97) next-steps doc updated to reflect that the chroma snapshot + restore validation gap from current-state.md is now closed in code; only the operational retention policy remains.
2026-04-06 18:42:19 -04:00
skipped_count += 1
continue
if not resolved.is_dir():
roots.append({**root_result, "status": "not_directory"})
feat: tunable ranking, refresh status, chroma backup + admin endpoints Three small improvements that move the operational baseline forward without changing the existing trust model. 1. Tunable retrieval ranking weights - rank_project_match_boost, rank_query_token_step, rank_query_token_cap, rank_path_high_signal_boost, rank_path_low_signal_penalty are now Settings fields - all overridable via ATOCORE_* env vars - retriever no longer hard-codes 2.0 / 1.18 / 0.72 / 0.08 / 1.32 - lets ranking be tuned per environment as Wave 1 is exercised without code changes 2. /projects/{name}/refresh status - refresh_registered_project now returns an overall status field ("ingested", "partial", "nothing_to_ingest") plus roots_ingested and roots_skipped counters - ProjectRefreshResponse advertises the new fields so callers can rely on them - covers the case where every configured root is missing on disk 3. Chroma cold snapshot + admin backup endpoints - create_runtime_backup now accepts include_chroma and writes a cold directory copy of the chroma persistence path - new list_runtime_backups() and validate_backup() helpers - new endpoints: - POST /admin/backup create snapshot (optional chroma) - GET /admin/backup list snapshots - GET /admin/backup/{stamp}/validate structural validation - chroma snapshots are taken under exclusive_ingestion() so a refresh or ingest cannot race with the cold copy - backup metadata records what was actually included and how big Tests: - 8 new tests covering tunable weights, refresh status branches (ingested / partial / nothing_to_ingest), chroma snapshot, list, validate, and the API endpoints (including the lock-acquisition path) - existing fake refresh stubs in test_api_storage.py updated for the expanded ProjectRefreshResponse model - full suite: 105 passing (was 97) next-steps doc updated to reflect that the chroma snapshot + restore validation gap from current-state.md is now closed in code; only the operational retention policy remains.
2026-04-06 18:42:19 -04:00
skipped_count += 1
continue
roots.append(
{
**root_result,
"status": "ingested",
"results": ingest_folder(resolved, purge_deleted=purge_deleted),
}
)
feat: tunable ranking, refresh status, chroma backup + admin endpoints Three small improvements that move the operational baseline forward without changing the existing trust model. 1. Tunable retrieval ranking weights - rank_project_match_boost, rank_query_token_step, rank_query_token_cap, rank_path_high_signal_boost, rank_path_low_signal_penalty are now Settings fields - all overridable via ATOCORE_* env vars - retriever no longer hard-codes 2.0 / 1.18 / 0.72 / 0.08 / 1.32 - lets ranking be tuned per environment as Wave 1 is exercised without code changes 2. /projects/{name}/refresh status - refresh_registered_project now returns an overall status field ("ingested", "partial", "nothing_to_ingest") plus roots_ingested and roots_skipped counters - ProjectRefreshResponse advertises the new fields so callers can rely on them - covers the case where every configured root is missing on disk 3. Chroma cold snapshot + admin backup endpoints - create_runtime_backup now accepts include_chroma and writes a cold directory copy of the chroma persistence path - new list_runtime_backups() and validate_backup() helpers - new endpoints: - POST /admin/backup create snapshot (optional chroma) - GET /admin/backup list snapshots - GET /admin/backup/{stamp}/validate structural validation - chroma snapshots are taken under exclusive_ingestion() so a refresh or ingest cannot race with the cold copy - backup metadata records what was actually included and how big Tests: - 8 new tests covering tunable weights, refresh status branches (ingested / partial / nothing_to_ingest), chroma snapshot, list, validate, and the API endpoints (including the lock-acquisition path) - existing fake refresh stubs in test_api_storage.py updated for the expanded ProjectRefreshResponse model - full suite: 105 passing (was 97) next-steps doc updated to reflect that the chroma snapshot + restore validation gap from current-state.md is now closed in code; only the operational retention policy remains.
2026-04-06 18:42:19 -04:00
ingested_count += 1
if ingested_count == 0:
overall_status = "nothing_to_ingest"
elif skipped_count == 0:
overall_status = "ingested"
else:
overall_status = "partial"
return {
"project": project.project_id,
"aliases": list(project.aliases),
"description": project.description,
"purge_deleted": purge_deleted,
feat: tunable ranking, refresh status, chroma backup + admin endpoints Three small improvements that move the operational baseline forward without changing the existing trust model. 1. Tunable retrieval ranking weights - rank_project_match_boost, rank_query_token_step, rank_query_token_cap, rank_path_high_signal_boost, rank_path_low_signal_penalty are now Settings fields - all overridable via ATOCORE_* env vars - retriever no longer hard-codes 2.0 / 1.18 / 0.72 / 0.08 / 1.32 - lets ranking be tuned per environment as Wave 1 is exercised without code changes 2. /projects/{name}/refresh status - refresh_registered_project now returns an overall status field ("ingested", "partial", "nothing_to_ingest") plus roots_ingested and roots_skipped counters - ProjectRefreshResponse advertises the new fields so callers can rely on them - covers the case where every configured root is missing on disk 3. Chroma cold snapshot + admin backup endpoints - create_runtime_backup now accepts include_chroma and writes a cold directory copy of the chroma persistence path - new list_runtime_backups() and validate_backup() helpers - new endpoints: - POST /admin/backup create snapshot (optional chroma) - GET /admin/backup list snapshots - GET /admin/backup/{stamp}/validate structural validation - chroma snapshots are taken under exclusive_ingestion() so a refresh or ingest cannot race with the cold copy - backup metadata records what was actually included and how big Tests: - 8 new tests covering tunable weights, refresh status branches (ingested / partial / nothing_to_ingest), chroma snapshot, list, validate, and the API endpoints (including the lock-acquisition path) - existing fake refresh stubs in test_api_storage.py updated for the expanded ProjectRefreshResponse model - full suite: 105 passing (was 97) next-steps doc updated to reflect that the chroma snapshot + restore validation gap from current-state.md is now closed in code; only the operational retention policy remains.
2026-04-06 18:42:19 -04:00
"status": overall_status,
"roots_ingested": ingested_count,
"roots_skipped": skipped_count,
"roots": roots,
}
def _normalize_aliases(aliases: list[str] | tuple[str, ...]) -> list[str]:
deduped: list[str] = []
seen: set[str] = set()
for alias in aliases:
candidate = alias.strip()
if not candidate:
continue
key = candidate.lower()
if key in seen:
continue
seen.add(key)
deduped.append(candidate)
return deduped
def _normalize_ingest_roots(ingest_roots: list[dict] | tuple[dict, ...]) -> list[dict]:
normalized: list[dict] = []
for root in ingest_roots:
source = str(root.get("source", "")).strip()
subpath = str(root.get("subpath", "")).strip()
label = str(root.get("label", "")).strip()
if not source or not subpath:
continue
if source not in {"vault", "drive"}:
raise ValueError(f"Unsupported source root: {source}")
normalized.append({"source": source, "subpath": subpath, "label": label})
return normalized
def _project_to_dict(project: RegisteredProject) -> dict:
return {
"id": project.project_id,
"aliases": list(project.aliases),
"description": project.description,
"ingest_roots": [
{
**asdict(source_ref),
"path": str(_resolve_ingest_root(source_ref)),
"exists": _resolve_ingest_root(source_ref).exists(),
"is_dir": _resolve_ingest_root(source_ref).is_dir(),
}
for source_ref in project.ingest_roots
],
}
def _resolve_ingest_root(source_ref: ProjectSourceRef) -> Path:
base_map = {
"vault": _config.settings.resolved_vault_source_dir,
"drive": _config.settings.resolved_drive_source_dir,
}
try:
base_dir = base_map[source_ref.source]
except KeyError as exc:
raise ValueError(f"Unsupported source root: {source_ref.source}") from exc
return (base_dir / source_ref.subpath).resolve(strict=False)
def _validate_unique_project_names(projects: list[RegisteredProject]) -> None:
seen: dict[str, str] = {}
for project in projects:
names = [project.project_id, *project.aliases]
for name in names:
key = name.lower()
if key in seen and seen[key] != project.project_id:
raise ValueError(
f"Project registry name collision: '{name}' is used by both "
f"'{seen[key]}' and '{project.project_id}'"
)
seen[key] = project.project_id
2026-04-06 12:31:24 -04:00
def _find_name_collisions(
project_id: str,
aliases: list[str],
exclude_project_id: str | None = None,
) -> list[dict]:
collisions: list[dict] = []
existing = load_project_registry()
requested_names = [project_id, *aliases]
for requested in requested_names:
requested_key = requested.lower()
for project in existing:
2026-04-06 12:31:24 -04:00
if exclude_project_id is not None and project.project_id == exclude_project_id:
continue
project_names = [project.project_id, *project.aliases]
if requested_key in {name.lower() for name in project_names}:
collisions.append(
{
"name": requested,
"existing_project": project.project_id,
}
)
break
return collisions
2026-04-06 09:52:19 -04:00
def _load_registry_payload(registry_path: Path) -> dict:
if not registry_path.exists():
return {"projects": []}
return json.loads(registry_path.read_text(encoding="utf-8"))
def _write_registry_payload(registry_path: Path, payload: dict) -> None:
registry_path.parent.mkdir(parents=True, exist_ok=True)
rendered = json.dumps(payload, indent=2, ensure_ascii=True) + "\n"
with tempfile.NamedTemporaryFile(
mode="w",
2026-04-06 09:52:19 -04:00
encoding="utf-8",
dir=registry_path.parent,
prefix=f"{registry_path.stem}.",
suffix=".tmp",
delete=False,
) as tmp_file:
tmp_file.write(rendered)
temp_path = Path(tmp_file.name)
temp_path.replace(registry_path)