Add project registration proposal preview

This commit is contained in:
2026-04-06 09:11:11 -04:00
parent 827dcf2cd1
commit 1f1e6b5749
4 changed files with 244 additions and 0 deletions

View File

@@ -34,6 +34,7 @@ from atocore.memory.service import (
)
from atocore.observability.logger import get_logger
from atocore.projects.registry import (
build_project_registration_proposal,
get_project_registry_template,
list_registered_projects,
refresh_registered_project,
@@ -68,6 +69,13 @@ class ProjectRefreshResponse(BaseModel):
roots: list[dict]
class ProjectRegistrationProposalRequest(BaseModel):
project_id: str
aliases: list[str] = []
description: str = ""
ingest_roots: list[dict]
class QueryRequest(BaseModel):
prompt: str
top_k: int = 10
@@ -180,6 +188,20 @@ def api_projects_template() -> dict:
}
@router.post("/projects/proposal")
def api_project_registration_proposal(req: ProjectRegistrationProposalRequest) -> dict:
"""Return a normalized project registration proposal without writing it."""
try:
return build_project_registration_proposal(
project_id=req.project_id,
aliases=req.aliases,
description=req.description,
ingest_roots=req.ingest_roots,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/projects/{project_name}/refresh", response_model=ProjectRefreshResponse)
def api_refresh_project(project_name: str, purge_deleted: bool = False) -> ProjectRefreshResponse:
"""Refresh one registered project from its configured ingest roots."""

View File

@@ -45,6 +45,54 @@ def get_project_registry_template() -> dict:
}
def build_project_registration_proposal(
project_id: str,
aliases: list[str] | tuple[str, ...] | None = None,
description: str = "",
ingest_roots: list[dict] | tuple[dict, ...] | None = None,
) -> dict:
"""Build a normalized project registration proposal without mutating state."""
normalized_id = project_id.strip()
if not normalized_id:
raise ValueError("Project id must be non-empty")
normalized_aliases = _normalize_aliases(aliases or [])
normalized_roots = _normalize_ingest_roots(ingest_roots or [])
if not normalized_roots:
raise ValueError("At least one ingest root is required")
collisions = _find_name_collisions(normalized_id, normalized_aliases)
resolved_roots = []
for root in normalized_roots:
source_ref = ProjectSourceRef(
source=root["source"],
subpath=root["subpath"],
label=root.get("label", ""),
)
resolved_path = _resolve_ingest_root(source_ref)
resolved_roots.append(
{
**root,
"path": str(resolved_path),
"exists": resolved_path.exists(),
"is_dir": resolved_path.is_dir(),
}
)
return {
"project": {
"id": normalized_id,
"aliases": normalized_aliases,
"description": description.strip(),
"ingest_roots": normalized_roots,
},
"resolved_ingest_roots": resolved_roots,
"collisions": collisions,
"registry_path": str(_config.settings.resolved_project_registry_path),
"valid": not collisions,
}
def load_project_registry() -> list[RegisteredProject]:
"""Load project registry entries from JSON config."""
registry_path = _config.settings.resolved_project_registry_path
@@ -147,6 +195,35 @@ def refresh_registered_project(project_name: str, purge_deleted: bool = False) -
}
def _normalize_aliases(aliases: list[str] | tuple[str, ...]) -> list[str]:
deduped: list[str] = []
seen: set[str] = set()
for alias in aliases:
candidate = alias.strip()
if not candidate:
continue
key = candidate.lower()
if key in seen:
continue
seen.add(key)
deduped.append(candidate)
return deduped
def _normalize_ingest_roots(ingest_roots: list[dict] | tuple[dict, ...]) -> list[dict]:
normalized: list[dict] = []
for root in ingest_roots:
source = str(root.get("source", "")).strip()
subpath = str(root.get("subpath", "")).strip()
label = str(root.get("label", "")).strip()
if not source or not subpath:
continue
if source not in {"vault", "drive"}:
raise ValueError(f"Unsupported source root: {source}")
normalized.append({"source": source, "subpath": subpath, "label": label})
return normalized
def _project_to_dict(project: RegisteredProject) -> dict:
return {
"id": project.project_id,
@@ -189,3 +266,22 @@ def _validate_unique_project_names(projects: list[RegisteredProject]) -> None:
f"'{seen[key]}' and '{project.project_id}'"
)
seen[key] = project.project_id
def _find_name_collisions(project_id: str, aliases: list[str]) -> list[dict]:
collisions: list[dict] = []
existing = load_project_registry()
requested_names = [project_id, *aliases]
for requested in requested_names:
requested_key = requested.lower()
for project in existing:
project_names = [project.project_id, *project.aliases]
if requested_key in {name.lower() for name in project_names}:
collisions.append(
{
"name": requested,
"existing_project": project.project_id,
}
)
break
return collisions