#!/usr/bin/env python3
"""AtoCore MCP server — stdio transport, stdlib-only.

Exposes the AtoCore HTTP API as MCP tools so any MCP-aware client
(Claude Desktop, Claude Code, Cursor, Zed, Windsurf) can pull
context + memories automatically at prompt time.

Design:
- stdlib only (no mcp SDK dep) — MCP protocol is simple JSON-RPC over
  stdio, and AtoCore's philosophy prefers stdlib.
- Thin wrapper: every tool is a direct pass-through to an HTTP endpoint.
  Zero business logic here — the AtoCore server is the single source of
  truth.
- Fail-open: if AtoCore is unreachable, tools return a graceful
  "unavailable" message rather than crashing the client.

Protocol: MCP 2024-11-05 / 2025-03-26 compatible
  https://spec.modelcontextprotocol.io/specification/

Usage (standalone test):
  echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"0"}}}' | python atocore_mcp.py

Register with Claude Code:
  claude mcp add atocore -- python /path/to/atocore_mcp.py

Environment:
  ATOCORE_URL       base URL of the AtoCore HTTP API (default http://dalidou:8100)
  ATOCORE_TIMEOUT   per-request HTTP timeout seconds (default 10)
"""

from __future__ import annotations

import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request

# --- Configuration ---

ATOCORE_URL = os.environ.get("ATOCORE_URL", "http://dalidou:8100").rstrip("/")
HTTP_TIMEOUT = float(os.environ.get("ATOCORE_TIMEOUT", "10"))
SERVER_NAME = "atocore"
SERVER_VERSION = "0.1.0"
PROTOCOL_VERSION = "2024-11-05"

# Memory types accepted by the memory tools; shared by the handler-side
# validation and the JSON Schemas advertised in TOOLS so they cannot drift.
_MEMORY_TYPES = ["identity", "preference", "project", "episodic", "knowledge", "adaptation"]


# --- stderr logging (stdout is reserved for JSON-RPC) ---

def log(msg: str) -> None:
    """Write a prefixed diagnostic line to stderr and flush immediately."""
    print(f"[atocore-mcp] {msg}", file=sys.stderr, flush=True)


# --- HTTP helpers ---

def _path_segment(value: str) -> str:
    """Percent-encode *value* for use as ONE URL path segment.

    ``safe=""`` matters: the default of ``urllib.parse.quote`` leaves "/"
    unescaped, which would let an identifier containing a slash change the
    route being requested.
    """
    return urllib.parse.quote(value, safe="")


def http_get(path: str, params: dict | None = None) -> dict:
    """GET a JSON response from AtoCore. Raises on HTTP error.

    Args:
        path: Path appended to ``ATOCORE_URL`` (expected to start with "/").
        params: Optional query parameters; entries whose value is ``None``,
            ``""``, ``[]`` or ``{}`` are dropped before encoding.

    Returns:
        The decoded JSON body.
    """
    url = ATOCORE_URL + path
    if params:
        # Drop empty params so the URL stays clean
        clean = {k: v for k, v in params.items() if v not in (None, "", [], {})}
        if clean:
            url += "?" + urllib.parse.urlencode(clean)
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
        return json.loads(resp.read().decode("utf-8"))


def http_post(path: str, body: dict) -> dict:
    """POST *body* as JSON to AtoCore and return the decoded JSON response.

    Raises whatever ``urllib.request.urlopen`` / ``json.loads`` raise.
    """
    url = ATOCORE_URL + path
    data = json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=data,
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
        return json.loads(resp.read().decode("utf-8"))


def safe_call(fn, *args, **kwargs) -> tuple[dict | None, str | None]:
    """Run an HTTP call, return (result, error_message_or_None).

    Never raises for ``Exception`` subclasses: HTTP errors, connection
    failures and anything else are converted to a short message so tool
    handlers can fail open.
    """
    try:
        return fn(*args, **kwargs), None
    except urllib.error.HTTPError as e:
        # HTTPError must be handled before URLError (it is a subclass).
        try:
            body = e.read().decode("utf-8", errors="replace")
        except Exception:
            body = ""
        return None, f"AtoCore HTTP {e.code}: {body[:200]}"
    except urllib.error.URLError as e:
        return None, f"AtoCore unreachable at {ATOCORE_URL}: {e.reason}"
    except Exception as e:
        return None, f"AtoCore error: {type(e).__name__}: {str(e)[:200]}"


