Add operator client and operations playbook
This commit is contained in:
@@ -24,6 +24,10 @@ curl -X POST http://localhost:8100/context/build \
|
||||
|
||||
# CLI ingestion
|
||||
python scripts/ingest_folder.py --path /path/to/notes
|
||||
|
||||
# Live operator client
|
||||
python scripts/atocore_client.py health
|
||||
python scripts/atocore_client.py audit-query "gigabit" 5
|
||||
```
|
||||
|
||||
## API Endpoints
|
||||
@@ -65,3 +69,8 @@ Set via environment variables (prefix `ATOCORE_`):
|
||||
pip install -e ".[dev]"
|
||||
pytest
|
||||
```
|
||||
|
||||
## Operations
|
||||
|
||||
- `scripts/atocore_client.py` provides a live API client for project refresh, project-state inspection, and retrieval-quality audits.
|
||||
- `docs/operations.md` captures the current operational priority order: retrieval quality, Wave 2 trusted-operational ingestion, AtoDrive scoping, and restore validation.
|
||||
|
||||
New file: `docs/operations.md` (96 lines)
@@ -0,0 +1,96 @@
|
||||
# AtoCore Operations
|
||||
|
||||
Current operating order for improving AtoCore:
|
||||
|
||||
1. Retrieval-quality pass
|
||||
2. Wave 2 trusted-operational ingestion
|
||||
3. AtoDrive clarification
|
||||
4. Restore and ops validation
|
||||
|
||||
## Retrieval-Quality Pass
|
||||
|
||||
Current live behavior:
|
||||
|
||||
- broad prompts like `gigabit` and `polisher` can surface archive/history noise
|
||||
- meaningful project prompts perform much better
|
||||
- ranking quality now matters more than raw corpus growth
|
||||
|
||||
Use the operator client to audit retrieval:
|
||||
|
||||
```bash
|
||||
python scripts/atocore_client.py audit-query "gigabit" 5
|
||||
python scripts/atocore_client.py audit-query "polisher" 5
|
||||
python scripts/atocore_client.py audit-query "mirror frame stiffness requirements and selected architecture" 5 p04-gigabit
|
||||
python scripts/atocore_client.py audit-query "interferometer error budget and vendor selection constraints" 5 p05-interferometer
|
||||
python scripts/atocore_client.py audit-query "polisher system map shared contracts and calibration workflow" 5 p06-polisher
|
||||
```
|
||||
|
||||
What to improve:
|
||||
|
||||
- reduce `_archive`, `pre-cleanup`, `pre-migration`, and `History` prominence
|
||||
- prefer current-status, decision, requirement, architecture-freeze, and milestone docs
|
||||
- prefer trusted project-state when it expresses current truth
|
||||
- avoid letting broad single-word prompts drift into stale chunks
|
||||
|
||||
## Wave 2 Trusted-Operational Ingestion
|
||||
|
||||
Do not ingest the whole PKM vault next.
|
||||
|
||||
Prioritize, for each active project:
|
||||
|
||||
- current status
|
||||
- current decisions
|
||||
- requirements baseline
|
||||
- architecture freeze / current baseline
|
||||
- milestone plan
|
||||
- next actions
|
||||
|
||||
Useful commands:
|
||||
|
||||
```bash
|
||||
python scripts/atocore_client.py project-state p04-gigabit
|
||||
python scripts/atocore_client.py project-state p05-interferometer
|
||||
python scripts/atocore_client.py project-state p06-polisher
|
||||
python scripts/atocore_client.py refresh-project p04-gigabit
|
||||
python scripts/atocore_client.py refresh-project p05-interferometer
|
||||
python scripts/atocore_client.py refresh-project p06-polisher
|
||||
```
|
||||
|
||||
## AtoDrive Clarification
|
||||
|
||||
Treat AtoDrive as a curated trusted-operational source, not a generic dump.
|
||||
|
||||
Good candidates:
|
||||
|
||||
- current dashboards
|
||||
- approved baselines
|
||||
- architecture freezes
|
||||
- decision logs
|
||||
- milestone and next-step views
|
||||
|
||||
Avoid by default:
|
||||
|
||||
- duplicated exports
|
||||
- stale snapshots
|
||||
- generic archives
|
||||
- exploratory notes that are not designated current truth
|
||||
|
||||
## Restore and Ops Validation
|
||||
|
||||
Backups are not enough until restore has been tested.
|
||||
|
||||
Validate:
|
||||
|
||||
- SQLite metadata restore
|
||||
- Chroma restore or rebuild
|
||||
- project registry restore
|
||||
- project refresh after recovery
|
||||
- retrieval audit before and after recovery
|
||||
|
||||
Baseline capture:
|
||||
|
||||
```bash
|
||||
python scripts/atocore_client.py health
|
||||
python scripts/atocore_client.py stats
|
||||
python scripts/atocore_client.py projects
|
||||
```
|
||||
New file: `scripts/atocore_client.py` (313 lines)
@@ -0,0 +1,313 @@
|
||||
"""Operator-facing API client for live AtoCore instances.

This script is intentionally external to the app runtime. It is for admins and
operators who want a convenient way to inspect live project state, refresh
projects, audit retrieval quality, and manage trusted project-state entries.
"""

