From ceb129c7d1c25d899be7a46e9e88d1428420a2f5 Mon Sep 17 00:00:00 2001 From: Anto01 Date: Mon, 6 Apr 2026 19:59:09 -0400 Subject: [PATCH] Add operator client and operations playbook --- README.md | 9 ++ docs/operations.md | 96 ++++++++++++ scripts/atocore_client.py | 313 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 418 insertions(+) create mode 100644 docs/operations.md create mode 100644 scripts/atocore_client.py diff --git a/README.md b/README.md index 02683a2..3728cda 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,10 @@ curl -X POST http://localhost:8100/context/build \ # CLI ingestion python scripts/ingest_folder.py --path /path/to/notes + +# Live operator client +python scripts/atocore_client.py health +python scripts/atocore_client.py audit-query "gigabit" 5 ``` ## API Endpoints @@ -65,3 +69,8 @@ Set via environment variables (prefix `ATOCORE_`): pip install -e ".[dev]" pytest ``` + +## Operations + +- `scripts/atocore_client.py` provides a live API client for project refresh, project-state inspection, and retrieval-quality audits. +- `docs/operations.md` captures the current operational priority order: retrieval quality, Wave 2 trusted-operational ingestion, AtoDrive scoping, and restore validation. diff --git a/docs/operations.md b/docs/operations.md new file mode 100644 index 0000000..4efc3ba --- /dev/null +++ b/docs/operations.md @@ -0,0 +1,96 @@ +# AtoCore Operations + +Current operating order for improving AtoCore: + +1. Retrieval-quality pass +2. Wave 2 trusted-operational ingestion +3. AtoDrive clarification +4. 
Restore and ops validation + +## Retrieval-Quality Pass + +Current live behavior: + +- broad prompts like `gigabit` and `polisher` can surface archive/history noise +- meaningful project prompts perform much better +- ranking quality now matters more than raw corpus growth + +Use the operator client to audit retrieval: + +```bash +python scripts/atocore_client.py audit-query "gigabit" 5 +python scripts/atocore_client.py audit-query "polisher" 5 +python scripts/atocore_client.py audit-query "mirror frame stiffness requirements and selected architecture" 5 p04-gigabit +python scripts/atocore_client.py audit-query "interferometer error budget and vendor selection constraints" 5 p05-interferometer +python scripts/atocore_client.py audit-query "polisher system map shared contracts and calibration workflow" 5 p06-polisher +``` + +What to improve: + +- reduce `_archive`, `pre-cleanup`, `pre-migration`, and `History` prominence +- prefer current-status, decision, requirement, architecture-freeze, and milestone docs +- prefer trusted project-state when it expresses current truth +- avoid letting broad single-word prompts drift into stale chunks + +## Wave 2 Trusted-Operational Ingestion + +Do not ingest the whole PKM vault next. + +Prioritize, for each active project: + +- current status +- current decisions +- requirements baseline +- architecture freeze / current baseline +- milestone plan +- next actions + +Useful commands: + +```bash +python scripts/atocore_client.py project-state p04-gigabit +python scripts/atocore_client.py project-state p05-interferometer +python scripts/atocore_client.py project-state p06-polisher +python scripts/atocore_client.py refresh-project p04-gigabit +python scripts/atocore_client.py refresh-project p05-interferometer +python scripts/atocore_client.py refresh-project p06-polisher +``` + +## AtoDrive Clarification + +Treat AtoDrive as a curated trusted-operational source, not a generic dump. 
+ +Good candidates: + +- current dashboards +- approved baselines +- architecture freezes +- decision logs +- milestone and next-step views + +Avoid by default: + +- duplicated exports +- stale snapshots +- generic archives +- exploratory notes that are not designated current truth + +## Restore and Ops Validation + +Backups are not enough until restore has been tested. + +Validate: + +- SQLite metadata restore +- Chroma restore or rebuild +- project registry restore +- project refresh after recovery +- retrieval audit before and after recovery + +Baseline capture: + +```bash +python scripts/atocore_client.py health +python scripts/atocore_client.py stats +python scripts/atocore_client.py projects +``` diff --git a/scripts/atocore_client.py b/scripts/atocore_client.py new file mode 100644 index 0000000..ccfcff9 --- /dev/null +++ b/scripts/atocore_client.py @@ -0,0 +1,313 @@ +"""Operator-facing API client for live AtoCore instances. + +This script is intentionally external to the app runtime. It is for admins and +operators who want a convenient way to inspect live project state, refresh +projects, audit retrieval quality, and manage trusted project-state entries. 
"""  # closes the module docstring that opens on the preceding line """