# --- Tool definitions ---
# Each tool: name, description, inputSchema (JSON Schema), handler

def _tool_context(args: dict) -> str:
    """Build a full context pack for a query — state + memories + retrieved chunks."""
    query = (args.get("query") or "").strip()
    project = args.get("project") or ""
    if not query:
        return "Error: 'query' is required."
    result, err = safe_call(http_post, "/context/build", {
        "prompt": query,
        "project": project,
    })
    if err:
        return f"AtoCore context unavailable: {err}"
    pack = result.get("formatted_context", "") or ""
    if not pack.strip():
        return "(AtoCore returned an empty context pack — no matching state, memories, or chunks.)"
    return pack


def _tool_search(args: dict) -> str:
    """Retrieval only — raw chunks ranked by semantic similarity."""
    query = (args.get("query") or "").strip()
    project = args.get("project") or ""
    top_k = int(args.get("top_k") or 5)
    if not query:
        return "Error: 'query' is required."
    result, err = safe_call(http_post, "/query", {
        "prompt": query,
        "project": project,
        "top_k": top_k,
    })
    if err:
        return f"AtoCore search unavailable: {err}"
    chunks = result.get("results", []) or []
    if not chunks:
        return "No results."
    lines = []
    for i, c in enumerate(chunks, 1):
        src = c.get("source_file") or c.get("title") or "unknown"
        heading = c.get("heading_path") or ""
        snippet = (c.get("content") or "")[:300]
        score = c.get("score", 0.0)
        head_str = f" ({heading})" if heading else ""
        lines.append(f"[{i}] score={score:.3f} source={src}{head_str}\n{snippet}")
    return "\n\n".join(lines)


def _tool_memory_list(args: dict) -> str:
    """List active memories, optionally filtered by project and type."""
    params = {
        "status": "active",
        "limit": int(args.get("limit") or 20),
    }
    if args.get("project"):
        params["project"] = args["project"]
    if args.get("memory_type"):
        params["memory_type"] = args["memory_type"]
    result, err = safe_call(http_get, "/memory", params=params)
    if err:
        return f"AtoCore memory list unavailable: {err}"
    memories = result.get("memories", []) or []
    if not memories:
        return "No memories match."
    lines = []
    for m in memories:
        mt = m.get("memory_type", "?")
        proj = m.get("project") or "(global)"
        conf = m.get("confidence", 0.0)
        refs = m.get("reference_count", 0)
        content = (m.get("content") or "")[:250]
        lines.append(f"[{mt}/{proj}] conf={conf:.2f} refs={refs}\n {content}")
    return "\n\n".join(lines)


def _tool_memory_create(args: dict) -> str:
    """Create a candidate memory (enters the triage queue)."""
    memory_type = (args.get("memory_type") or "").strip()
    content = (args.get("content") or "").strip()
    project = args.get("project") or ""
    raw_confidence = args.get("confidence")
    # An explicit numeric 0 is a legitimate confidence (schema minimum is 0);
    # a plain ``or 0.5`` would silently replace it with the default.
    explicit_zero = (
        isinstance(raw_confidence, (int, float))
        and not isinstance(raw_confidence, bool)
        and raw_confidence == 0
    )
    confidence = 0.0 if explicit_zero else float(raw_confidence or 0.5)
    if not memory_type or not content:
        return "Error: 'memory_type' and 'content' are required."
    valid_types = _MEMORY_TYPES
    if memory_type not in valid_types:
        return f"Error: memory_type must be one of {valid_types}."
    result, err = safe_call(http_post, "/memory", {
        "memory_type": memory_type,
        "content": content,
        "project": project,
        "confidence": confidence,
        "status": "candidate",
    })
    if err:
        return f"AtoCore memory create failed: {err}"
    mid = result.get("id", "?")
    return f"Candidate memory created: id={mid} type={memory_type} project={project or '(global)'}"