from __future__ import annotations

import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any


# Base URL of the live AtoCore API; override with ATOCORE_BASE_URL.
BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://dalidou:8100").rstrip("/")
# Default per-request timeout, in seconds.
TIMEOUT = int(os.environ.get("ATOCORE_TIMEOUT_SECONDS", "30"))
# Longer timeout for project refresh, which can re-ingest a whole project.
REFRESH_TIMEOUT = int(os.environ.get("ATOCORE_REFRESH_TIMEOUT_SECONDS", "1800"))
# When true, connection failures print a placeholder payload and exit 0
# instead of raising, so wrapping automation keeps working without the API.
FAIL_OPEN = os.environ.get("ATOCORE_FAIL_OPEN", "true").lower() == "true"
||||
def print_json(payload: Any) -> None:
    """Pretty-print *payload* to stdout as ASCII-safe, 2-space-indented JSON."""
    rendered = json.dumps(payload, ensure_ascii=True, indent=2)
    print(rendered)
|
||||
def fail_open_payload() -> dict[str, Any]:
    """Placeholder body emitted when the API is unreachable and FAIL_OPEN is set."""
    placeholder: dict[str, Any] = {
        "status": "unavailable",
        "source": "atocore",
        "fail_open": True,
    }
    return placeholder
def request(
    method: str,
    path: str,
    data: dict[str, Any] | None = None,
    timeout: int | None = None,
) -> Any:
    """Issue one HTTP request against the AtoCore API and return parsed JSON.

    Args:
        method: HTTP verb ("GET", "POST", "PUT", "DELETE").
        path: URL path appended to BASE_URL (expected to start with "/").
        data: Optional JSON body; when given, the request is sent with a
            ``Content-Type: application/json`` header.
        timeout: Per-request timeout override in seconds; defaults to TIMEOUT.

    Returns:
        The decoded JSON payload, or ``{}`` when the response body is empty.

    Exits:
        - ``SystemExit(22)`` on an HTTP error response (the error body, if
          any, is printed first so operators can see the server's message);
        - ``SystemExit(0)`` on a connection-level failure when FAIL_OPEN is
          enabled, after printing the fail-open placeholder payload.
    """
    url = f"{BASE_URL}{path}"
    headers = {"Content-Type": "application/json"} if data is not None else {}
    payload = json.dumps(data).encode("utf-8") if data is not None else None
    req = urllib.request.Request(url, data=payload, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=timeout or TIMEOUT) as response:
            body = response.read().decode("utf-8")
    except urllib.error.HTTPError as exc:
        # The server answered with an error status: surface its body and
        # exit with a distinctive code so shell callers can tell this apart
        # from connection failures.
        body = exc.read().decode("utf-8")
        if body:
            print(body)
        raise SystemExit(22) from exc
    except (urllib.error.URLError, TimeoutError, OSError):
        # No usable response at all (DNS failure, refused connection,
        # timeout). Fail open if configured to, otherwise propagate.
        if FAIL_OPEN:
            print_json(fail_open_payload())
            raise SystemExit(0)
        raise

    if not body.strip():
        return {}
    return json.loads(body)
