#!/usr/bin/env python3 from __future__ import annotations import json import os import re import sys import urllib.error import urllib.parse import urllib.request from typing import Any BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://dalidou:8100").rstrip("/") TIMEOUT = int(os.environ.get("ATOCORE_TIMEOUT_SECONDS", "30")) REFRESH_TIMEOUT = int(os.environ.get("ATOCORE_REFRESH_TIMEOUT_SECONDS", "1800")) FAIL_OPEN = os.environ.get("ATOCORE_FAIL_OPEN", "true").lower() == "true" USAGE = """Usage: atocore.py health atocore.py sources atocore.py stats atocore.py projects atocore.py project-template atocore.py detect-project atocore.py auto-context [budget] [project] atocore.py debug-context atocore.py propose-project [description] [label] atocore.py register-project [description] [label] atocore.py update-project [aliases_csv] atocore.py refresh-project [purge_deleted] atocore.py project-state [category] atocore.py project-state-set [source] [confidence] atocore.py project-state-invalidate atocore.py query [top_k] [project] atocore.py context-build [project] [budget] atocore.py audit-query [top_k] [project] atocore.py ingest-sources """ def print_json(payload: Any) -> None: print(json.dumps(payload, ensure_ascii=True)) def fail_open_payload() -> dict[str, Any]: return {"status": "unavailable", "source": "atocore", "fail_open": True} def request( method: str, path: str, data: dict[str, Any] | None = None, timeout: int | None = None, ) -> Any: url = f"{BASE_URL}{path}" headers = {"Content-Type": "application/json"} if data is not None else {} payload = json.dumps(data).encode("utf-8") if data is not None else None req = urllib.request.Request(url, data=payload, headers=headers, method=method) try: with urllib.request.urlopen(req, timeout=timeout or TIMEOUT) as response: body = response.read().decode("utf-8") except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8") if body: print(body) raise SystemExit(22) from exc except (urllib.error.URLError, TimeoutError, OSError): if FAIL_OPEN: print_json(fail_open_payload()) raise SystemExit(0) raise if not body.strip(): return {} return json.loads(body) def parse_aliases(aliases_csv: str) -> list[str]: return [alias.strip() for alias in aliases_csv.split(",") if alias.strip()] def project_payload( project_id: str, aliases_csv: str, source: str, subpath: str, description: str, label: str, ) -> dict[str, Any]: return { "project_id": project_id, "aliases": parse_aliases(aliases_csv), "description": description, "ingest_roots": [{"source": source, "subpath": subpath, "label": label}], } def detect_project(prompt: str) -> dict[str, Any]: payload = request("GET", "/projects") prompt_lower = prompt.lower() best_project = None best_alias = None best_score = -1 for project in payload.get("projects", []): candidates = [project.get("id", ""), *project.get("aliases", [])] for candidate in candidates: candidate = (candidate or "").strip() if not candidate: continue pattern = rf"(? best_score: best_project = project.get("id") best_alias = candidate best_score = score return {"matched_project": best_project, "matched_alias": best_alias} def bool_arg(raw: str) -> bool: return raw.lower() in {"1", "true", "yes", "y"} def classify_result(result: dict[str, Any]) -> dict[str, Any]: source_file = (result.get("source_file") or "").lower() heading = (result.get("heading_path") or "").lower() title = (result.get("title") or "").lower() text = " ".join([source_file, heading, title]) labels: list[str] = [] if any(token in text for token in ["_archive", "/archive", "archive/", "pre-cleanup", "pre-migration", "history"]): labels.append("archive_or_history") if any(token in text for token in ["status", "dashboard", "current-state", "current state", "next-steps", "next steps"]): labels.append("current_status") if any(token in text for token in ["decision", "adr", "tradeoff", "selected architecture", "selection"]): labels.append("decision") if any(token in text for token in ["requirement", "spec", "constraints", "baseline", "cdr", "sow"]): labels.append("requirements") if any(token in text for token in ["roadmap", "milestone", "plan", "workflow", "calibration", "contract"]): labels.append("execution_plan") if not labels: labels.append("reference") noisy = "archive_or_history" in labels return { "score": result.get("score"), "title": result.get("title"), "heading_path": result.get("heading_path"), "source_file": result.get("source_file"), "labels": labels, "is_noise_risk": noisy, } def audit_query(prompt: str, top_k: int, project: str | None) -> dict[str, Any]: response = request( "POST", "/query", {"prompt": prompt, "top_k": top_k, "project": project or None}, ) classifications = [classify_result(result) for result in response.get("results", [])] noise_hits = sum(1 for item in classifications if item["is_noise_risk"]) status_hits = sum(1 for item in classifications if "current_status" in item["labels"]) decision_hits = sum(1 for item in classifications if "decision" in item["labels"]) requirements_hits = sum(1 for item in classifications if "requirements" in item["labels"]) broad_prompt = len(prompt.split()) <= 2 recommendations: list[str] = [] if broad_prompt: recommendations.append("Prompt is broad; prefer a project-specific question with intent, artifact type, or constraint language.") if noise_hits: recommendations.append("Archive/history noise is present; prefer current-status, decision, requirements, and baseline docs in the next ingestion/ranking pass.") if status_hits == 0: recommendations.append("No current-status docs surfaced in the top results; Wave 2 should ingest or strengthen trusted operational truth.") if decision_hits == 0: recommendations.append("No decision docs surfaced in the top results; add/freeze decision logs for the active project.") if requirements_hits == 0: recommendations.append("No requirements/baseline docs surfaced in the top results; prioritize baseline and architecture freeze material.") if not recommendations: recommendations.append("Ranking looks healthy for this prompt.") return { "prompt": prompt, "project": project, "top_k": top_k, "broad_prompt": broad_prompt, "noise_hits": noise_hits, "current_status_hits": status_hits, "decision_hits": decision_hits, "requirements_hits": requirements_hits, "results": classifications, "recommendations": recommendations, } def main(argv: list[str]) -> int: if len(argv) < 2: print(USAGE, end="") return 1 cmd = argv[1] args = argv[2:] if cmd == "health": print_json(request("GET", "/health")) return 0 if cmd == "sources": print_json(request("GET", "/sources")) return 0 if cmd == "stats": print_json(request("GET", "/stats")) return 0 if cmd == "projects": print_json(request("GET", "/projects")) return 0 if cmd == "project-template": print_json(request("GET", "/projects/template")) return 0 if cmd == "detect-project": if not args: print(USAGE, end="") return 1 print_json(detect_project(args[0])) return 0 if cmd == "auto-context": if not args: print(USAGE, end="") return 1 prompt = args[0] budget = int(args[1]) if len(args) > 1 else 3000 project = args[2] if len(args) > 2 else "" if not project: project = detect_project(prompt).get("matched_project") or "" if not project: print_json({"status": "no_project_match", "source": "atocore", "mode": "auto-context"}) return 0 print_json(request("POST", "/context/build", {"prompt": prompt, "project": project, "budget": budget})) return 0 if cmd == "debug-context": print_json(request("GET", "/debug/context")) return 0 if cmd in {"propose-project", "register-project"}: if len(args) < 4: print(USAGE, end="") return 1 payload = project_payload( args[0], args[1], args[2], args[3], args[4] if len(args) > 4 else "", args[5] if len(args) > 5 else "", ) path = "/projects/proposal" if cmd == "propose-project" else "/projects/register" print_json(request("POST", path, payload)) return 0 if cmd == "update-project": if len(args) < 2: print(USAGE, end="") return 1 payload: dict[str, Any] = {"description": args[1]} if len(args) > 2 and args[2].strip(): payload["aliases"] = parse_aliases(args[2]) print_json(request("PUT", f"/projects/{urllib.parse.quote(args[0])}", payload)) return 0 if cmd == "refresh-project": if not args: print(USAGE, end="") return 1 purge_deleted = bool_arg(args[1]) if len(args) > 1 else False path = f"/projects/{urllib.parse.quote(args[0])}/refresh?purge_deleted={str(purge_deleted).lower()}" print_json(request("POST", path, {}, timeout=REFRESH_TIMEOUT)) return 0 if cmd == "project-state": if not args: print(USAGE, end="") return 1 project = urllib.parse.quote(args[0]) suffix = f"?category={urllib.parse.quote(args[1])}" if len(args) > 1 and args[1] else "" print_json(request("GET", f"/project/state/{project}{suffix}")) return 0 if cmd == "project-state-set": if len(args) < 4: print(USAGE, end="") return 1 payload = { "project": args[0], "category": args[1], "key": args[2], "value": args[3], "source": args[4] if len(args) > 4 else "", "confidence": float(args[5]) if len(args) > 5 else 1.0, } print_json(request("POST", "/project/state", payload)) return 0 if cmd == "project-state-invalidate": if len(args) < 3: print(USAGE, end="") return 1 payload = {"project": args[0], "category": args[1], "key": args[2]} print_json(request("DELETE", "/project/state", payload)) return 0 if cmd == "query": if not args: print(USAGE, end="") return 1 prompt = args[0] top_k = int(args[1]) if len(args) > 1 else 5 project = args[2] if len(args) > 2 else "" print_json(request("POST", "/query", {"prompt": prompt, "top_k": top_k, "project": project or None})) return 0 if cmd == "context-build": if not args: print(USAGE, end="") return 1 prompt = args[0] project = args[1] if len(args) > 1 else "" budget = int(args[2]) if len(args) > 2 else 3000 print_json(request("POST", "/context/build", {"prompt": prompt, "project": project or None, "budget": budget})) return 0 if cmd == "audit-query": if not args: print(USAGE, end="") return 1 prompt = args[0] top_k = int(args[1]) if len(args) > 1 else 5 project = args[2] if len(args) > 2 else "" print_json(audit_query(prompt, top_k, project or None)) return 0 if cmd == "ingest-sources": print_json(request("POST", "/ingest/sources", {})) return 0 print(USAGE, end="") return 1 if __name__ == "__main__": raise SystemExit(main(sys.argv))