346 lines
12 KiB
Python
346 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from typing import Any
|
|
|
|
|
|
BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://dalidou:8100").rstrip("/")
|
|
TIMEOUT = int(os.environ.get("ATOCORE_TIMEOUT_SECONDS", "30"))
|
|
REFRESH_TIMEOUT = int(os.environ.get("ATOCORE_REFRESH_TIMEOUT_SECONDS", "1800"))
|
|
FAIL_OPEN = os.environ.get("ATOCORE_FAIL_OPEN", "true").lower() == "true"
|
|
|
|
|
|
USAGE = """Usage:
|
|
atocore.py health
|
|
atocore.py sources
|
|
atocore.py stats
|
|
atocore.py projects
|
|
atocore.py project-template
|
|
atocore.py detect-project <prompt>
|
|
atocore.py auto-context <prompt> [budget] [project]
|
|
atocore.py debug-context
|
|
atocore.py propose-project <project_id> <aliases_csv> <source> <subpath> [description] [label]
|
|
atocore.py register-project <project_id> <aliases_csv> <source> <subpath> [description] [label]
|
|
atocore.py update-project <project> <description> [aliases_csv]
|
|
atocore.py refresh-project <project> [purge_deleted]
|
|
atocore.py project-state <project> [category]
|
|
atocore.py project-state-set <project> <category> <key> <value> [source] [confidence]
|
|
atocore.py project-state-invalidate <project> <category> <key>
|
|
atocore.py query <prompt> [top_k] [project]
|
|
atocore.py context-build <prompt> [project] [budget]
|
|
atocore.py audit-query <prompt> [top_k] [project]
|
|
atocore.py ingest-sources
|
|
"""
|
|
|
|
|
|
def print_json(payload: Any) -> None:
|
|
print(json.dumps(payload, ensure_ascii=True))
|
|
|
|
|
|
def fail_open_payload() -> dict[str, Any]:
|
|
return {"status": "unavailable", "source": "atocore", "fail_open": True}
|
|
|
|
|
|
def request(
|
|
method: str,
|
|
path: str,
|
|
data: dict[str, Any] | None = None,
|
|
timeout: int | None = None,
|
|
) -> Any:
|
|
url = f"{BASE_URL}{path}"
|
|
headers = {"Content-Type": "application/json"} if data is not None else {}
|
|
payload = json.dumps(data).encode("utf-8") if data is not None else None
|
|
req = urllib.request.Request(url, data=payload, headers=headers, method=method)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=timeout or TIMEOUT) as response:
|
|
body = response.read().decode("utf-8")
|
|
except urllib.error.HTTPError as exc:
|
|
body = exc.read().decode("utf-8")
|
|
if body:
|
|
print(body)
|
|
raise SystemExit(22) from exc
|
|
except (urllib.error.URLError, TimeoutError, OSError):
|
|
if FAIL_OPEN:
|
|
print_json(fail_open_payload())
|
|
raise SystemExit(0)
|
|
raise
|
|
|
|
if not body.strip():
|
|
return {}
|
|
return json.loads(body)
|
|
|
|
|
|
def parse_aliases(aliases_csv: str) -> list[str]:
|
|
return [alias.strip() for alias in aliases_csv.split(",") if alias.strip()]
|
|
|
|
|
|
def project_payload(
|
|
project_id: str,
|
|
aliases_csv: str,
|
|
source: str,
|
|
subpath: str,
|
|
description: str,
|
|
label: str,
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"project_id": project_id,
|
|
"aliases": parse_aliases(aliases_csv),
|
|
"description": description,
|
|
"ingest_roots": [{"source": source, "subpath": subpath, "label": label}],
|
|
}
|
|
|
|
|
|
def detect_project(prompt: str) -> dict[str, Any]:
|
|
payload = request("GET", "/projects")
|
|
prompt_lower = prompt.lower()
|
|
best_project = None
|
|
best_alias = None
|
|
best_score = -1
|
|
|
|
for project in payload.get("projects", []):
|
|
candidates = [project.get("id", ""), *project.get("aliases", [])]
|
|
for candidate in candidates:
|
|
candidate = (candidate or "").strip()
|
|
if not candidate:
|
|
continue
|
|
pattern = rf"(?<![a-z0-9]){re.escape(candidate.lower())}(?![a-z0-9])"
|
|
matched = re.search(pattern, prompt_lower) is not None
|
|
if not matched and candidate.lower() not in prompt_lower:
|
|
continue
|
|
score = len(candidate)
|
|
if score > best_score:
|
|
best_project = project.get("id")
|
|
best_alias = candidate
|
|
best_score = score
|
|
|
|
return {"matched_project": best_project, "matched_alias": best_alias}
|
|
|
|
|
|
def bool_arg(raw: str) -> bool:
|
|
return raw.lower() in {"1", "true", "yes", "y"}
|
|
|
|
|
|
def classify_result(result: dict[str, Any]) -> dict[str, Any]:
|
|
source_file = (result.get("source_file") or "").lower()
|
|
heading = (result.get("heading_path") or "").lower()
|
|
title = (result.get("title") or "").lower()
|
|
text = " ".join([source_file, heading, title])
|
|
|
|
labels: list[str] = []
|
|
if any(token in text for token in ["_archive", "/archive", "archive/", "pre-cleanup", "pre-migration", "history"]):
|
|
labels.append("archive_or_history")
|
|
if any(token in text for token in ["status", "dashboard", "current-state", "current state", "next-steps", "next steps"]):
|
|
labels.append("current_status")
|
|
if any(token in text for token in ["decision", "adr", "tradeoff", "selected architecture", "selection"]):
|
|
labels.append("decision")
|
|
if any(token in text for token in ["requirement", "spec", "constraints", "baseline", "cdr", "sow"]):
|
|
labels.append("requirements")
|
|
if any(token in text for token in ["roadmap", "milestone", "plan", "workflow", "calibration", "contract"]):
|
|
labels.append("execution_plan")
|
|
if not labels:
|
|
labels.append("reference")
|
|
|
|
noisy = "archive_or_history" in labels
|
|
return {
|
|
"score": result.get("score"),
|
|
"title": result.get("title"),
|
|
"heading_path": result.get("heading_path"),
|
|
"source_file": result.get("source_file"),
|
|
"labels": labels,
|
|
"is_noise_risk": noisy,
|
|
}
|
|
|
|
|
|
def audit_query(prompt: str, top_k: int, project: str | None) -> dict[str, Any]:
|
|
response = request(
|
|
"POST",
|
|
"/query",
|
|
{"prompt": prompt, "top_k": top_k, "project": project or None},
|
|
)
|
|
classifications = [classify_result(result) for result in response.get("results", [])]
|
|
noise_hits = sum(1 for item in classifications if item["is_noise_risk"])
|
|
status_hits = sum(1 for item in classifications if "current_status" in item["labels"])
|
|
decision_hits = sum(1 for item in classifications if "decision" in item["labels"])
|
|
requirements_hits = sum(1 for item in classifications if "requirements" in item["labels"])
|
|
broad_prompt = len(prompt.split()) <= 2
|
|
|
|
recommendations: list[str] = []
|
|
if broad_prompt:
|
|
recommendations.append("Prompt is broad; prefer a project-specific question with intent, artifact type, or constraint language.")
|
|
if noise_hits:
|
|
recommendations.append("Archive/history noise is present; prefer current-status, decision, requirements, and baseline docs in the next ingestion/ranking pass.")
|
|
if status_hits == 0:
|
|
recommendations.append("No current-status docs surfaced in the top results; Wave 2 should ingest or strengthen trusted operational truth.")
|
|
if decision_hits == 0:
|
|
recommendations.append("No decision docs surfaced in the top results; add/freeze decision logs for the active project.")
|
|
if requirements_hits == 0:
|
|
recommendations.append("No requirements/baseline docs surfaced in the top results; prioritize baseline and architecture freeze material.")
|
|
if not recommendations:
|
|
recommendations.append("Ranking looks healthy for this prompt.")
|
|
|
|
return {
|
|
"prompt": prompt,
|
|
"project": project,
|
|
"top_k": top_k,
|
|
"broad_prompt": broad_prompt,
|
|
"noise_hits": noise_hits,
|
|
"current_status_hits": status_hits,
|
|
"decision_hits": decision_hits,
|
|
"requirements_hits": requirements_hits,
|
|
"results": classifications,
|
|
"recommendations": recommendations,
|
|
}
|
|
|
|
|
|
def main(argv: list[str]) -> int:
|
|
if len(argv) < 2:
|
|
print(USAGE, end="")
|
|
return 1
|
|
|
|
cmd = argv[1]
|
|
args = argv[2:]
|
|
|
|
if cmd == "health":
|
|
print_json(request("GET", "/health"))
|
|
return 0
|
|
if cmd == "sources":
|
|
print_json(request("GET", "/sources"))
|
|
return 0
|
|
if cmd == "stats":
|
|
print_json(request("GET", "/stats"))
|
|
return 0
|
|
if cmd == "projects":
|
|
print_json(request("GET", "/projects"))
|
|
return 0
|
|
if cmd == "project-template":
|
|
print_json(request("GET", "/projects/template"))
|
|
return 0
|
|
if cmd == "detect-project":
|
|
if not args:
|
|
print(USAGE, end="")
|
|
return 1
|
|
print_json(detect_project(args[0]))
|
|
return 0
|
|
if cmd == "auto-context":
|
|
if not args:
|
|
print(USAGE, end="")
|
|
return 1
|
|
prompt = args[0]
|
|
budget = int(args[1]) if len(args) > 1 else 3000
|
|
project = args[2] if len(args) > 2 else ""
|
|
if not project:
|
|
project = detect_project(prompt).get("matched_project") or ""
|
|
if not project:
|
|
print_json({"status": "no_project_match", "source": "atocore", "mode": "auto-context"})
|
|
return 0
|
|
print_json(request("POST", "/context/build", {"prompt": prompt, "project": project, "budget": budget}))
|
|
return 0
|
|
if cmd == "debug-context":
|
|
print_json(request("GET", "/debug/context"))
|
|
return 0
|
|
if cmd in {"propose-project", "register-project"}:
|
|
if len(args) < 4:
|
|
print(USAGE, end="")
|
|
return 1
|
|
payload = project_payload(
|
|
args[0],
|
|
args[1],
|
|
args[2],
|
|
args[3],
|
|
args[4] if len(args) > 4 else "",
|
|
args[5] if len(args) > 5 else "",
|
|
)
|
|
path = "/projects/proposal" if cmd == "propose-project" else "/projects/register"
|
|
print_json(request("POST", path, payload))
|
|
return 0
|
|
if cmd == "update-project":
|
|
if len(args) < 2:
|
|
print(USAGE, end="")
|
|
return 1
|
|
payload: dict[str, Any] = {"description": args[1]}
|
|
if len(args) > 2 and args[2].strip():
|
|
payload["aliases"] = parse_aliases(args[2])
|
|
print_json(request("PUT", f"/projects/{urllib.parse.quote(args[0])}", payload))
|
|
return 0
|
|
if cmd == "refresh-project":
|
|
if not args:
|
|
print(USAGE, end="")
|
|
return 1
|
|
purge_deleted = bool_arg(args[1]) if len(args) > 1 else False
|
|
path = f"/projects/{urllib.parse.quote(args[0])}/refresh?purge_deleted={str(purge_deleted).lower()}"
|
|
print_json(request("POST", path, {}, timeout=REFRESH_TIMEOUT))
|
|
return 0
|
|
if cmd == "project-state":
|
|
if not args:
|
|
print(USAGE, end="")
|
|
return 1
|
|
project = urllib.parse.quote(args[0])
|
|
suffix = f"?category={urllib.parse.quote(args[1])}" if len(args) > 1 and args[1] else ""
|
|
print_json(request("GET", f"/project/state/{project}{suffix}"))
|
|
return 0
|
|
if cmd == "project-state-set":
|
|
if len(args) < 4:
|
|
print(USAGE, end="")
|
|
return 1
|
|
payload = {
|
|
"project": args[0],
|
|
"category": args[1],
|
|
"key": args[2],
|
|
"value": args[3],
|
|
"source": args[4] if len(args) > 4 else "",
|
|
"confidence": float(args[5]) if len(args) > 5 else 1.0,
|
|
}
|
|
print_json(request("POST", "/project/state", payload))
|
|
return 0
|
|
if cmd == "project-state-invalidate":
|
|
if len(args) < 3:
|
|
print(USAGE, end="")
|
|
return 1
|
|
payload = {"project": args[0], "category": args[1], "key": args[2]}
|
|
print_json(request("DELETE", "/project/state", payload))
|
|
return 0
|
|
if cmd == "query":
|
|
if not args:
|
|
print(USAGE, end="")
|
|
return 1
|
|
prompt = args[0]
|
|
top_k = int(args[1]) if len(args) > 1 else 5
|
|
project = args[2] if len(args) > 2 else ""
|
|
print_json(request("POST", "/query", {"prompt": prompt, "top_k": top_k, "project": project or None}))
|
|
return 0
|
|
if cmd == "context-build":
|
|
if not args:
|
|
print(USAGE, end="")
|
|
return 1
|
|
prompt = args[0]
|
|
project = args[1] if len(args) > 1 else ""
|
|
budget = int(args[2]) if len(args) > 2 else 3000
|
|
print_json(request("POST", "/context/build", {"prompt": prompt, "project": project or None, "budget": budget}))
|
|
return 0
|
|
if cmd == "audit-query":
|
|
if not args:
|
|
print(USAGE, end="")
|
|
return 1
|
|
prompt = args[0]
|
|
top_k = int(args[1]) if len(args) > 1 else 5
|
|
project = args[2] if len(args) > 2 else ""
|
|
print_json(audit_query(prompt, top_k, project or None))
|
|
return 0
|
|
if cmd == "ingest-sources":
|
|
print_json(request("POST", "/ingest/sources", {}))
|
|
return 0
|
|
|
|
print(USAGE, end="")
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main(sys.argv))
|