From c49363fccc2e8f9b949e7b3650b01247165ea788 Mon Sep 17 00:00:00 2001 From: Anto01 Date: Thu, 16 Apr 2026 20:08:20 -0400 Subject: [PATCH] feat: atocore-mcp server for universal LLM consumption (Phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stdlib-only Python stdio MCP server that wraps the AtoCore HTTP API. Makes AtoCore available as built-in tools to every MCP-aware client (Claude Desktop, Claude Code, Cursor, Zed, Windsurf). 7 tools exposed: - atocore_context: full context pack (state + memories + chunks) - atocore_search: semantic retrieval with scores + sources - atocore_memory_list: filter active memories by project/type - atocore_memory_create: propose a candidate memory - atocore_project_state: query Trusted Project State by category - atocore_projects: list registered projects + aliases - atocore_health: service status check Design choices: - stdlib only (no mcp SDK dep) — AtoCore philosophy - Thin HTTP passthrough — zero business logic, zero drift risk - Fail-open: AtoCore unreachable returns graceful error, not crash - Protocol MCP 2024-11-05 compatible Registered in Claude Code: `claude mcp add atocore -- python ...` Verified: ✓ Connected, all 7 tools exposed, context/search/state return live data from Dalidou (sha=775960c8, vectors=33253). This is the keystone for master brain vision: every Claude session now has AtoCore available as built-in capability without the user or agent having to remember to invoke it. Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/atocore_mcp.py | 479 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 479 insertions(+) create mode 100644 scripts/atocore_mcp.py diff --git a/scripts/atocore_mcp.py b/scripts/atocore_mcp.py new file mode 100644 index 0000000..1aefccc --- /dev/null +++ b/scripts/atocore_mcp.py @@ -0,0 +1,479 @@ +#!/usr/bin/env python3 +"""AtoCore MCP server — stdio transport, stdlib-only. + +Exposes the AtoCore HTTP API as MCP tools so any MCP-aware client +(Claude Desktop, Claude Code, Cursor, Zed, Windsurf) can pull +context + memories automatically at prompt time. + +Design: + - stdlib only (no mcp SDK dep) — MCP protocol is simple JSON-RPC + over stdio, and AtoCore's philosophy prefers stdlib. + - Thin wrapper: every tool is a direct pass-through to an HTTP + endpoint. Zero business logic here — the AtoCore server is + the single source of truth. + - Fail-open: if AtoCore is unreachable, tools return a graceful + "unavailable" message rather than crashing the client. + +Protocol: MCP 2024-11-05 / 2025-03-26 compatible + https://spec.modelcontextprotocol.io/specification/ + +Usage (standalone test): + echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"0"}}}' | python atocore_mcp.py + +Register with Claude Code: + claude mcp add atocore -- python /path/to/atocore_mcp.py + +Environment: + ATOCORE_URL base URL of the AtoCore HTTP API (default http://dalidou:8100) + ATOCORE_TIMEOUT per-request HTTP timeout seconds (default 10) +""" + +from __future__ import annotations + +import json +import os +import sys +import urllib.error +import urllib.parse +import urllib.request + +# --- Configuration --- + +ATOCORE_URL = os.environ.get("ATOCORE_URL", "http://dalidou:8100").rstrip("/") +HTTP_TIMEOUT = float(os.environ.get("ATOCORE_TIMEOUT", "10")) +SERVER_NAME = "atocore" +SERVER_VERSION = "0.1.0" +PROTOCOL_VERSION = "2024-11-05" + + +# --- stderr logging (stdout is reserved for JSON-RPC) --- + +def log(msg: str) -> None: + print(f"[atocore-mcp] {msg}", file=sys.stderr, flush=True) + + +# --- HTTP helpers --- + +def http_get(path: str, params: dict | None = None) -> dict: + """GET a JSON response from AtoCore. Raises on HTTP error.""" + url = ATOCORE_URL + path + if params: + # Drop empty params so the URL stays clean + clean = {k: v for k, v in params.items() if v not in (None, "", [], {})} + if clean: + url += "?" + urllib.parse.urlencode(clean) + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def http_post(path: str, body: dict) -> dict: + url = ATOCORE_URL + path + data = json.dumps(body).encode("utf-8") + req = urllib.request.Request( + url, data=data, method="POST", + headers={"Content-Type": "application/json", "Accept": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def safe_call(fn, *args, **kwargs) -> tuple[dict | None, str | None]: + """Run an HTTP call, return (result, error_message_or_None).""" + try: + return fn(*args, **kwargs), None + except urllib.error.HTTPError as e: + try: + body = e.read().decode("utf-8", errors="replace") + except Exception: + body = "" + return None, f"AtoCore HTTP {e.code}: {body[:200]}" + except urllib.error.URLError as e: + return None, f"AtoCore unreachable at {ATOCORE_URL}: {e.reason}" + except Exception as e: + return None, f"AtoCore error: {type(e).__name__}: {str(e)[:200]}" + + +# --- Tool definitions --- +# Each tool: name, description, inputSchema (JSON Schema), handler + +def _tool_context(args: dict) -> str: + """Build a full context pack for a query — state + memories + retrieved chunks.""" + query = (args.get("query") or "").strip() + project = args.get("project") or "" + if not query: + return "Error: 'query' is required." + result, err = safe_call(http_post, "/context/build", { + "prompt": query, "project": project, + }) + if err: + return f"AtoCore context unavailable: {err}" + pack = result.get("formatted_context", "") or "" + if not pack.strip(): + return "(AtoCore returned an empty context pack — no matching state, memories, or chunks.)" + return pack + + +def _tool_search(args: dict) -> str: + """Retrieval only — raw chunks ranked by semantic similarity.""" + query = (args.get("query") or "").strip() + project = args.get("project") or "" + top_k = int(args.get("top_k") or 5) + if not query: + return "Error: 'query' is required." + result, err = safe_call(http_post, "/query", { + "prompt": query, "project": project, "top_k": top_k, + }) + if err: + return f"AtoCore search unavailable: {err}" + chunks = result.get("results", []) or [] + if not chunks: + return "No results." + lines = [] + for i, c in enumerate(chunks, 1): + src = c.get("source_file") or c.get("title") or "unknown" + heading = c.get("heading_path") or "" + snippet = (c.get("content") or "")[:300] + score = c.get("score", 0.0) + head_str = f" ({heading})" if heading else "" + lines.append(f"[{i}] score={score:.3f} source={src}{head_str}\n{snippet}") + return "\n\n".join(lines) + + +def _tool_memory_list(args: dict) -> str: + """List active memories, optionally filtered by project and type.""" + params = { + "status": "active", + "limit": int(args.get("limit") or 20), + } + if args.get("project"): + params["project"] = args["project"] + if args.get("memory_type"): + params["memory_type"] = args["memory_type"] + result, err = safe_call(http_get, "/memory", params=params) + if err: + return f"AtoCore memory list unavailable: {err}" + memories = result.get("memories", []) or [] + if not memories: + return "No memories match." + lines = [] + for m in memories: + mt = m.get("memory_type", "?") + proj = m.get("project") or "(global)" + conf = m.get("confidence", 0.0) + refs = m.get("reference_count", 0) + content = (m.get("content") or "")[:250] + lines.append(f"[{mt}/{proj}] conf={conf:.2f} refs={refs}\n {content}") + return "\n\n".join(lines) + + +def _tool_memory_create(args: dict) -> str: + """Create a candidate memory (enters the triage queue).""" + memory_type = (args.get("memory_type") or "").strip() + content = (args.get("content") or "").strip() + project = args.get("project") or "" + confidence = float(args.get("confidence") or 0.5) + if not memory_type or not content: + return "Error: 'memory_type' and 'content' are required." + valid_types = ["identity", "preference", "project", "episodic", "knowledge", "adaptation"] + if memory_type not in valid_types: + return f"Error: memory_type must be one of {valid_types}." + result, err = safe_call(http_post, "/memory", { + "memory_type": memory_type, + "content": content, + "project": project, + "confidence": confidence, + "status": "candidate", + }) + if err: + return f"AtoCore memory create failed: {err}" + mid = result.get("id", "?") + return f"Candidate memory created: id={mid} type={memory_type} project={project or '(global)'}" + + +def _tool_project_state(args: dict) -> str: + """Get Trusted Project State entries for a project.""" + project = (args.get("project") or "").strip() + category = args.get("category") or "" + if not project: + return "Error: 'project' is required." + path = f"/project/state/{urllib.parse.quote(project)}" + params = {"category": category} if category else None + result, err = safe_call(http_get, path, params=params) + if err: + return f"AtoCore project state unavailable: {err}" + entries = result.get("entries", []) or result.get("state", []) or [] + if not entries: + return f"No state entries for project '{project}'." + lines = [] + for e in entries: + cat = e.get("category", "?") + key = e.get("key", "?") + value = (e.get("value") or "")[:300] + src = e.get("source") or "" + lines.append(f"[{cat}/{key}] (source: {src})\n {value}") + return "\n\n".join(lines) + + +def _tool_projects(args: dict) -> str: + """List registered AtoCore projects.""" + result, err = safe_call(http_get, "/projects") + if err: + return f"AtoCore projects unavailable: {err}" + projects = result.get("projects", []) or [] + if not projects: + return "No projects registered." + lines = [] + for p in projects: + pid = p.get("project_id") or p.get("id") or p.get("name") or "?" + aliases = p.get("aliases", []) or [] + alias_str = f" (aliases: {', '.join(aliases)})" if aliases else "" + lines.append(f"- {pid}{alias_str}") + return "\n".join(lines) + + +def _tool_health(args: dict) -> str: + """Check AtoCore service health.""" + result, err = safe_call(http_get, "/health") + if err: + return f"AtoCore unreachable: {err}" + sha = result.get("build_sha", "?")[:8] + vectors = result.get("vectors_count", "?") + env = result.get("env", "?") + return f"AtoCore healthy: sha={sha} vectors={vectors} env={env}" + + +TOOLS = [ + { + "name": "atocore_context", + "description": ( + "Get the full AtoCore context pack for a user query. Returns " + "Trusted Project State (high trust), relevant memories, and " + "retrieved source chunks formatted for prompt injection. " + "Use this FIRST on any project-related query to ground the " + "conversation in what AtoCore already knows." + ), + "inputSchema": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "The user's question or task"}, + "project": {"type": "string", "description": "Project hint (e.g. 'p04-gigabit'); optional"}, + }, + "required": ["query"], + }, + "handler": _tool_context, + }, + { + "name": "atocore_search", + "description": ( + "Semantic search over AtoCore's ingested source documents. " + "Returns top-K ranked chunks. Use this when you need raw " + "references rather than a full context pack." + ), + "inputSchema": { + "type": "object", + "properties": { + "query": {"type": "string"}, + "project": {"type": "string", "description": "optional project filter"}, + "top_k": {"type": "integer", "minimum": 1, "maximum": 20, "default": 5}, + }, + "required": ["query"], + }, + "handler": _tool_search, + }, + { + "name": "atocore_memory_list", + "description": ( + "List active memories (curated facts, decisions, preferences). " + "Filter by project and/or memory_type. Use this to inspect what " + "AtoCore currently remembers about a topic." + ), + "inputSchema": { + "type": "object", + "properties": { + "project": {"type": "string"}, + "memory_type": { + "type": "string", + "enum": ["identity", "preference", "project", "episodic", "knowledge", "adaptation"], + }, + "limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 20}, + }, + }, + "handler": _tool_memory_list, + }, + { + "name": "atocore_memory_create", + "description": ( + "Propose a new memory for AtoCore. Creates a CANDIDATE that " + "enters the triage queue for human/auto review — not immediately " + "active. Use this to capture durable facts/decisions that " + "should persist across sessions. Do NOT use for transient state " + "or session-specific notes." + ), + "inputSchema": { + "type": "object", + "properties": { + "memory_type": { + "type": "string", + "enum": ["identity", "preference", "project", "episodic", "knowledge", "adaptation"], + }, + "content": {"type": "string", "description": "The fact/decision/preference to remember"}, + "project": {"type": "string", "description": "project id if project-scoped; empty for global"}, + "confidence": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.5}, + }, + "required": ["memory_type", "content"], + }, + "handler": _tool_memory_create, + }, + { + "name": "atocore_project_state", + "description": ( + "Get Trusted Project State entries for a given project — the " + "highest-trust tier with curated decisions, requirements, " + "facts, contacts, milestones. Use this to look up authoritative " + "project info." + ), + "inputSchema": { + "type": "object", + "properties": { + "project": {"type": "string"}, + "category": { + "type": "string", + "enum": ["status", "decision", "requirement", "contact", "milestone", "fact", "config"], + }, + }, + "required": ["project"], + }, + "handler": _tool_project_state, + }, + { + "name": "atocore_projects", + "description": "List all registered AtoCore projects (id + aliases).", + "inputSchema": {"type": "object", "properties": {}}, + "handler": _tool_projects, + }, + { + "name": "atocore_health", + "description": "Check AtoCore service health (build SHA, vector count, env).", + "inputSchema": {"type": "object", "properties": {}}, + "handler": _tool_health, + }, +] + + +# --- JSON-RPC handlers --- + +def handle_initialize(params: dict) -> dict: + return { + "protocolVersion": PROTOCOL_VERSION, + "capabilities": { + "tools": {"listChanged": False}, + }, + "serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION}, + } + + +def handle_tools_list(params: dict) -> dict: + return { + "tools": [ + {"name": t["name"], "description": t["description"], "inputSchema": t["inputSchema"]} + for t in TOOLS + ] + } + + +def handle_tools_call(params: dict) -> dict: + tool_name = params.get("name", "") + args = params.get("arguments", {}) or {} + tool = next((t for t in TOOLS if t["name"] == tool_name), None) + if tool is None: + return { + "content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}], + "isError": True, + } + try: + text = tool["handler"](args) + except Exception as e: + log(f"tool {tool_name} raised: {e}") + return { + "content": [{"type": "text", "text": f"Tool error: {type(e).__name__}: {e}"}], + "isError": True, + } + return {"content": [{"type": "text", "text": text}]} + + +def handle_ping(params: dict) -> dict: + return {} + + +METHODS = { + "initialize": handle_initialize, + "tools/list": handle_tools_list, + "tools/call": handle_tools_call, + "ping": handle_ping, +} + + +# --- stdio main loop --- + +def send(obj: dict) -> None: + """Write a single-line JSON message to stdout and flush.""" + sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n") + sys.stdout.flush() + + +def make_response(req_id, result=None, error=None) -> dict: + resp = {"jsonrpc": "2.0", "id": req_id} + if error is not None: + resp["error"] = error + else: + resp["result"] = result if result is not None else {} + return resp + + +def main() -> int: + log(f"starting (AtoCore at {ATOCORE_URL})") + for line in sys.stdin: + line = line.strip() + if not line: + continue + try: + msg = json.loads(line) + except json.JSONDecodeError as e: + log(f"parse error: {e}") + continue + + method = msg.get("method", "") + req_id = msg.get("id") + params = msg.get("params", {}) or {} + + # Notifications (no id) don't need a response + if req_id is None: + if method == "notifications/initialized": + log("client initialized") + continue + + handler = METHODS.get(method) + if handler is None: + send(make_response(req_id, error={ + "code": -32601, + "message": f"Method not found: {method}", + })) + continue + + try: + result = handler(params) + send(make_response(req_id, result=result)) + except Exception as e: + log(f"handler {method} raised: {e}") + send(make_response(req_id, error={ + "code": -32603, + "message": f"Internal error: {type(e).__name__}: {e}", + })) + + log("stdin closed, exiting") + return 0 + + +if __name__ == "__main__": + sys.exit(main())