def _tool_project_state(args: dict) -> str:
    """Get Trusted Project State entries for a project."""
    project = (args.get("project") or "").strip()
    category = args.get("category") or ""
    if not project:
        return "Error: 'project' is required."
    path = f"/project/state/{_path_segment(project)}"
    params = {"category": category} if category else None
    result, err = safe_call(http_get, path, params=params)
    if err:
        return f"AtoCore project state unavailable: {err}"
    # The response is read under either key; "entries" wins when non-empty.
    entries = result.get("entries", []) or result.get("state", []) or []
    if not entries:
        return f"No state entries for project '{project}'."
    lines = []
    for e in entries:
        cat = e.get("category", "?")
        key = e.get("key", "?")
        value = (e.get("value") or "")[:300]
        src = e.get("source") or ""
        lines.append(f"[{cat}/{key}] (source: {src})\n {value}")
    return "\n\n".join(lines)


def _tool_projects(args: dict) -> str:
    """List registered AtoCore projects."""
    result, err = safe_call(http_get, "/projects")
    if err:
        return f"AtoCore projects unavailable: {err}"
    projects = result.get("projects", []) or []
    if not projects:
        return "No projects registered."
    lines = []
    for p in projects:
        pid = p.get("project_id") or p.get("id") or p.get("name") or "?"
        aliases = p.get("aliases", []) or []
        alias_str = f" (aliases: {', '.join(aliases)})" if aliases else ""
        lines.append(f"- {pid}{alias_str}")
    return "\n".join(lines)


def _tool_health(args: dict) -> str:
    """Check AtoCore service health."""
    result, err = safe_call(http_get, "/health")
    if err:
        return f"AtoCore unreachable: {err}"
    # Coerce before slicing: a null or non-string build_sha must not raise.
    sha = str(result.get("build_sha") or "?")[:8]
    vectors = result.get("vectors_count", "?")
    env = result.get("env", "?")
    return f"AtoCore healthy: sha={sha} vectors={vectors} env={env}"


# --- Phase 5H: Engineering query tools ---

def _tool_system_map(args: dict) -> str:
    """Q-001 + Q-004: subsystem/component tree for a project."""
    project = (args.get("project") or "").strip()
    if not project:
        return "Error: 'project' is required."
    result, err = safe_call(
        http_get, f"/engineering/projects/{_path_segment(project)}/systems"
    )
    if err:
        return f"Engineering query failed: {err}"
    subs = result.get("subsystems", []) or []
    orphans = result.get("orphan_components", []) or []
    if not subs and not orphans:
        return f"No subsystems or components registered for {project}."
    lines = [f"System map for {project}:"]
    for s in subs:
        lines.append(f"\n[{s['name']}] — {s.get('description') or '(no description)'}")
        # ``or []`` also covers an explicit JSON null, like the rest of the file.
        for c in s.get("components") or []:
            mats = ", ".join(c.get("materials") or []) or "-"
            lines.append(f" • {c['name']} (materials: {mats})")
    if orphans:
        lines.append("\nOrphan components (not attached to any subsystem):")
        for c in orphans:
            lines.append(f" • {c['name']}")
    return "\n".join(lines)