from __future__ import annotations

import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any


def env_int(name: str, default: int) -> int:
    """Return environment variable *name* as an int, or *default*.

    An unset, blank, or non-numeric value falls back to *default* (with a
    warning on stderr for the non-numeric case) instead of crashing the
    script at import time.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        print(
            f"warning: ignoring non-integer {name}={raw!r}; using {default}",
            file=sys.stderr,
        )
        return default


BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://dalidou:8100").rstrip("/")
TIMEOUT = env_int("ATOCORE_TIMEOUT_SECONDS", 30)
REFRESH_TIMEOUT = env_int("ATOCORE_REFRESH_TIMEOUT_SECONDS", 1800)
FAIL_OPEN = os.environ.get("ATOCORE_FAIL_OPEN", "true").lower() == "true"

# Sub-commands that are a plain GET of a fixed path, in CLI registration order.
SIMPLE_GET_COMMANDS: dict[str, str] = {
    "health": "/health",
    "sources": "/sources",
    "stats": "/stats",
    "projects": "/projects",
    "project-template": "/projects/template",
    "debug-context": "/debug/context",
}

# (label, substrings) pairs checked in order by classify_result(). Matching is
# plain substring containment on lower-cased text, so short tokens such as
# "adr" or "plan" can also hit inside longer words; treat labels as heuristics.
LABEL_RULES: tuple[tuple[str, tuple[str, ...]], ...] = (
    (
        "archive_or_history",
        ("_archive", "/archive", "archive/", "pre-cleanup", "pre-migration", "history"),
    ),
    (
        "current_status",
        ("status", "dashboard", "current-state", "current state", "next-steps", "next steps"),
    ),
    ("decision", ("decision", "adr", "tradeoff", "selected architecture", "selection")),
    ("requirements", ("requirement", "spec", "constraints", "baseline", "cdr", "sow")),
    (
        "execution_plan",
        ("roadmap", "milestone", "plan", "workflow", "calibration", "contract"),
    ),
)


def print_json(payload: Any) -> None:
    """Print *payload* to stdout as indented, ASCII-only JSON."""
    print(json.dumps(payload, ensure_ascii=True, indent=2))


def fail_open_payload() -> dict[str, Any]:
    """Return the payload printed when the server is unreachable and FAIL_OPEN is set."""
    return {"status": "unavailable", "source": "atocore", "fail_open": True}


def quote_path_segment(value: str) -> str:
    """Percent-encode *value* for use as a single URL path segment.

    ``safe=""`` is deliberate: the default of ``urllib.parse.quote`` leaves
    ``/`` untouched, which would let an identifier containing a slash change
    the request path.
    """
    return urllib.parse.quote(value, safe="")


def request(
    method: str,
    path: str,
    data: dict[str, Any] | None = None,
    timeout: int | None = None,
) -> Any:
    """Send one HTTP request to ``BASE_URL + path`` and return the decoded JSON body.

    Args:
        method: HTTP verb.
        path: Path (and optional query string) appended to ``BASE_URL``.
        data: Optional JSON-serialisable body; when given it is sent as
            ``application/json``.
        timeout: Socket timeout in seconds; a falsy value selects ``TIMEOUT``.

    Returns:
        The parsed JSON document, or ``{}`` when the response body is blank.

    Raises:
        SystemExit: code 22 on an HTTP error status (the response body, if
            any, is echoed to stdout first); code 0 when the server cannot be
            reached and ``FAIL_OPEN`` is true (the fail-open payload is
            printed first); code 1 when the response body is not valid JSON.
        OSError: connection-level failures when ``FAIL_OPEN`` is false.
    """
    url = f"{BASE_URL}{path}"
    headers = {"Content-Type": "application/json"} if data is not None else {}
    payload = json.dumps(data).encode("utf-8") if data is not None else None
    req = urllib.request.Request(url, data=payload, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=timeout or TIMEOUT) as response:
            body = response.read().decode("utf-8")
    except urllib.error.HTTPError as exc:
        # Error pages are not guaranteed to be UTF-8; never crash while reporting.
        error_body = exc.read().decode("utf-8", errors="replace")
        if error_body:
            print(error_body)
        # Always say what failed, so an empty error body is not a silent exit.
        print(f"HTTP {exc.code} {exc.reason} for {method} {url}", file=sys.stderr)
        raise SystemExit(22) from exc
    except (urllib.error.URLError, TimeoutError, OSError):
        if FAIL_OPEN:
            print_json(fail_open_payload())
            raise SystemExit(0)
        raise

    if not body.strip():
        return {}
    try:
        return json.loads(body)
    except json.JSONDecodeError as exc:
        print(f"Response from {method} {url} is not valid JSON: {exc}", file=sys.stderr)
        raise SystemExit(1) from exc


def parse_aliases(aliases_csv: str) -> list[str]:
    """Split a comma-separated alias string, dropping blanks and surrounding spaces."""
    return [alias.strip() for alias in aliases_csv.split(",") if alias.strip()]


def match_project(prompt: str, projects: list[dict[str, Any]]) -> dict[str, Any]:
    """Pick the project whose id or alias appears in *prompt*.

    Each project contributes its ``id`` and its ``aliases`` as candidates.
    The comparison is case-insensitive and the longest matching candidate
    wins; on a tie the first one encountered is kept.

    NOTE(review): the matching expression was unreadable in the reviewed copy
    of this file. The whole-token match and the length-based score below are
    a reconstruction -- confirm against the original implementation.

    Returns:
        ``{"matched_project": <id or None>, "matched_alias": <str or None>}``.
    """
    prompt_lower = prompt.lower()
    best_project = None
    best_alias = None
    best_score = -1

    for project in projects:
        # ``aliases`` may be missing or null in the registry payload.
        candidates = [project.get("id", ""), *(project.get("aliases") or [])]
        for candidate in candidates:
            candidate = (candidate or "").strip()
            if not candidate:
                continue
            # Require non-alphanumeric neighbours so "gigabit" does not match
            # inside "gigabits" but does match inside "p04-gigabit".
            pattern = rf"(?<![a-z0-9]){re.escape(candidate.lower())}(?![a-z0-9])"
            if re.search(pattern, prompt_lower) is None:
                continue
            score = len(candidate)
            if score > best_score:
                best_project = project.get("id")
                best_alias = candidate
                best_score = score

    return {"matched_project": best_project, "matched_alias": best_alias}


def detect_project(prompt: str) -> dict[str, Any]:
    """Fetch the project list from ``/projects`` and match it against *prompt*."""
    payload = request("GET", "/projects")
    return match_project(prompt, payload.get("projects") or [])


def classify_result(result: dict[str, Any]) -> dict[str, Any]:
    """Attach heuristic document-type labels to one retrieval result.

    The lower-cased ``source_file``, ``heading_path`` and ``title`` are joined
    and scanned for the substrings in ``LABEL_RULES``. A result that matches
    nothing is labelled ``"reference"``.

    Returns:
        A dict with the result's ``score``, ``title``, ``heading_path`` and
        ``source_file``, the ``labels`` list, and ``is_noise_risk`` (true when
        ``"archive_or_history"`` is among the labels).
    """
    source_file = (result.get("source_file") or "").lower()
    heading = (result.get("heading_path") or "").lower()
    title = (result.get("title") or "").lower()
    text = " ".join([source_file, heading, title])

    labels = [
        label
        for label, tokens in LABEL_RULES
        if any(token in text for token in tokens)
    ]
    if not labels:
        labels.append("reference")

    return {
        "score": result.get("score"),
        "title": result.get("title"),
        "heading_path": result.get("heading_path"),
        "source_file": result.get("source_file"),
        "labels": labels,
        "is_noise_risk": "archive_or_history" in labels,
    }


def audit_query(prompt: str, top_k: int, project: str | None) -> dict[str, Any]:
    """Run ``POST /query`` and summarise the kinds of documents that surfaced.

    Every result is passed through ``classify_result``; the summary counts
    archive/history, current-status, decision and requirements hits and turns
    them into recommendations. A prompt of two words or fewer is flagged as
    broad.
    """
    response = request(
        "POST",
        "/query",
        {"prompt": prompt, "top_k": top_k, "project": project or None},
    )
    classifications = [classify_result(result) for result in response.get("results", [])]
    broad_prompt = len(prompt.split()) <= 2
    noise_hits = sum(1 for item in classifications if item["is_noise_risk"])
    current_hits = sum(1 for item in classifications if "current_status" in item["labels"])
    decision_hits = sum(1 for item in classifications if "decision" in item["labels"])
    requirements_hits = sum(1 for item in classifications if "requirements" in item["labels"])

    recommendations: list[str] = []
    if broad_prompt:
        recommendations.append("Prompt is broad; prefer a project-specific question with intent, artifact type, or constraint language.")
    if noise_hits:
        recommendations.append("Archive/history noise is present; prefer current-status, decision, requirements, and baseline docs in the next ingestion/ranking pass.")
    if current_hits == 0:
        recommendations.append("No current-status docs surfaced in the top results; Wave 2 should ingest or strengthen trusted operational truth.")
    if decision_hits == 0:
        recommendations.append("No decision docs surfaced in the top results; add or freeze decision logs for the active project.")
    if requirements_hits == 0:
        recommendations.append("No requirements/baseline docs surfaced in the top results; prioritize baseline and architecture-freeze material.")
    if not recommendations:
        recommendations.append("Ranking looks healthy for this prompt.")

    return {
        "prompt": prompt,
        "project": project,
        "top_k": top_k,
        "broad_prompt": broad_prompt,
        "noise_hits": noise_hits,
        "current_status_hits": current_hits,
        "decision_hits": decision_hits,
        "requirements_hits": requirements_hits,
        "results": classifications,
        "recommendations": recommendations,
    }


def project_payload(
    project_id: str,
    aliases_csv: str,
    source: str,
    subpath: str,
    description: str,
    label: str,
) -> dict[str, Any]:
    """Build the request body shared by ``propose-project`` and ``register-project``."""
    return {
        "project_id": project_id,
        "aliases": parse_aliases(aliases_csv),
        "description": description,
        "ingest_roots": [{"source": source, "subpath": subpath, "label": label}],
    }


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser; every sub-command takes positional arguments only."""
    parser = argparse.ArgumentParser(description="AtoCore live API client")
    sub = parser.add_subparsers(dest="command", required=True)

    for name in [*SIMPLE_GET_COMMANDS, "ingest-sources"]:
        sub.add_parser(name)

    p = sub.add_parser("detect-project")
    p.add_argument("prompt")

    p = sub.add_parser("auto-context")
    p.add_argument("prompt")
    p.add_argument("budget", nargs="?", type=int, default=3000)
    p.add_argument("project", nargs="?", default="")

    for name in ["propose-project", "register-project"]:
        p = sub.add_parser(name)
        p.add_argument("project_id")
        p.add_argument("aliases_csv")
        p.add_argument("source")
        p.add_argument("subpath")
        p.add_argument("description", nargs="?", default="")
        p.add_argument("label", nargs="?", default="")

    p = sub.add_parser("update-project")
    p.add_argument("project")
    p.add_argument("description")
    p.add_argument("aliases_csv", nargs="?", default="")

    p = sub.add_parser("refresh-project")
    p.add_argument("project")
    p.add_argument("purge_deleted", nargs="?", default="false")

    p = sub.add_parser("project-state")
    p.add_argument("project")
    p.add_argument("category", nargs="?", default="")

    p = sub.add_parser("project-state-set")
    p.add_argument("project")
    p.add_argument("category")
    p.add_argument("key")
    p.add_argument("value")
    p.add_argument("source", nargs="?", default="")
    p.add_argument("confidence", nargs="?", type=float, default=1.0)

    p = sub.add_parser("project-state-invalidate")
    p.add_argument("project")
    p.add_argument("category")
    p.add_argument("key")

    p = sub.add_parser("query")
    p.add_argument("prompt")
    p.add_argument("top_k", nargs="?", type=int, default=5)
    p.add_argument("project", nargs="?", default="")

    # NOTE: here the order is prompt, project, budget, whereas "auto-context"
    # takes prompt, budget, project. Kept as-is for CLI compatibility.
    p = sub.add_parser("context-build")
    p.add_argument("prompt")
    p.add_argument("project", nargs="?", default="")
    p.add_argument("budget", nargs="?", type=int, default=3000)

    p = sub.add_parser("audit-query")
    p.add_argument("prompt")
    p.add_argument("top_k", nargs="?", type=int, default=5)
    p.add_argument("project", nargs="?", default="")

    return parser


