From 86637f8eee0fe750b6bb66aaf6f5e16ec031943e Mon Sep 17 00:00:00 2001 From: Anto01 Date: Thu, 16 Apr 2026 20:14:25 -0400 Subject: [PATCH] feat: universal LLM consumption (Phase 1 complete) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the Phase 1 master brain keystone: every LLM interaction across the ecosystem now pulls context from AtoCore automatically. Three adapters, one HTTP backend: 1. OpenClaw plugin pull (handler.js): - Added before_prompt_build hook that calls /context/build and injects the pack via prependContext - Existing capture hooks (before_agent_start + llm_output) unchanged - 6s context timeout, fail-open on AtoCore unreachable - Deployed to T420, gateway restarted, "7 plugins loaded" 2. atocore-proxy (scripts/atocore_proxy.py): - Stdlib-only OpenAI-compatible HTTP middleware - Drop-in layer for Codex, Ollama, LiteLLM, any OpenAI-compat client - Intercepts /chat/completions: extracts query, pulls context, injects as system message, forwards to upstream, captures back - Fail-open: AtoCore down = passthrough without injection - Configurable via env: UPSTREAM, PORT, CLIENT_LABEL, INJECT, CAPTURE 3. (from prior commit c49363f) atocore-mcp: - stdio MCP server, stdlib Python, 7 tools exposed - Registered in Claude Code: "✓ Connected" Plus quick win: - Project synthesis moved from Sunday-only to daily cron so wiki / mirror pages stay fresh (Step C in batch-extract.sh). Lint stays weekly. Plus docs: - docs/universal-consumption.md: configuration guide for all 3 adapters with registration/env-var tables and verification checklist Plus housekeeping: - .gitignore: add .mypy_cache/ Tests: 303/303 passing. This closes the consumption gap: the reinforcement feedback loop can now actually work (memories get injected → get referenced → reinforcement fires → auto-promotion). Every Claude, OpenClaw, Codex, or Ollama session is automatically AtoCore-grounded. Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitignore | 1 + deploy/dalidou/batch-extract.sh | 17 +- docs/universal-consumption.md | 274 +++++++++++++++++ openclaw-plugins/atocore-capture/handler.js | 159 +++++++--- scripts/atocore_proxy.py | 321 ++++++++++++++++++++ 5 files changed, 726 insertions(+), 46 deletions(-) create mode 100644 docs/universal-consumption.md create mode 100644 scripts/atocore_proxy.py diff --git a/.gitignore b/.gitignore index 1069933..7fb3976 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ __pycache__/ dist/ build/ .pytest_cache/ +.mypy_cache/ htmlcov/ .coverage venv/ diff --git a/deploy/dalidou/batch-extract.sh b/deploy/dalidou/batch-extract.sh index 11a707a..f6faa63 100644 --- a/deploy/dalidou/batch-extract.sh +++ b/deploy/dalidou/batch-extract.sh @@ -65,15 +65,16 @@ python3 "$APP_DIR/scripts/auto_promote_reinforced.py" \ log "WARN: auto-promote/expire failed (non-blocking)" } -# Step C: Weekly synthesis (Sundays only) -if [[ "$(date -u +%u)" == "7" ]]; then - log "Step C: weekly project synthesis" - python3 "$APP_DIR/scripts/synthesize_projects.py" \ - --base-url "$ATOCORE_URL" \ - 2>&1 || { - log "WARN: synthesis failed (non-blocking)" - } +# Step C: Daily project synthesis (keeps wiki/mirror pages fresh) +log "Step C: project synthesis (daily)" +python3 "$APP_DIR/scripts/synthesize_projects.py" \ + --base-url "$ATOCORE_URL" \ + 2>&1 || { + log "WARN: synthesis failed (non-blocking)" +} +# Step D: Weekly lint pass (Sundays only — heavier, not needed daily) +if [[ "$(date -u +%u)" == "7" ]]; then log "Step D: weekly lint pass" python3 "$APP_DIR/scripts/lint_knowledge_base.py" \ --base-url "$ATOCORE_URL" \ diff --git a/docs/universal-consumption.md b/docs/universal-consumption.md new file mode 100644 index 0000000..3930a30 --- /dev/null +++ b/docs/universal-consumption.md @@ -0,0 +1,274 @@ +# Universal Consumption — Connecting LLM Clients to AtoCore + +Phase 1 of the Master Brain plan. Every LLM interaction across the ecosystem +pulls context from AtoCore automatically, without the user or agent having +to remember to ask for it. + +## Architecture + +``` + ┌─────────────────────┐ + │ AtoCore HTTP API │ ← single source of truth + │ http://dalidou:8100│ + └──────────┬──────────┘ + │ + ┌────────────────────┼────────────────────┐ + │ │ │ + ┌───┴────┐ ┌─────┴────┐ ┌────┴────┐ + │ MCP │ │ OpenClaw │ │ HTTP │ + │ server │ │ plugin │ │ proxy │ + └───┬────┘ └──────┬───┘ └────┬────┘ + │ │ │ + Claude/Cursor/ OpenClaw Codex/Ollama/ + Zed/Windsurf any OpenAI-compat client +``` + +Three adapters, one HTTP backend. Each adapter is a thin passthrough — no +business logic duplicated. + +--- + +## Adapter 1: MCP Server (Claude Desktop, Claude Code, Cursor, Zed, Windsurf) + +The MCP server is `scripts/atocore_mcp.py` — stdlib-only Python, stdio +transport, wraps the HTTP API. Claude-family clients see AtoCore as built-in +tools just like `Read` or `Bash`. + +### Tools exposed + +- **`atocore_context`** (most important): Full context pack for a query — + Trusted Project State + memories + retrieved chunks. Use at the start of + any project-related conversation to ground it. +- **`atocore_search`**: Semantic search over ingested documents (top-K chunks). +- **`atocore_memory_list`**: List active memories, filterable by project + type. +- **`atocore_memory_create`**: Propose a candidate memory (enters triage queue). +- **`atocore_project_state`**: Get Trusted Project State entries by category. +- **`atocore_projects`**: List registered projects + aliases. +- **`atocore_health`**: Service status check. + +### Registration + +#### Claude Code (CLI) +```bash +claude mcp add atocore -- python C:/Users/antoi/ATOCore/scripts/atocore_mcp.py +claude mcp list # verify: "atocore ... ✓ Connected" +``` + +#### Claude Desktop (GUI) +Edit `~/Library/Application Support/Claude/claude_desktop_config.json` +(macOS) or `%APPDATA%\Claude\claude_desktop_config.json` (Windows): + +```json +{ + "mcpServers": { + "atocore": { + "command": "python", + "args": ["C:/Users/antoi/ATOCore/scripts/atocore_mcp.py"], + "env": { + "ATOCORE_URL": "http://dalidou:8100" + } + } + } +} +``` +Restart Claude Desktop. + +#### Cursor / Zed / Windsurf +Similar JSON config in each tool's MCP settings. Consult their docs — +the config schema is standard MCP. + +### Configuration + +Environment variables the MCP server honors: + +| Var | Default | Purpose | +|---|---|---| +| `ATOCORE_URL` | `http://dalidou:8100` | Where to reach AtoCore | +| `ATOCORE_TIMEOUT` | `10` | Per-request HTTP timeout (seconds) | + +### Behavior + +- Fail-open: if Dalidou is unreachable, tools return "AtoCore unavailable" + error messages but don't crash the client. +- Zero business logic: every tool is a direct HTTP passthrough. +- stdlib only: no MCP SDK dependency. + +--- + +## Adapter 2: OpenClaw Plugin (`openclaw-plugins/atocore-capture/handler.js`) + +The plugin on T420 OpenClaw has two responsibilities: + +1. **CAPTURE**: On `before_agent_start` + `llm_output`, POST completed turns + to AtoCore `/interactions` (existing). +2. **PULL**: On `before_prompt_build`, call `/context/build` and inject the + context pack via `prependContext` so the agent's system prompt includes + AtoCore knowledge. + +### Deployment + +The plugin is loaded from +`/tmp/atocore-openclaw-capture-plugin/openclaw-plugins/atocore-capture/` +on the T420 (per OpenClaw's plugin config at `~/.openclaw/openclaw.json`). + +To update: +```bash +scp openclaw-plugins/atocore-capture/handler.js \ + papa@192.168.86.39:/tmp/atocore-openclaw-capture-plugin/openclaw-plugins/atocore-capture/index.js +ssh papa@192.168.86.39 'systemctl --user restart openclaw-gateway' +``` + +Verify in gateway logs: look for "ready (7 plugins: acpx, atocore-capture, ...)" + +### Configuration (env vars set on T420) + +| Var | Default | Purpose | +|---|---|---| +| `ATOCORE_BASE_URL` | `http://dalidou:8100` | AtoCore HTTP endpoint | +| `ATOCORE_PULL_DISABLED` | (unset) | Set to `1` to disable context pull | + +### Behavior + +- Fail-open: AtoCore unreachable = no injection, no capture, agent runs + normally. +- 6s timeout on context pull, 10s on capture — won't stall the agent. +- Context pack prepended as a clearly-bracketed block so the agent can see + it's auto-injected grounding info. + +--- + +## Adapter 3: HTTP Proxy (`scripts/atocore_proxy.py`) + +A stdlib-only OpenAI-compatible HTTP proxy. Sits between any +OpenAI-API-speaking client and the real provider, enriches every +`/chat/completions` request with AtoCore context. + +Works with: +- **Codex CLI** (OpenAI-compatible endpoint) +- **Ollama** (has OpenAI-compatible `/v1` endpoint since 0.1.24) +- **LiteLLM**, **llama.cpp server**, custom agents +- Anything that can be pointed at a custom base URL + +### Start it + +```bash +# For Ollama (local models): +ATOCORE_UPSTREAM=http://localhost:11434/v1 \ + python scripts/atocore_proxy.py + +# For OpenAI cloud: +ATOCORE_UPSTREAM=https://api.openai.com/v1 \ + ATOCORE_CLIENT_LABEL=codex \ + python scripts/atocore_proxy.py + +# Test: +curl http://127.0.0.1:11435/healthz +``` + +### Point a client at it + +Set the client's OpenAI base URL to `http://127.0.0.1:11435/v1`. + +#### Ollama example: +```bash +OPENAI_BASE_URL=http://127.0.0.1:11435/v1 \ + some-openai-client --model llama3:8b +``` + +#### Codex CLI: +Set `OPENAI_BASE_URL=http://127.0.0.1:11435/v1` in your codex config. + +### Configuration + +| Var | Default | Purpose | +|---|---|---| +| `ATOCORE_URL` | `http://dalidou:8100` | AtoCore HTTP endpoint | +| `ATOCORE_UPSTREAM` | (required) | Real provider base URL | +| `ATOCORE_PROXY_PORT` | `11435` | Proxy listen port | +| `ATOCORE_PROXY_HOST` | `127.0.0.1` | Proxy bind address | +| `ATOCORE_CLIENT_LABEL` | `proxy` | Client id in captures | +| `ATOCORE_INJECT` | `1` | Inject context (set `0` to disable) | +| `ATOCORE_CAPTURE` | `1` | Capture interactions (set `0` to disable) | + +### Behavior + +- GET requests (model listing etc) pass through unchanged +- POST to `/chat/completions` (or `/v1/chat/completions`) gets enriched: + 1. Last user message extracted as query + 2. AtoCore `/context/build` called with 6s timeout + 3. Pack injected as system message (or prepended to existing system) + 4. Enriched body forwarded to upstream + 5. After success, interaction POSTed to `/interactions` in background +- Fail-open: AtoCore unreachable = pass through without injection +- Streaming responses: currently buffered (not true stream). Good enough for + most cases; can be upgraded later if needed. + +### Running as a service + +On Linux, create `~/.config/systemd/user/atocore-proxy.service`: +```ini +[Unit] +Description=AtoCore HTTP proxy + +[Service] +Environment=ATOCORE_UPSTREAM=http://localhost:11434/v1 +Environment=ATOCORE_CLIENT_LABEL=ollama +ExecStart=/usr/bin/python3 /path/to/scripts/atocore_proxy.py +Restart=on-failure + +[Install] +WantedBy=default.target +``` +Then: `systemctl --user enable --now atocore-proxy` + +On Windows, register via Task Scheduler (similar pattern to backup task) +or use NSSM to install as a service. + +--- + +## Verification Checklist + +Fresh end-to-end test to confirm Phase 1 is working: + +### For Claude Code (MCP) +1. Open a new Claude Code session (not this one). +2. Ask: "what do we know about p06 polisher's control architecture?" +3. Claude should invoke `atocore_context` or `atocore_project_state` + on its own and answer grounded in AtoCore data. + +### For OpenClaw (plugin pull) +1. Send a Discord message to OpenClaw: "what's the status on p04?" +2. Check T420 logs: `journalctl --user -u openclaw-gateway --since "1 min ago" | grep atocore-pull` +3. Expect: `atocore-pull:injected project=p04-gigabit chars=NNN` + +### For proxy (any OpenAI-compat client) +1. Start proxy with appropriate upstream +2. Run a client query through it +3. Check stderr: `[atocore-proxy] inject: project=... chars=...` +4. Check `curl http://127.0.0.1:8100/interactions?client=proxy` — should + show the captured turn + +--- + +## Why not just MCP everywhere? + +MCP is great for Claude-family clients but: +- Not supported natively by Codex CLI, Ollama, or OpenAI's own API +- No universal "attach MCP" mechanism in all LLM runtimes +- HTTP APIs are truly universal + +HTTP API is the truth, each adapter is the thinnest possible shim for its +ecosystem. When new adapters are needed (Gemini CLI, Claude Code plugin +system, etc.), they follow the same pattern. + +--- + +## Future enhancements + +- **Streaming passthrough** in the proxy (currently buffered for simplicity) +- **Response grounding check**: parse assistant output for references to + injected context, count reinforcement events +- **Per-client metrics** in the dashboard: how often each client pulls, + context pack size, injection rate +- **Smart project detection**: today we use keyword matching; could use + AtoCore's own project resolver endpoint diff --git a/openclaw-plugins/atocore-capture/handler.js b/openclaw-plugins/atocore-capture/handler.js index a03d22d..1a87ffb 100644 --- a/openclaw-plugins/atocore-capture/handler.js +++ b/openclaw-plugins/atocore-capture/handler.js @@ -1,63 +1,146 @@ /** - * AtoCore capture hook for OpenClaw. + * AtoCore OpenClaw plugin — capture + pull. * - * Listens on message:received (buffer prompt) and message:sent (POST pair). - * Fail-open: errors are caught silently. + * Two responsibilities: + * + * 1. CAPTURE (existing): On before_agent_start, buffer the user prompt. + * On llm_output, POST prompt+response to AtoCore /interactions. + * This is the "write" side — OpenClaw turns feed AtoCore's memory. + * + * 2. PULL (Phase 1 master brain): On before_prompt_build, call AtoCore + * /context/build and inject the returned context via prependContext. + * Every OpenClaw response is automatically grounded in what AtoCore + * knows (project state, memories, relevant chunks). + * + * Fail-open throughout: AtoCore unreachable = no injection, no capture, + * never blocks the agent. */ +import { definePluginEntry } from "openclaw/plugin-sdk/core"; + const BASE_URL = process.env.ATOCORE_BASE_URL || "http://dalidou:8100"; const MIN_LEN = 15; const MAX_RESP = 50000; +const CONTEXT_TIMEOUT_MS = 6000; +const CAPTURE_TIMEOUT_MS = 10000; -let lastPrompt = null; // simple single-slot buffer +function trim(v) { return typeof v === "string" ? v.trim() : ""; } +function trunc(t, m) { return !t || t.length <= m ? t : t.slice(0, m) + "\n\n[truncated]"; } -const atocoreCaptureHook = async (event) => { - try { - if (process.env.ATOCORE_CAPTURE_DISABLED === "1") return; +function detectProject(prompt) { + const lower = (prompt || "").toLowerCase(); + const hints = [ + ["p04", "p04-gigabit"], + ["gigabit", "p04-gigabit"], + ["p05", "p05-interferometer"], + ["interferometer", "p05-interferometer"], + ["p06", "p06-polisher"], + ["polisher", "p06-polisher"], + ["fullum", "p06-polisher"], + ["abb", "abb-space"], + ["atomizer", "atomizer-v2"], + ["atocore", "atocore"], + ]; + for (const [token, proj] of hints) { + if (lower.includes(token)) return proj; + } + return ""; +} - if (event.type === "message" && event.action === "received") { - const content = (event.context?.content || "").trim(); - if (content.length >= MIN_LEN && !content.startsWith("<")) { - lastPrompt = { text: content, ts: Date.now() }; +export default definePluginEntry({ + register(api) { + const log = api.logger; + let lastPrompt = null; + + // --- PULL: inject AtoCore context into every prompt --- + api.on("before_prompt_build", async (event, ctx) => { + if (process.env.ATOCORE_PULL_DISABLED === "1") return; + const prompt = trim(event?.prompt || ""); + if (prompt.length < MIN_LEN) return; + + const project = detectProject(prompt); + + try { + const res = await fetch(BASE_URL.replace(/\/$/, "") + "/context/build", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ prompt, project }), + signal: AbortSignal.timeout(CONTEXT_TIMEOUT_MS), + }); + if (!res.ok) { + log.info("atocore-pull:http_error", { status: res.status }); + return; + } + const data = await res.json(); + const contextPack = data.formatted_context || ""; + if (!contextPack.trim()) return; + + log.info("atocore-pull:injected", { + project: project || "(none)", + chars: contextPack.length, + }); + + return { + prependContext: + "--- AtoCore Context (auto-injected) ---\n" + + contextPack + + "\n--- End AtoCore Context ---\n", + }; + } catch (err) { + log.info("atocore-pull:error", { error: String(err).slice(0, 200) }); } - return; - } + }); - if (event.type === "message" && event.action === "sent") { - if (!event.context?.success) return; - const response = (event.context?.content || "").trim(); - if (!response || !lastPrompt) return; - - // Discard stale prompts (>5 min old) - if (Date.now() - lastPrompt.ts > 300000) { + // --- CAPTURE: buffer user prompts on agent start --- + api.on("before_agent_start", async (event, ctx) => { + const prompt = trim(event?.prompt || event?.cleanedBody || ""); + if (prompt.length < MIN_LEN || prompt.startsWith("<")) { lastPrompt = null; return; } + lastPrompt = { text: prompt, sessionKey: ctx?.sessionKey || "", ts: Date.now() }; + log.info("atocore-capture:prompt_buffered", { len: prompt.length }); + }); + + // --- CAPTURE: send completed turns to AtoCore --- + api.on("llm_output", async (event, ctx) => { + if (!lastPrompt) return; + const texts = Array.isArray(event?.assistantTexts) ? event.assistantTexts : []; + const response = trunc(trim(texts.join("\n\n")), MAX_RESP); + if (!response) return; const prompt = lastPrompt.text; + const sessionKey = lastPrompt.sessionKey || ctx?.sessionKey || ""; + const project = detectProject(prompt); lastPrompt = null; - const body = JSON.stringify({ - prompt, - response: response.length > MAX_RESP - ? response.slice(0, MAX_RESP) + "\n\n[truncated]" - : response, - client: "openclaw", - session_id: event.sessionKey || "", - project: "", - reinforce: true, + log.info("atocore-capture:posting", { + promptLen: prompt.length, + responseLen: response.length, + project: project || "(none)", }); fetch(BASE_URL.replace(/\/$/, "") + "/interactions", { method: "POST", headers: { "Content-Type": "application/json" }, - body, - signal: AbortSignal.timeout(10000), - }).catch(() => {}); - } - } catch { - // fail-open: never crash the gateway - } -}; + body: JSON.stringify({ + prompt, + response, + client: "openclaw", + session_id: sessionKey, + project, + reinforce: true, + }), + signal: AbortSignal.timeout(CAPTURE_TIMEOUT_MS), + }).then(res => { + log.info("atocore-capture:posted", { status: res.status }); + }).catch(err => { + log.warn("atocore-capture:post_error", { error: String(err).slice(0, 200) }); + }); + }); -export default atocoreCaptureHook; + api.on("session_end", async () => { + lastPrompt = null; + }); + } +}); diff --git a/scripts/atocore_proxy.py b/scripts/atocore_proxy.py new file mode 100644 index 0000000..1defebd --- /dev/null +++ b/scripts/atocore_proxy.py @@ -0,0 +1,321 @@ +#!/usr/bin/env python3 +"""AtoCore Proxy — OpenAI-compatible HTTP middleware. + +Acts as a drop-in layer for any client that speaks the OpenAI Chat +Completions API (Codex, Ollama, LiteLLM, custom agents). Sits between +the client and the real model provider: + + client -> atocore_proxy -> real_provider (OpenAI, Ollama, Anthropic, ...) + +For each chat completion request: + 1. Extract the user's last message as the "query" + 2. Call AtoCore /context/build to get a context pack + 3. Inject the pack as a system message (or prepend to existing system) + 4. Forward the enriched request to the real provider + 5. Capture the full interaction back to AtoCore /interactions + +Fail-open: if AtoCore is unreachable, the request passes through +unchanged. If the real provider fails, the error is propagated to the +client as-is. + +Configuration (env vars): + ATOCORE_URL AtoCore base URL (default http://dalidou:8100) + ATOCORE_UPSTREAM real provider base URL (e.g. http://localhost:11434/v1 for Ollama) + ATOCORE_PROXY_PORT port to listen on (default 11435) + ATOCORE_PROXY_HOST bind address (default 127.0.0.1) + ATOCORE_CLIENT_LABEL client id recorded in captures (default "proxy") + ATOCORE_CAPTURE "1" to capture interactions back (default "1") + ATOCORE_INJECT "1" to inject context (default "1") + +Usage: + # Proxy for Ollama: + ATOCORE_UPSTREAM=http://localhost:11434/v1 python atocore_proxy.py + + # Then point your client at http://localhost:11435/v1 instead of the + # real provider. + +Stdlib only — deliberate to keep the dependency footprint at zero. +""" + +from __future__ import annotations + +import http.server +import json +import os +import socketserver +import sys +import threading +import urllib.error +import urllib.parse +import urllib.request +from typing import Any + +ATOCORE_URL = os.environ.get("ATOCORE_URL", "http://dalidou:8100").rstrip("/") +UPSTREAM_URL = os.environ.get("ATOCORE_UPSTREAM", "").rstrip("/") +PROXY_PORT = int(os.environ.get("ATOCORE_PROXY_PORT", "11435")) +PROXY_HOST = os.environ.get("ATOCORE_PROXY_HOST", "127.0.0.1") +CLIENT_LABEL = os.environ.get("ATOCORE_CLIENT_LABEL", "proxy") +CAPTURE_ENABLED = os.environ.get("ATOCORE_CAPTURE", "1") == "1" +INJECT_ENABLED = os.environ.get("ATOCORE_INJECT", "1") == "1" +ATOCORE_TIMEOUT = float(os.environ.get("ATOCORE_TIMEOUT", "6")) +UPSTREAM_TIMEOUT = float(os.environ.get("ATOCORE_UPSTREAM_TIMEOUT", "300")) + +PROJECT_HINTS = [ + ("p04-gigabit", ["p04", "gigabit"]), + ("p05-interferometer", ["p05", "interferometer"]), + ("p06-polisher", ["p06", "polisher", "fullum"]), + ("abb-space", ["abb"]), + ("atomizer-v2", ["atomizer"]), + ("atocore", ["atocore", "dalidou"]), +] + + +def log(msg: str) -> None: + print(f"[atocore-proxy] {msg}", file=sys.stderr, flush=True) + + +def detect_project(text: str) -> str: + lower = (text or "").lower() + for proj, tokens in PROJECT_HINTS: + if any(t in lower for t in tokens): + return proj + return "" + + +def get_last_user_message(body: dict) -> str: + messages = body.get("messages", []) or [] + for m in reversed(messages): + if m.get("role") == "user": + content = m.get("content", "") + if isinstance(content, list): + # OpenAI multi-part content: extract text parts + parts = [p.get("text", "") for p in content if p.get("type") == "text"] + return "\n".join(parts) + return str(content) + return "" + + +def get_assistant_text(response: dict) -> str: + """Extract assistant text from an OpenAI-style completion response.""" + choices = response.get("choices", []) or [] + if not choices: + return "" + msg = choices[0].get("message", {}) or {} + content = msg.get("content", "") + if isinstance(content, list): + parts = [p.get("text", "") for p in content if p.get("type") == "text"] + return "\n".join(parts) + return str(content) + + +def fetch_context(query: str, project: str) -> str: + """Pull a context pack from AtoCore. Returns '' on any failure.""" + if not INJECT_ENABLED or not query: + return "" + try: + data = json.dumps({"prompt": query, "project": project}).encode("utf-8") + req = urllib.request.Request( + ATOCORE_URL + "/context/build", + data=data, + method="POST", + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=ATOCORE_TIMEOUT) as resp: + result = json.loads(resp.read().decode("utf-8")) + return result.get("formatted_context", "") or "" + except Exception as e: + log(f"context fetch failed: {type(e).__name__}: {e}") + return "" + + +def capture_interaction(prompt: str, response: str, project: str) -> None: + """POST the completed turn back to AtoCore. Fire-and-forget.""" + if not CAPTURE_ENABLED or not prompt or not response: + return + + def _post(): + try: + data = json.dumps({ + "prompt": prompt, + "response": response, + "client": CLIENT_LABEL, + "project": project, + "reinforce": True, + }).encode("utf-8") + req = urllib.request.Request( + ATOCORE_URL + "/interactions", + data=data, + method="POST", + headers={"Content-Type": "application/json"}, + ) + urllib.request.urlopen(req, timeout=ATOCORE_TIMEOUT) + except Exception as e: + log(f"capture failed: {type(e).__name__}: {e}") + + threading.Thread(target=_post, daemon=True).start() + + +def inject_context(body: dict, context_pack: str) -> dict: + """Prepend the AtoCore context as a system message, or augment existing.""" + if not context_pack.strip(): + return body + header = "--- AtoCore Context (auto-injected) ---\n" + footer = "\n--- End AtoCore Context ---\n" + injection = header + context_pack + footer + + messages = list(body.get("messages", []) or []) + if messages and messages[0].get("role") == "system": + # Augment existing system message + existing = messages[0].get("content", "") or "" + if isinstance(existing, list): + # multi-part: prepend a text part + messages[0]["content"] = [{"type": "text", "text": injection}] + existing + else: + messages[0]["content"] = injection + "\n" + str(existing) + else: + messages.insert(0, {"role": "system", "content": injection}) + + body["messages"] = messages + return body + + +def forward_to_upstream(body: dict, headers: dict[str, str], path: str) -> tuple[int, dict]: + """Forward the enriched body to the upstream provider. Returns (status, response_dict).""" + if not UPSTREAM_URL: + return 503, {"error": {"message": "ATOCORE_UPSTREAM not configured"}} + url = UPSTREAM_URL + path + data = json.dumps(body).encode("utf-8") + # Strip hop-by-hop / host-specific headers + fwd_headers = {"Content-Type": "application/json"} + for k, v in headers.items(): + lk = k.lower() + if lk in ("authorization", "x-api-key", "anthropic-version"): + fwd_headers[k] = v + req = urllib.request.Request(url, data=data, method="POST", headers=fwd_headers) + try: + with urllib.request.urlopen(req, timeout=UPSTREAM_TIMEOUT) as resp: + return resp.status, json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as e: + try: + body_bytes = e.read() + payload = json.loads(body_bytes.decode("utf-8")) + except Exception: + payload = {"error": {"message": f"upstream HTTP {e.code}"}} + return e.code, payload + except Exception as e: + log(f"upstream error: {e}") + return 502, {"error": {"message": f"upstream unreachable: {e}"}} + + +class ProxyHandler(http.server.BaseHTTPRequestHandler): + # Silence default request logging (we log what matters ourselves) + def log_message(self, format: str, *args: Any) -> None: + pass + + def _read_body(self) -> dict: + length = int(self.headers.get("Content-Length", "0") or "0") + if length <= 0: + return {} + raw = self.rfile.read(length) + try: + return json.loads(raw.decode("utf-8")) + except Exception: + return {} + + def _send_json(self, status: int, payload: dict) -> None: + body = json.dumps(payload).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + self.wfile.write(body) + + def do_OPTIONS(self) -> None: # CORS preflight + self.send_response(204) + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS") + self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-API-Key") + self.end_headers() + + def do_GET(self) -> None: + parsed = urllib.parse.urlparse(self.path) + if parsed.path == "/healthz": + self._send_json(200, { + "status": "ok", + "atocore": ATOCORE_URL, + "upstream": UPSTREAM_URL or "(not configured)", + "inject": INJECT_ENABLED, + "capture": CAPTURE_ENABLED, + }) + return + # Pass through GET to upstream (model listing etc) + if not UPSTREAM_URL: + self._send_json(503, {"error": {"message": "ATOCORE_UPSTREAM not configured"}}) + return + try: + req = urllib.request.Request(UPSTREAM_URL + parsed.path + (f"?{parsed.query}" if parsed.query else "")) + for k in ("Authorization", "X-API-Key"): + v = self.headers.get(k) + if v: + req.add_header(k, v) + with urllib.request.urlopen(req, timeout=UPSTREAM_TIMEOUT) as resp: + data = resp.read() + self.send_response(resp.status) + self.send_header("Content-Type", resp.headers.get("Content-Type", "application/json")) + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + except Exception as e: + self._send_json(502, {"error": {"message": f"upstream error: {e}"}}) + + def do_POST(self) -> None: + parsed = urllib.parse.urlparse(self.path) + body = self._read_body() + + # Only enrich chat completions; other endpoints pass through + if parsed.path.endswith("/chat/completions") or parsed.path == "/v1/chat/completions": + prompt = get_last_user_message(body) + project = detect_project(prompt) + context = fetch_context(prompt, project) if prompt else "" + if context: + log(f"inject: project={project or '(none)'} chars={len(context)}") + body = inject_context(body, context) + + status, response = forward_to_upstream(body, dict(self.headers), parsed.path) + self._send_json(status, response) + + if status == 200: + assistant_text = get_assistant_text(response) + capture_interaction(prompt, assistant_text, project) + else: + # Non-chat endpoints (embeddings, completions, etc.) — pure passthrough + status, response = forward_to_upstream(body, dict(self.headers), parsed.path) + self._send_json(status, response) + + +class ThreadedServer(socketserver.ThreadingMixIn, http.server.HTTPServer): + daemon_threads = True + allow_reuse_address = True + + +def main() -> int: + if not UPSTREAM_URL: + log("WARNING: ATOCORE_UPSTREAM not set. Chat completions will fail.") + log("Example: ATOCORE_UPSTREAM=http://localhost:11434/v1 for Ollama") + server = ThreadedServer((PROXY_HOST, PROXY_PORT), ProxyHandler) + log(f"listening on {PROXY_HOST}:{PROXY_PORT}") + log(f"AtoCore: {ATOCORE_URL} inject={INJECT_ENABLED} capture={CAPTURE_ENABLED}") + log(f"Upstream: {UPSTREAM_URL or '(not configured)'}") + log(f"Client label: {CLIENT_LABEL}") + log("Ready. Point your OpenAI-compatible client at /v1/chat/completions") + try: + server.serve_forever() + except KeyboardInterrupt: + log("stopping") + server.server_close() + return 0 + + +if __name__ == "__main__": + sys.exit(main())