def _tool_gaps(args: dict) -> str:
    """Q-006 + Q-009 + Q-011: find coverage gaps. Director's most-used query."""
    project = (args.get("project") or "").strip()
    if not project:
        return "Error: 'project' is required."
    result, err = safe_call(
        http_get, "/engineering/gaps",
        params={"project": project},
    )
    if err:
        return f"Gap query failed: {err}"
    # ``or {}`` so a null section is treated like a missing one.
    orphan = result.get("orphan_requirements") or {}
    risky = result.get("risky_decisions") or {}
    unsup = result.get("unsupported_claims") or {}
    counts = f"{orphan.get('count',0)}/{risky.get('count',0)}/{unsup.get('count',0)}"
    lines = [f"Coverage gaps for {project} (orphan reqs / risky decisions / unsupported claims: {counts}):\n"]
    # Each section lists at most the first 10 gaps.
    if orphan.get("count", 0):
        lines.append(f"ORPHAN REQUIREMENTS ({orphan['count']}) — no component claims to satisfy:")
        for g in orphan.get("gaps", [])[:10]:
            lines.append(f" • {g['name']}: {(g.get('description') or '')[:120]}")
        lines.append("")
    if risky.get("count", 0):
        lines.append(f"RISKY DECISIONS ({risky['count']}) — based on flagged assumptions:")
        for g in risky.get("gaps", [])[:10]:
            lines.append(f" • {g['decision_name']} (assumption: {g['assumption_name']} — {g['assumption_status']})")
        lines.append("")
    if unsup.get("count", 0):
        lines.append(f"UNSUPPORTED CLAIMS ({unsup['count']}) — no Result entity backs them:")
        for g in unsup.get("gaps", [])[:10]:
            lines.append(f" • {g['name']}: {(g.get('description') or '')[:120]}")
    if orphan.get("count", 0) == 0 and risky.get("count", 0) == 0 and unsup.get("count", 0) == 0:
        lines.append("✓ No gaps detected — every requirement satisfied, no flagged assumptions, all claims have evidence.")
    return "\n".join(lines)


def _tool_requirements_for(args: dict) -> str:
    """Q-005: requirements that a component satisfies."""
    component_id = (args.get("component_id") or "").strip()
    if not component_id:
        return "Error: 'component_id' is required."
    result, err = safe_call(
        http_get, f"/engineering/components/{_path_segment(component_id)}/requirements"
    )
    if err:
        return f"Query failed: {err}"
    reqs = result.get("requirements", []) or []
    if not reqs:
        return "No requirements associated with this component."
    lines = [f"Component satisfies {len(reqs)} requirement(s):"]
    for r in reqs:
        lines.append(f" • {r['name']}: {(r.get('description') or '')[:150]}")
    return "\n".join(lines)


def _tool_decisions_affecting(args: dict) -> str:
    """Q-008: decisions affecting a project or subsystem."""
    project = (args.get("project") or "").strip()
    # Accept both the advertised "subsystem_id" and a bare "subsystem" key.
    subsystem = args.get("subsystem_id") or args.get("subsystem") or ""
    if not project:
        return "Error: 'project' is required."
    params = {"project": project}
    if subsystem:
        params["subsystem"] = subsystem
    result, err = safe_call(http_get, "/engineering/decisions", params=params)
    if err:
        return f"Query failed: {err}"
    decisions = result.get("decisions", []) or []
    if not decisions:
        scope = f"subsystem {subsystem}" if subsystem else f"project {project}"
        return f"No decisions recorded for {scope}."
    scope = f"subsystem {subsystem}" if subsystem else project
    lines = [f"{len(decisions)} decision(s) affecting {scope}:"]
    for d in decisions:
        lines.append(f" • {d['name']}: {(d.get('description') or '')[:150]}")
    return "\n".join(lines)


def _tool_recent_changes(args: dict) -> str:
    """Q-013: what changed recently in the engineering graph."""
    project = (args.get("project") or "").strip()
    since = args.get("since") or ""
    limit = int(args.get("limit") or 20)
    if not project:
        return "Error: 'project' is required."
    params = {"project": project, "limit": limit}
    if since:
        params["since"] = since
    result, err = safe_call(http_get, "/engineering/changes", params=params)
    if err:
        return f"Query failed: {err}"
    changes = result.get("changes", []) or []
    if not changes:
        return f"No entity changes in {project} since {since or '(all time)'}."
    lines = [f"Recent changes in {project} ({len(changes)}):"]
    for c in changes:
        lines.append(
            f" [{c['timestamp'][:16]}] {c['action']:10s} "
            f"[{c.get('entity_type','?')}] {c.get('entity_name','?')} "
            f"by {c.get('actor','?')}"
        )
    return "\n".join(lines)