def main() -> int:
    """Parse the command line, run the selected sub-command, and print its JSON result.

    Returns:
        0 on success, 1 for an unrecognised command. ``request`` may end the
        process earlier through ``SystemExit``.
    """
    args = build_parser().parse_args()
    cmd = args.command

    if cmd in SIMPLE_GET_COMMANDS:
        print_json(request("GET", SIMPLE_GET_COMMANDS[cmd]))
    elif cmd == "ingest-sources":
        print_json(request("POST", "/ingest/sources", {}))
    elif cmd == "detect-project":
        print_json(detect_project(args.prompt))
    elif cmd == "auto-context":
        # An explicit project wins; otherwise try to infer one from the prompt.
        project = args.project or detect_project(args.prompt).get("matched_project") or ""
        if not project:
            print_json({"status": "no_project_match", "source": "atocore", "mode": "auto-context"})
        else:
            print_json(
                request(
                    "POST",
                    "/context/build",
                    {"prompt": args.prompt, "project": project, "budget": args.budget},
                )
            )
    elif cmd in {"propose-project", "register-project"}:
        path = "/projects/proposal" if cmd == "propose-project" else "/projects/register"
        body = project_payload(
            args.project_id,
            args.aliases_csv,
            args.source,
            args.subpath,
            args.description,
            args.label,
        )
        print_json(request("POST", path, body))
    elif cmd == "update-project":
        payload: dict[str, Any] = {"description": args.description}
        # Aliases are only sent when supplied, so a description-only update
        # does not include an "aliases" key at all.
        if args.aliases_csv.strip():
            payload["aliases"] = parse_aliases(args.aliases_csv)
        print_json(request("PUT", f"/projects/{quote_path_segment(args.project)}", payload))
    elif cmd == "refresh-project":
        purge_deleted = args.purge_deleted.lower() in {"1", "true", "yes", "y"}
        path = (
            f"/projects/{quote_path_segment(args.project)}/refresh"
            f"?purge_deleted={str(purge_deleted).lower()}"
        )
        print_json(request("POST", path, {}, timeout=REFRESH_TIMEOUT))
    elif cmd == "project-state":
        suffix = f"?category={quote_path_segment(args.category)}" if args.category else ""
        print_json(request("GET", f"/project/state/{quote_path_segment(args.project)}{suffix}"))
    elif cmd == "project-state-set":
        print_json(
            request(
                "POST",
                "/project/state",
                {
                    "project": args.project,
                    "category": args.category,
                    "key": args.key,
                    "value": args.value,
                    "source": args.source,
                    "confidence": args.confidence,
                },
            )
        )
    elif cmd == "project-state-invalidate":
        print_json(
            request(
                "DELETE",
                "/project/state",
                {"project": args.project, "category": args.category, "key": args.key},
            )
        )
    elif cmd == "query":
        print_json(
            request(
                "POST",
                "/query",
                {"prompt": args.prompt, "top_k": args.top_k, "project": args.project or None},
            )
        )
    elif cmd == "context-build":
        print_json(
            request(
                "POST",
                "/context/build",
                {"prompt": args.prompt, "project": args.project or None, "budget": args.budget},
            )
        )
    elif cmd == "audit-query":
        print_json(audit_query(args.prompt, args.top_k, args.project or None))
    else:
        # Defensive only: argparse rejects unknown sub-commands before this point.
        print(f"Unknown command: {cmd}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())