def parse_aliases(aliases_csv: str) -> list[str]:
    """Split a comma-separated alias string into trimmed, non-empty aliases."""
    aliases: list[str] = []
    for raw_alias in aliases_csv.split(","):
        cleaned = raw_alias.strip()
        if cleaned:
            aliases.append(cleaned)
    return aliases
def detect_project(prompt: str) -> dict[str, Any]:
    """Match *prompt* against the project ids/aliases registered on the live API.

    Fetches ``/projects`` and scores every project id and alias against the
    lowercased prompt; the longest matching candidate wins.

    Returns:
        ``{"matched_project": <id or None>, "matched_alias": <alias or None>}``
    """
    payload = request("GET", "/projects")
    prompt_lower = prompt.lower()
    best_project = None
    best_alias = None
    best_score = -1

    for project in payload.get("projects", []):
        # The project id itself counts as a candidate alongside its aliases.
        candidates = [project.get("id", ""), *project.get("aliases", [])]
        for candidate in candidates:
            candidate = (candidate or "").strip()
            if not candidate:
                continue
            # Word-boundary match: no letter/digit immediately on either side.
            pattern = rf"(?<![a-z0-9]){re.escape(candidate.lower())}(?![a-z0-9])"
            matched = re.search(pattern, prompt_lower) is not None
            # NOTE(review): the substring fallback below makes the regex above
            # redundant — any word-boundary hit is also a substring hit, so the
            # effective criterion is plain containment. Confirm whether strict
            # word-boundary matching was intended here.
            if not matched and candidate.lower() not in prompt_lower:
                continue
            # Longer candidates are treated as more specific matches.
            score = len(candidate)
            if score > best_score:
                best_project = project.get("id")
                best_alias = candidate
                best_score = score

    return {"matched_project": best_project, "matched_alias": best_alias}
def classify_result(result: dict[str, Any]) -> dict[str, Any]:
    """Tag one retrieval result with heuristic content labels.

    Labels are derived from keyword hits across the result's source-file path,
    heading path, and title; anything that looks like archive/history material
    is additionally flagged as a noise risk for ranking audits.
    """
    haystack = " ".join(
        (result.get(field) or "").lower()
        for field in ("source_file", "heading_path", "title")
    )

    # Ordered label -> trigger-keyword table; order determines label order.
    keyword_groups = (
        ("archive_or_history", ("_archive", "/archive", "archive/", "pre-cleanup", "pre-migration", "history")),
        ("current_status", ("status", "dashboard", "current-state", "current state", "next-steps", "next steps")),
        ("decision", ("decision", "adr", "tradeoff", "selected architecture", "selection")),
        ("requirements", ("requirement", "spec", "constraints", "baseline", "cdr", "sow")),
        ("execution_plan", ("roadmap", "milestone", "plan", "workflow", "calibration", "contract")),
    )
    labels = [
        label
        for label, keywords in keyword_groups
        if any(keyword in haystack for keyword in keywords)
    ]
    if not labels:
        labels = ["reference"]

    return {
        "score": result.get("score"),
        "title": result.get("title"),
        "heading_path": result.get("heading_path"),
        "source_file": result.get("source_file"),
        "labels": labels,
        "is_noise_risk": "archive_or_history" in labels,
    }
