"""Render a compact live-status report from a running AtoCore instance. This is intentionally read-only and stdlib-only so it can be used from a fresh checkout, a cron job, or a Codex/Claude session without installing the full app package. The output is meant to reduce docs drift: copy the report into status docs only after it was generated from the live service. """ from __future__ import annotations import argparse import errno import json import os import sys import urllib.error import urllib.request from typing import Any DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://dalidou:8100").rstrip("/") DEFAULT_TIMEOUT = int(os.environ.get("ATOCORE_TIMEOUT_SECONDS", "30")) DEFAULT_AUTH_TOKEN = os.environ.get("ATOCORE_AUTH_TOKEN", "").strip() def request_json(base_url: str, path: str, timeout: int, auth_token: str = "") -> dict[str, Any]: headers = {"Authorization": f"Bearer {auth_token}"} if auth_token else {} req = urllib.request.Request(f"{base_url}{path}", method="GET", headers=headers) with urllib.request.urlopen(req, timeout=timeout) as response: body = response.read().decode("utf-8") status = getattr(response, "status", None) payload = json.loads(body) if body.strip() else {} if not isinstance(payload, dict): payload = {"value": payload} if status is not None: payload["_http_status"] = status return payload def collect_status(base_url: str, timeout: int, auth_token: str = "") -> dict[str, Any]: payload: dict[str, Any] = {"base_url": base_url} for name, path in { "health": "/health", "stats": "/stats", "projects": "/projects", "dashboard": "/admin/dashboard", }.items(): try: payload[name] = request_json(base_url, path, timeout, auth_token) except (urllib.error.URLError, TimeoutError, OSError, json.JSONDecodeError) as exc: payload[name] = {"error": str(exc)} return payload def render_markdown(status: dict[str, Any]) -> str: health = status.get("health", {}) stats = status.get("stats", {}) projects = status.get("projects", {}).get("projects", []) dashboard = status.get("dashboard", {}) memories = dashboard.get("memories", {}) if isinstance(dashboard.get("memories"), dict) else {} project_state = dashboard.get("project_state", {}) if isinstance(dashboard.get("project_state"), dict) else {} interactions = dashboard.get("interactions", {}) if isinstance(dashboard.get("interactions"), dict) else {} pipeline = dashboard.get("pipeline", {}) if isinstance(dashboard.get("pipeline"), dict) else {} lines = [ "# AtoCore Live Status", "", f"- base_url: `{status.get('base_url', '')}`", "- endpoint_http_statuses: " f"`health={health.get('_http_status', 'error')}, " f"stats={stats.get('_http_status', 'error')}, " f"projects={status.get('projects', {}).get('_http_status', 'error')}, " f"dashboard={dashboard.get('_http_status', 'error')}`", f"- service_status: `{health.get('status', 'unknown')}`", f"- code_version: `{health.get('code_version', health.get('version', 'unknown'))}`", f"- build_sha: `{health.get('build_sha', 'unknown')}`", f"- build_branch: `{health.get('build_branch', 'unknown')}`", f"- build_time: `{health.get('build_time', 'unknown')}`", f"- env: `{health.get('env', 'unknown')}`", f"- documents: `{stats.get('total_documents', 'unknown')}`", f"- chunks: `{stats.get('total_chunks', 'unknown')}`", f"- vectors: `{stats.get('total_vectors', health.get('vectors_count', 'unknown'))}`", f"- registered_projects: `{len(projects)}`", f"- active_memories: `{memories.get('active', 'unknown')}`", f"- candidate_memories: `{memories.get('candidates', 'unknown')}`", f"- interactions: `{interactions.get('total', 'unknown')}`", f"- project_state_entries: `{project_state.get('total', 'unknown')}`", f"- pipeline_last_run: `{pipeline.get('last_run', 'unknown')}`", ] if projects: lines.extend(["", "## Projects"]) for project in projects: aliases = ", ".join(project.get("aliases", [])) suffix = f" ({aliases})" if aliases else "" lines.append(f"- `{project.get('id', '')}`{suffix}") return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser(description="Render live AtoCore status") parser.add_argument("--base-url", default=DEFAULT_BASE_URL) parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT) parser.add_argument( "--auth-token", default=DEFAULT_AUTH_TOKEN, help="Bearer token; defaults to ATOCORE_AUTH_TOKEN when set", ) parser.add_argument("--json", action="store_true", help="emit raw JSON") args = parser.parse_args() status = collect_status(args.base_url.rstrip("/"), args.timeout, args.auth_token) if args.json: output = json.dumps(status, indent=2, ensure_ascii=True) + "\n" else: output = render_markdown(status) try: sys.stdout.write(output) except BrokenPipeError: return 0 except OSError as exc: if exc.errno in {errno.EINVAL, errno.EPIPE}: return 0 raise return 0 if __name__ == "__main__": raise SystemExit(main())