def _tool_impact(args: dict) -> str:
    """Q-016: impact of changing an entity (downstream BFS)."""
    # Accept both the advertised "entity_id" and a bare "entity" key.
    entity = (args.get("entity_id") or args.get("entity") or "").strip()
    if not entity:
        return "Error: 'entity_id' is required."
    max_depth = int(args.get("max_depth") or 3)
    result, err = safe_call(
        http_get, "/engineering/impact",
        params={"entity": entity, "max_depth": max_depth},
    )
    if err:
        return f"Query failed: {err}"
    root = result.get("root") or {}
    impacted = result.get("impacted", []) or []
    if not impacted:
        return f"Nothing downstream of [{root.get('entity_type','?')}] {root.get('name','?')}."
    lines = [
        f"Changing [{root.get('entity_type')}] {root.get('name')} "
        f"would affect {len(impacted)} entity(ies) (max depth {max_depth}):"
    ]
    # Only the first 25 impacted entities are listed; the rest are summarized.
    for i in impacted[:25]:
        indent = " " * i.get("depth", 1)
        lines.append(f"{indent}→ [{i['entity_type']}] {i['name']} (via {i['relationship']})")
    if len(impacted) > 25:
        lines.append(f" ... and {len(impacted)-25} more")
    return "\n".join(lines)


def _tool_evidence(args: dict) -> str:
    """Q-017: evidence chain for an entity."""
    entity = (args.get("entity_id") or args.get("entity") or "").strip()
    if not entity:
        return "Error: 'entity_id' is required."
    result, err = safe_call(http_get, "/engineering/evidence", params={"entity": entity})
    if err:
        return f"Query failed: {err}"
    root = result.get("root") or {}
    chain = result.get("evidence_chain", []) or []
    lines = [f"Evidence for [{root.get('entity_type','?')}] {root.get('name','?')}:"]
    if not chain:
        lines.append(" (no inbound provenance edges)")
    else:
        for e in chain:
            lines.append(
                f" {e['via']} ← [{e['source_type']}] {e['source_name']}: "
                f"{(e.get('source_description') or '')[:100]}"
            )
    refs = result.get("direct_source_refs") or []
    if refs:
        lines.append(f"\nDirect source_refs: {refs[:5]}")
    return "\n".join(lines)