def audit_query(prompt: str, top_k: int, project: str | None) -> dict[str, Any]:
    """Run a retrieval query and summarize ranking quality for operators.

    Classifies each result, counts noise vs. trusted-content hits, and emits
    human-readable recommendations for the next ingestion/ranking pass.
    """
    response = request(
        "POST",
        "/query",
        {"prompt": prompt, "top_k": top_k, "project": project or None},
    )
    classifications = [classify_result(item) for item in response.get("results", [])]

    # One- or two-word prompts tend to drift into stale/archive chunks.
    broad_prompt = len(prompt.split()) <= 2

    def count_label(label: str) -> int:
        # Number of top results carrying the given heuristic label.
        return sum(1 for item in classifications if label in item["labels"])

    noise_hits = sum(1 for item in classifications if item["is_noise_risk"])
    current_hits = count_label("current_status")
    decision_hits = count_label("decision")
    requirements_hits = count_label("requirements")

    recommendations: list[str] = []
    if broad_prompt:
        recommendations.append("Prompt is broad; prefer a project-specific question with intent, artifact type, or constraint language.")
    if noise_hits:
        recommendations.append("Archive/history noise is present; prefer current-status, decision, requirements, and baseline docs in the next ingestion/ranking pass.")
    if current_hits == 0:
        recommendations.append("No current-status docs surfaced in the top results; Wave 2 should ingest or strengthen trusted operational truth.")
    if decision_hits == 0:
        recommendations.append("No decision docs surfaced in the top results; add or freeze decision logs for the active project.")
    if requirements_hits == 0:
        recommendations.append("No requirements/baseline docs surfaced in the top results; prioritize baseline and architecture-freeze material.")
    if not recommendations:
        recommendations.append("Ranking looks healthy for this prompt.")

    return {
        "prompt": prompt,
        "project": project,
        "top_k": top_k,
        "broad_prompt": broad_prompt,
        "noise_hits": noise_hits,
        "current_status_hits": current_hits,
        "decision_hits": decision_hits,
        "requirements_hits": requirements_hits,
        "results": classifications,
        "recommendations": recommendations,
    }
def project_payload(
    project_id: str,
    aliases_csv: str,
    source: str,
    subpath: str,
    description: str,
    label: str,
) -> dict[str, Any]:
    """Build the JSON body shared by project proposal and registration calls."""
    # Split the comma-separated alias string into trimmed, non-empty aliases.
    aliases = [part.strip() for part in aliases_csv.split(",") if part.strip()]
    ingest_root = {"source": source, "subpath": subpath, "label": label}
    return {
        "project_id": project_id,
        "aliases": aliases,
        "description": description,
        "ingest_roots": [ingest_root],
    }
def build_parser() -> argparse.ArgumentParser:
    """Construct the CLI parser: one required subcommand per API operation."""
    parser = argparse.ArgumentParser(description="AtoCore live API client")
    sub = parser.add_subparsers(dest="command", required=True)

    # Read-only commands that take no positional arguments.
    for simple in ("health", "sources", "stats", "projects", "project-template", "debug-context", "ingest-sources"):
        sub.add_parser(simple)

    detect = sub.add_parser("detect-project")
    detect.add_argument("prompt")

    auto = sub.add_parser("auto-context")
    auto.add_argument("prompt")
    auto.add_argument("budget", nargs="?", type=int, default=3000)
    auto.add_argument("project", nargs="?", default="")

    # Proposal and registration share an identical argument shape.
    for registration in ("propose-project", "register-project"):
        reg = sub.add_parser(registration)
        reg.add_argument("project_id")
        reg.add_argument("aliases_csv")
        reg.add_argument("source")
        reg.add_argument("subpath")
        reg.add_argument("description", nargs="?", default="")
        reg.add_argument("label", nargs="?", default="")

    update = sub.add_parser("update-project")
    update.add_argument("project")
    update.add_argument("description")
    update.add_argument("aliases_csv", nargs="?", default="")

    refresh = sub.add_parser("refresh-project")
    refresh.add_argument("project")
    refresh.add_argument("purge_deleted", nargs="?", default="false")

    state_get = sub.add_parser("project-state")
    state_get.add_argument("project")
    state_get.add_argument("category", nargs="?", default="")

    state_set = sub.add_parser("project-state-set")
    state_set.add_argument("project")
    state_set.add_argument("category")
    state_set.add_argument("key")
    state_set.add_argument("value")
    state_set.add_argument("source", nargs="?", default="")
    state_set.add_argument("confidence", nargs="?", type=float, default=1.0)

    invalidate = sub.add_parser("project-state-invalidate")
    invalidate.add_argument("project")
    invalidate.add_argument("category")
    invalidate.add_argument("key")

    query = sub.add_parser("query")
    query.add_argument("prompt")
    query.add_argument("top_k", nargs="?", type=int, default=5)
    query.add_argument("project", nargs="?", default="")

    ctx = sub.add_parser("context-build")
    ctx.add_argument("prompt")
    ctx.add_argument("project", nargs="?", default="")
    ctx.add_argument("budget", nargs="?", type=int, default=3000)

    audit = sub.add_parser("audit-query")
    audit.add_argument("prompt")
    audit.add_argument("top_k", nargs="?", type=int, default=5)
    audit.add_argument("project", nargs="?", default="")

    return parser