# Tool registry. "handler" is internal; handle_tools_list() strips it before
# the definitions are sent to the client.
TOOLS = [
    {
        "name": "atocore_context",
        "description": (
            "Get the full AtoCore context pack for a user query. Returns "
            "Trusted Project State (high trust), relevant memories, and "
            "retrieved source chunks formatted for prompt injection. "
            "Use this FIRST on any project-related query to ground the "
            "conversation in what AtoCore already knows."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "The user's question or task"},
                "project": {"type": "string", "description": "Project hint (e.g. 'p04-gigabit'); optional"},
            },
            "required": ["query"],
        },
        "handler": _tool_context,
    },
    {
        "name": "atocore_search",
        "description": (
            "Semantic search over AtoCore's ingested source documents. "
            "Returns top-K ranked chunks. Use this when you need raw "
            "references rather than a full context pack."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "project": {"type": "string", "description": "optional project filter"},
                "top_k": {"type": "integer", "minimum": 1, "maximum": 20, "default": 5},
            },
            "required": ["query"],
        },
        "handler": _tool_search,
    },
    {
        "name": "atocore_memory_list",
        "description": (
            "List active memories (curated facts, decisions, preferences). "
            "Filter by project and/or memory_type. Use this to inspect what "
            "AtoCore currently remembers about a topic."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "project": {"type": "string"},
                "memory_type": {
                    "type": "string",
                    "enum": list(_MEMORY_TYPES),
                },
                "limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 20},
            },
        },
        "handler": _tool_memory_list,
    },
    {
        "name": "atocore_memory_create",
        "description": (
            "Propose a new memory for AtoCore. Creates a CANDIDATE that "
            "enters the triage queue for human/auto review — not immediately "
            "active. Use this to capture durable facts/decisions that "
            "should persist across sessions. Do NOT use for transient state "
            "or session-specific notes."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "memory_type": {
                    "type": "string",
                    "enum": list(_MEMORY_TYPES),
                },
                "content": {"type": "string", "description": "The fact/decision/preference to remember"},
                "project": {"type": "string", "description": "project id if project-scoped; empty for global"},
                "confidence": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.5},
            },
            "required": ["memory_type", "content"],
        },
        "handler": _tool_memory_create,
    },
    {
        "name": "atocore_project_state",
        "description": (
            "Get Trusted Project State entries for a given project — the "
            "highest-trust tier with curated decisions, requirements, "
            "facts, contacts, milestones. Use this to look up authoritative "
            "project info."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "project": {"type": "string"},
                "category": {
                    "type": "string",
                    "enum": ["status", "decision", "requirement", "contact", "milestone", "fact", "config"],
                },
            },
            "required": ["project"],
        },
        "handler": _tool_project_state,
    },
    {
        "name": "atocore_projects",
        "description": "List all registered AtoCore projects (id + aliases).",
        "inputSchema": {"type": "object", "properties": {}},
        "handler": _tool_projects,
    },
    {
        "name": "atocore_health",
        "description": "Check AtoCore service health (build SHA, vector count, env).",
        "inputSchema": {"type": "object", "properties": {}},
        "handler": _tool_health,
    },
    # --- Phase 5H: Engineering knowledge graph tools ---
    {
        "name": "atocore_engineering_map",
        "description": (
            "Get the subsystem/component tree for an engineering project. "
            "Returns the full system architecture: subsystems, their components, "
            "materials, and any orphan components not attached to a subsystem. "
            "Use when the user asks about project structure or system design."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "project": {"type": "string", "description": "Project id (e.g. p04-gigabit)"},
            },
            "required": ["project"],
        },
        "handler": _tool_system_map,
    },
    {
        "name": "atocore_engineering_gaps",
        "description": (
            "Find coverage gaps in a project's engineering graph: orphan "
            "requirements (no component satisfies them), risky decisions "
            "(based on flagged assumptions), and unsupported claims (no "
            "Result evidence). This is the director's most useful query — "
            "answers 'what am I forgetting?' in seconds."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "project": {"type": "string"},
            },
            "required": ["project"],
        },
        "handler": _tool_gaps,
    },
    {
        "name": "atocore_engineering_requirements_for_component",
        "description": "List the requirements a specific component claims to satisfy (Q-005).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "component_id": {"type": "string"},
            },
            "required": ["component_id"],
        },
        "handler": _tool_requirements_for,
    },
    {
        "name": "atocore_engineering_decisions",
        "description": (
            "Decisions that affect a project, optionally scoped to a specific "
            "subsystem. Use when the user asks 'what did we decide about X?'"
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "project": {"type": "string"},
                "subsystem_id": {"type": "string", "description": "optional subsystem entity id"},
            },
            "required": ["project"],
        },
        "handler": _tool_decisions_affecting,
    },
    {
        "name": "atocore_engineering_changes",
        "description": (
            "Recent changes to the engineering graph for a project: which "
            "entities were created/promoted/rejected/updated, by whom, when. "
            "Use for 'what changed recently?' type questions."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "project": {"type": "string"},
                "since": {"type": "string", "description": "ISO timestamp; optional"},
                "limit": {"type": "integer", "minimum": 1, "maximum": 200, "default": 20},
            },
            "required": ["project"],
        },
        "handler": _tool_recent_changes,
    },
    {
        "name": "atocore_engineering_impact",
        "description": (
            "Impact analysis: what's downstream of a given entity. BFS over "
            "outbound relationships up to max_depth. Use to answer 'what would "
            "break if I change X?'"
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "entity_id": {"type": "string"},
                "max_depth": {"type": "integer", "minimum": 1, "maximum": 5, "default": 3},
            },
            "required": ["entity_id"],
        },
        "handler": _tool_impact,
    },
    {
        "name": "atocore_engineering_evidence",
        "description": (
            "Evidence chain for an entity: what supports it? Walks inbound "
            "SUPPORTS / EVIDENCED_BY / DESCRIBED_BY / VALIDATED_BY / ANALYZED_BY "
            "edges. Use for 'how do we know X is true?' type questions."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "entity_id": {"type": "string"},
            },
            "required": ["entity_id"],
        },
        "handler": _tool_evidence,
    },
]


# --- JSON-RPC handlers ---

def handle_initialize(params: dict) -> dict:
    """Answer ``initialize`` with the server's protocol version, capabilities and identity."""
    return {
        "protocolVersion": PROTOCOL_VERSION,
        "capabilities": {
            "tools": {"listChanged": False},
        },
        "serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
    }


def handle_tools_list(params: dict) -> dict:
    """Answer ``tools/list`` with every tool's name, description and input schema."""
    return {
        "tools": [
            {"name": t["name"], "description": t["description"], "inputSchema": t["inputSchema"]}
            for t in TOOLS
        ]
    }


def handle_tools_call(params: dict) -> dict:
    """Answer ``tools/call`` by dispatching to the named tool's handler.

    Unknown tools and handler exceptions are reported as a text result with
    ``isError: True`` rather than raised, so one bad call cannot take the
    server down.
    """
    tool_name = params.get("name", "")
    args = params.get("arguments", {}) or {}
    tool = next((t for t in TOOLS if t["name"] == tool_name), None)
    if tool is None:
        return {
            "content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
            "isError": True,
        }
    try:
        text = tool["handler"](args)
    except Exception as e:
        log(f"tool {tool_name} raised: {e}")
        return {
            "content": [{"type": "text", "text": f"Tool error: {type(e).__name__}: {e}"}],
            "isError": True,
        }
    return {"content": [{"type": "text", "text": text}]}


def handle_ping(params: dict) -> dict:
    """Answer ``ping`` with an empty result."""
    return {}


METHODS = {
    "initialize": handle_initialize,
    "tools/list": handle_tools_list,
    "tools/call": handle_tools_call,
    "ping": handle_ping,
}


# --- stdio main loop ---

def _force_utf8_stdio() -> None:
    """Best-effort: switch stdin/stdout to UTF-8.

    Tool output contains non-ASCII characters and ``send`` writes them
    unescaped, so a stdout using a narrower locale encoding would raise
    ``UnicodeEncodeError``. Streams without ``reconfigure`` (for example
    in-memory replacements) are left untouched.
    """
    for stream, extra in ((sys.stdin, {"errors": "replace"}), (sys.stdout, {})):
        reconfigure = getattr(stream, "reconfigure", None)
        if reconfigure is None:
            continue
        try:
            reconfigure(encoding="utf-8", **extra)
        except Exception as e:  # best-effort; keep running with the existing encoding
            log(f"could not switch stdio to utf-8: {e}")


def send(obj: dict) -> None:
    """Write a single-line JSON message to stdout and flush."""
    sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n")
    sys.stdout.flush()


def make_response(req_id, result=None, error=None) -> dict:
    """Build a JSON-RPC 2.0 response envelope.

    When *error* is given it is used and *result* is ignored; otherwise a
    ``None`` result is sent as an empty object.
    """
    resp = {"jsonrpc": "2.0", "id": req_id}
    if error is not None:
        resp["error"] = error
    else:
        resp["result"] = result if result is not None else {}
    return resp


def main() -> int:
    """Serve newline-delimited JSON-RPC requests from stdin until EOF.

    Returns:
        Process exit code (always 0 on a clean stdin close).
    """
    _force_utf8_stdio()
    log(f"starting (AtoCore at {ATOCORE_URL})")
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        try:
            msg = json.loads(line)
        except json.JSONDecodeError as e:
            log(f"parse error: {e}")
            continue

        # Valid JSON that is not an object (array, number, string, ...) has no
        # method/id to act on; skip it instead of crashing on ``msg.get``.
        if not isinstance(msg, dict):
            log(f"ignoring non-object message of type {type(msg).__name__}")
            continue

        method = msg.get("method", "")
        req_id = msg.get("id")
        params = msg.get("params", {}) or {}

        # Notifications (no id) don't need a response
        if req_id is None:
            if method == "notifications/initialized":
                log("client initialized")
            continue

        handler = METHODS.get(method)
        if handler is None:
            send(make_response(req_id, error={
                "code": -32601,
                "message": f"Method not found: {method}",
            }))
            continue

        try:
            result = handler(params)
            send(make_response(req_id, result=result))
        except Exception as e:
            log(f"handler {method} raised: {e}")
            send(make_response(req_id, error={
                "code": -32603,
                "message": f"Internal error: {type(e).__name__}: {e}",
            }))

    log("stdin closed, exiting")
    return 0


if __name__ == "__main__":
    sys.exit(main())