def main() -> int:
    """Dispatch the parsed subcommand to the matching live-API call.

    Returns:
        0 on success; 1 for an unrecognized command (defensive only — the
        subparser is ``required=True`` so argparse rejects unknown commands
        before this function runs).
    """
    args = build_parser().parse_args()
    cmd = args.command

    if cmd == "health":
        print_json(request("GET", "/health"))
    elif cmd == "sources":
        print_json(request("GET", "/sources"))
    elif cmd == "stats":
        print_json(request("GET", "/stats"))
    elif cmd == "projects":
        print_json(request("GET", "/projects"))
    elif cmd == "project-template":
        print_json(request("GET", "/projects/template"))
    elif cmd == "debug-context":
        print_json(request("GET", "/debug/context"))
    elif cmd == "ingest-sources":
        print_json(request("POST", "/ingest/sources", {}))
    elif cmd == "detect-project":
        print_json(detect_project(args.prompt))
    elif cmd == "auto-context":
        # Fall back to prompt-based project detection when none was given.
        project = args.project or detect_project(args.prompt).get("matched_project") or ""
        if not project:
            print_json({"status": "no_project_match", "source": "atocore", "mode": "auto-context"})
        else:
            print_json(request("POST", "/context/build", {"prompt": args.prompt, "project": project, "budget": args.budget}))
    elif cmd in {"propose-project", "register-project"}:
        # Both commands share the same payload; only the endpoint differs.
        path = "/projects/proposal" if cmd == "propose-project" else "/projects/register"
        print_json(request("POST", path, project_payload(args.project_id, args.aliases_csv, args.source, args.subpath, args.description, args.label)))
    elif cmd == "update-project":
        payload: dict[str, Any] = {"description": args.description}
        # Aliases are only included when the caller actually supplied some,
        # so an omitted argument does not clobber existing aliases.
        if args.aliases_csv.strip():
            payload["aliases"] = parse_aliases(args.aliases_csv)
        print_json(request("PUT", f"/projects/{urllib.parse.quote(args.project)}", payload))
    elif cmd == "refresh-project":
        # Accept a loose truthy vocabulary for the optional positional flag.
        purge_deleted = args.purge_deleted.lower() in {"1", "true", "yes", "y"}
        path = f"/projects/{urllib.parse.quote(args.project)}/refresh?purge_deleted={str(purge_deleted).lower()}"
        # Refreshes can re-ingest a whole project, so use the long timeout.
        print_json(request("POST", path, {}, timeout=REFRESH_TIMEOUT))
    elif cmd == "project-state":
        suffix = f"?category={urllib.parse.quote(args.category)}" if args.category else ""
        print_json(request("GET", f"/project/state/{urllib.parse.quote(args.project)}{suffix}"))
    elif cmd == "project-state-set":
        print_json(request("POST", "/project/state", {
            "project": args.project,
            "category": args.category,
            "key": args.key,
            "value": args.value,
            "source": args.source,
            "confidence": args.confidence,
        }))
    elif cmd == "project-state-invalidate":
        print_json(request("DELETE", "/project/state", {"project": args.project, "category": args.category, "key": args.key}))
    elif cmd == "query":
        # An empty project string is normalized to None (no project filter).
        print_json(request("POST", "/query", {"prompt": args.prompt, "top_k": args.top_k, "project": args.project or None}))
    elif cmd == "context-build":
        print_json(request("POST", "/context/build", {"prompt": args.prompt, "project": args.project or None, "budget": args.budget}))
    elif cmd == "audit-query":
        print_json(audit_query(args.prompt, args.top_k, args.project or None))
    else:
        return 1
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    raise SystemExit(main())
Reference in New Issue
Block a user