2 Commits

Author SHA1 Message Date
86637f8eee feat: universal LLM consumption (Phase 1 complete)
Completes the Phase 1 master brain keystone: every LLM interaction
across the ecosystem now pulls context from AtoCore automatically.

Three adapters, one HTTP backend:

1. OpenClaw plugin pull (handler.js):
   - Added before_prompt_build hook that calls /context/build and
     injects the pack via prependContext
   - Existing capture hooks (before_agent_start + llm_output)
     unchanged
   - 6s context timeout, fail-open on AtoCore unreachable
   - Deployed to T420, gateway restarted, "7 plugins loaded"

2. atocore-proxy (scripts/atocore_proxy.py):
   - Stdlib-only OpenAI-compatible HTTP middleware
   - Drop-in layer for Codex, Ollama, LiteLLM, any OpenAI-compat client
   - Intercepts /chat/completions: extracts query, pulls context,
     injects as system message, forwards to upstream, captures back
   - Fail-open: AtoCore down = passthrough without injection
   - Configurable via env: UPSTREAM, PORT, CLIENT_LABEL, INJECT, CAPTURE

3. (from prior commit c49363f) atocore-mcp:
   - stdio MCP server, stdlib Python, 7 tools exposed
   - Registered in Claude Code: "✓ Connected"

Plus quick win:
- Project synthesis moved from Sunday-only to daily cron so wiki /
  mirror pages stay fresh (Step C in batch-extract.sh). Lint stays
  weekly.

Plus docs:
- docs/universal-consumption.md: configuration guide for all 3 adapters
  with registration/env-var tables and verification checklist

Plus housekeeping:
- .gitignore: add .mypy_cache/

Tests: 303/303 passing.

This closes the consumption gap: the reinforcement feedback loop
can now actually work (memories get injected → get referenced →
reinforcement fires → auto-promotion). Every Claude, OpenClaw,
Codex, or Ollama session is automatically AtoCore-grounded.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 20:14:25 -04:00
c49363fccc feat: atocore-mcp server for universal LLM consumption (Phase 1)
Stdlib-only Python stdio MCP server that wraps the AtoCore HTTP
API. Makes AtoCore available as built-in tools to every MCP-aware
client (Claude Desktop, Claude Code, Cursor, Zed, Windsurf).

7 tools exposed:
- atocore_context: full context pack (state + memories + chunks)
- atocore_search: semantic retrieval with scores + sources
- atocore_memory_list: filter active memories by project/type
- atocore_memory_create: propose a candidate memory
- atocore_project_state: query Trusted Project State by category
- atocore_projects: list registered projects + aliases
- atocore_health: service status check

Design choices:
- stdlib only (no mcp SDK dep) — AtoCore philosophy
- Thin HTTP passthrough — zero business logic, zero drift risk
- Fail-open: AtoCore unreachable returns graceful error, not crash
- Protocol MCP 2024-11-05 compatible

Registered in Claude Code: `claude mcp add atocore -- python ...`
Verified: ✓ Connected, all 7 tools exposed, context/search/state
return live data from Dalidou (sha=775960c8, vectors=33253).

This is the keystone for master brain vision: every Claude session
now has AtoCore available as built-in capability without the user
or agent having to remember to invoke it.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 20:08:20 -04:00
6 changed files with 1205 additions and 46 deletions

.gitignore

@@ -6,6 +6,7 @@ __pycache__/
dist/
build/
.pytest_cache/
.mypy_cache/
htmlcov/
.coverage
venv/

batch-extract.sh

@@ -65,15 +65,16 @@ python3 "$APP_DIR/scripts/auto_promote_reinforced.py" \
log "WARN: auto-promote/expire failed (non-blocking)"
}
# Step C: Weekly synthesis (Sundays only)
if [[ "$(date -u +%u)" == "7" ]]; then
log "Step C: weekly project synthesis"
python3 "$APP_DIR/scripts/synthesize_projects.py" \
# Step C: Daily project synthesis (keeps wiki/mirror pages fresh)
log "Step C: project synthesis (daily)"
python3 "$APP_DIR/scripts/synthesize_projects.py" \
--base-url "$ATOCORE_URL" \
2>&1 || {
log "WARN: synthesis failed (non-blocking)"
}
}
# Step D: Weekly lint pass (Sundays only — heavier, not needed daily)
if [[ "$(date -u +%u)" == "7" ]]; then
log "Step D: weekly lint pass"
python3 "$APP_DIR/scripts/lint_knowledge_base.py" \
--base-url "$ATOCORE_URL" \

docs/universal-consumption.md

@@ -0,0 +1,274 @@
# Universal Consumption — Connecting LLM Clients to AtoCore
Phase 1 of the Master Brain plan. Every LLM interaction across the ecosystem
pulls context from AtoCore automatically, without the user or agent having
to remember to ask for it.
## Architecture
```
         ┌─────────────────────┐
         │  AtoCore HTTP API   │  ← single source of truth
         │ http://dalidou:8100 │
         └──────────┬──────────┘
                    │
     ┌──────────────┼──────────────┐
     │              │              │
┌────┴───┐    ┌─────┴────┐    ┌────┴────┐
│  MCP   │    │ OpenClaw │    │  HTTP   │
│ server │    │  plugin  │    │  proxy  │
└────┬───┘    └─────┬────┘    └────┬────┘
     │              │              │
Claude/Cursor/   OpenClaw    Codex/Ollama/
Zed/Windsurf                 any OpenAI-compat client
```
Three adapters, one HTTP backend. Each adapter is a thin passthrough — no
business logic duplicated.
---
## Adapter 1: MCP Server (Claude Desktop, Claude Code, Cursor, Zed, Windsurf)
The MCP server is `scripts/atocore_mcp.py` — stdlib-only Python, stdio
transport, wraps the HTTP API. Claude-family clients see AtoCore as built-in
tools just like `Read` or `Bash`.
### Tools exposed
- **`atocore_context`** (most important): Full context pack for a query —
Trusted Project State + memories + retrieved chunks. Use at the start of
any project-related conversation to ground it.
- **`atocore_search`**: Semantic search over ingested documents (top-K chunks).
- **`atocore_memory_list`**: List active memories, filterable by project + type.
- **`atocore_memory_create`**: Propose a candidate memory (enters triage queue).
- **`atocore_project_state`**: Get Trusted Project State entries by category.
- **`atocore_projects`**: List registered projects + aliases.
- **`atocore_health`**: Service status check.
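For a quick smoke test of these tools without any MCP client, you can drive the stdio server by hand with line-delimited JSON-RPC (a sketch; the query/project values are just examples, and the pack contents depend on your AtoCore data):
```bash
python scripts/atocore_mcp.py <<'EOF'
{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"smoke","version":"0"}}}
{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"atocore_context","arguments":{"query":"p06 polisher control architecture","project":"p06-polisher"}}}
EOF
```
Each response is a single JSON line on stdout; the second should carry the formatted context pack, or a graceful "unavailable" message if Dalidou is down.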
### Registration
#### Claude Code (CLI)
```bash
claude mcp add atocore -- python C:/Users/antoi/ATOCore/scripts/atocore_mcp.py
claude mcp list # verify: "atocore ... ✓ Connected"
```
#### Claude Desktop (GUI)
Edit `~/Library/Application Support/Claude/claude_desktop_config.json`
(macOS) or `%APPDATA%\Claude\claude_desktop_config.json` (Windows):
```json
{
  "mcpServers": {
    "atocore": {
      "command": "python",
      "args": ["C:/Users/antoi/ATOCore/scripts/atocore_mcp.py"],
      "env": {
        "ATOCORE_URL": "http://dalidou:8100"
      }
    }
  }
}
```
Restart Claude Desktop.
#### Cursor / Zed / Windsurf
Similar JSON config in each tool's MCP settings. Consult their docs —
the config schema is standard MCP.
### Configuration
Environment variables the MCP server honors:
| Var | Default | Purpose |
|---|---|---|
| `ATOCORE_URL` | `http://dalidou:8100` | Where to reach AtoCore |
| `ATOCORE_TIMEOUT` | `10` | Per-request HTTP timeout (seconds) |
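The same variables apply to a standalone run, which is handy for testing against a non-default AtoCore instance (values here are illustrative):
```bash
ATOCORE_URL=http://localhost:8100 ATOCORE_TIMEOUT=5 \
python scripts/atocore_mcp.py
```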
### Behavior
- Fail-open: if Dalidou is unreachable, tools return "AtoCore unavailable"
error messages but don't crash the client.
- Zero business logic: every tool is a direct HTTP passthrough.
- stdlib only: no MCP SDK dependency.
---
## Adapter 2: OpenClaw Plugin (`openclaw-plugins/atocore-capture/handler.js`)
The plugin on T420 OpenClaw has two responsibilities:
1. **CAPTURE**: On `before_agent_start` + `llm_output`, POST completed turns
to AtoCore `/interactions` (existing).
2. **PULL**: On `before_prompt_build`, call `/context/build` and inject the
context pack via `prependContext` so the agent's system prompt includes
AtoCore knowledge.
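To preview what the pull hook will inject for a given prompt, call the same endpoint the plugin uses (field names match what handler.js sends and reads; output depends on your AtoCore data):
```bash
curl -s http://dalidou:8100/context/build \
  -H 'Content-Type: application/json' \
  -d '{"prompt": "what is the status on p04?", "project": "p04-gigabit"}'
# handler.js wraps the "formatted_context" field of this response in the
# "--- AtoCore Context ---" markers and prepends it to the system prompt.
```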
### Deployment
The plugin is loaded from
`/tmp/atocore-openclaw-capture-plugin/openclaw-plugins/atocore-capture/`
on the T420 (per OpenClaw's plugin config at `~/.openclaw/openclaw.json`).
To update:
```bash
scp openclaw-plugins/atocore-capture/handler.js \
  papa@192.168.86.39:/tmp/atocore-openclaw-capture-plugin/openclaw-plugins/atocore-capture/index.js
ssh papa@192.168.86.39 'systemctl --user restart openclaw-gateway'
```
Verify in gateway logs: look for "ready (7 plugins: acpx, atocore-capture, ...)"
### Configuration (env vars set on T420)
| Var | Default | Purpose |
|---|---|---|
| `ATOCORE_BASE_URL` | `http://dalidou:8100` | AtoCore HTTP endpoint |
| `ATOCORE_PULL_DISABLED` | (unset) | Set to `1` to disable context pull |
### Behavior
- Fail-open: AtoCore unreachable = no injection, no capture, agent runs
normally.
- 6s timeout on context pull, 10s on capture — won't stall the agent.
- Context pack prepended as a clearly-bracketed block so the agent can see
it's auto-injected grounding info.
---
## Adapter 3: HTTP Proxy (`scripts/atocore_proxy.py`)
A stdlib-only OpenAI-compatible HTTP proxy. Sits between any
OpenAI-API-speaking client and the real provider, enriches every
`/chat/completions` request with AtoCore context.
Works with:
- **Codex CLI** (OpenAI-compatible endpoint)
- **Ollama** (has OpenAI-compatible `/v1` endpoint since 0.1.24)
- **LiteLLM**, **llama.cpp server**, custom agents
- Anything that can be pointed at a custom base URL
### Start it
```bash
# For Ollama (local models):
ATOCORE_UPSTREAM=http://localhost:11434/v1 \
python scripts/atocore_proxy.py

# For OpenAI cloud:
ATOCORE_UPSTREAM=https://api.openai.com/v1 \
ATOCORE_CLIENT_LABEL=codex \
python scripts/atocore_proxy.py

# Test:
curl http://127.0.0.1:11435/healthz
```
### Point a client at it
Set the client's OpenAI base URL to `http://127.0.0.1:11435/v1`.
#### Ollama example:
```bash
OPENAI_BASE_URL=http://127.0.0.1:11435/v1 \
some-openai-client --model llama3:8b
```
#### Codex CLI:
Set `OPENAI_BASE_URL=http://127.0.0.1:11435/v1` in your codex config.
### Configuration
| Var | Default | Purpose |
|---|---|---|
| `ATOCORE_URL` | `http://dalidou:8100` | AtoCore HTTP endpoint |
| `ATOCORE_UPSTREAM` | (required) | Real provider base URL |
| `ATOCORE_PROXY_PORT` | `11435` | Proxy listen port |
| `ATOCORE_PROXY_HOST` | `127.0.0.1` | Proxy bind address |
| `ATOCORE_CLIENT_LABEL` | `proxy` | Client id in captures |
| `ATOCORE_INJECT` | `1` | Inject context (set `0` to disable) |
| `ATOCORE_CAPTURE` | `1` | Capture interactions (set `0` to disable) |
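For example, a capture-only deployment (record turns, no injection) is a matter of flipping one flag. A sketch, reusing the Ollama upstream from above:
```bash
ATOCORE_UPSTREAM=http://localhost:11434/v1 \
ATOCORE_INJECT=0 \
ATOCORE_CLIENT_LABEL=ollama \
python scripts/atocore_proxy.py
```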
### Behavior
- GET requests (model listing, etc.) pass through unchanged
- POST to `/chat/completions` (or `/v1/chat/completions`) gets enriched:
1. Last user message extracted as query
2. AtoCore `/context/build` called with 6s timeout
3. Pack injected as system message (or prepended to existing system)
4. Enriched body forwarded to upstream
5. After success, interaction POSTed to `/interactions` in background
- Fail-open: AtoCore unreachable = pass through without injection
- Streaming responses: currently buffered (not true stream). Good enough for
most cases; can be upgraded later if needed.
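A minimal end-to-end check of that flow, assuming the proxy is running against an Ollama upstream (swap in whatever model your upstream actually serves):
```bash
curl -s http://127.0.0.1:11435/v1/chat/completions \
  -H 'Content-Type: application/json' \
  -d '{"model": "llama3:8b",
       "messages": [{"role": "user", "content": "what do we know about the p06 polisher?"}]}'
```
The proxy's stderr should show an `inject:` line for the request, and the response body arrives exactly as the upstream produced it.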
### Running as a service
On Linux, create `~/.config/systemd/user/atocore-proxy.service`:
```ini
[Unit]
Description=AtoCore HTTP proxy
[Service]
Environment=ATOCORE_UPSTREAM=http://localhost:11434/v1
Environment=ATOCORE_CLIENT_LABEL=ollama
ExecStart=/usr/bin/python3 /path/to/scripts/atocore_proxy.py
Restart=on-failure
[Install]
WantedBy=default.target
```
Then: `systemctl --user enable --now atocore-proxy`
On Windows, register via Task Scheduler (similar pattern to backup task)
or use NSSM to install as a service.
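A hypothetical NSSM setup (paths are examples; point them at your actual Python and checkout):
```bash
nssm install atocore-proxy "C:\Python312\python.exe" "C:\Users\antoi\ATOCore\scripts\atocore_proxy.py"
nssm set atocore-proxy AppEnvironmentExtra ATOCORE_UPSTREAM=http://localhost:11434/v1 ATOCORE_CLIENT_LABEL=ollama
nssm start atocore-proxy
```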
---
## Verification Checklist
Fresh end-to-end test to confirm Phase 1 is working:
### For Claude Code (MCP)
1. Open a new Claude Code session (not this one).
2. Ask: "what do we know about p06 polisher's control architecture?"
3. Claude should invoke `atocore_context` or `atocore_project_state`
on its own and answer grounded in AtoCore data.
### For OpenClaw (plugin pull)
1. Send a Discord message to OpenClaw: "what's the status on p04?"
2. Check T420 logs: `journalctl --user -u openclaw-gateway --since "1 min ago" | grep atocore-pull`
3. Expect: `atocore-pull:injected project=p04-gigabit chars=NNN`
### For proxy (any OpenAI-compat client)
1. Start proxy with appropriate upstream
2. Run a client query through it
3. Check stderr: `[atocore-proxy] inject: project=... chars=...`
4. Check `curl http://127.0.0.1:8100/interactions?client=proxy` — should
show the captured turn
---
## Why not just MCP everywhere?
MCP is great for Claude-family clients but:
- Not supported natively by Codex CLI, Ollama, or OpenAI's own API
- No universal "attach MCP" mechanism in all LLM runtimes
- HTTP APIs are truly universal
The HTTP API is the source of truth; each adapter is the thinnest possible shim for its
ecosystem. When new adapters are needed (Gemini CLI, Claude Code plugin
system, etc.), they follow the same pattern.
---
## Future enhancements
- **Streaming passthrough** in the proxy (currently buffered for simplicity)
- **Response grounding check**: parse assistant output for references to
injected context, count reinforcement events
- **Per-client metrics** in the dashboard: how often each client pulls,
context pack size, injection rate
- **Smart project detection**: today we use keyword matching; could use
AtoCore's own project resolver endpoint

openclaw-plugins/atocore-capture/handler.js

@@ -1,63 +1,146 @@
/**
 * AtoCore OpenClaw plugin — capture + pull.
 *
 * Two responsibilities:
 *
 * 1. CAPTURE (existing): On before_agent_start, buffer the user prompt.
 *    On llm_output, POST prompt+response to AtoCore /interactions.
 *    This is the "write" side — OpenClaw turns feed AtoCore's memory.
 *
 * 2. PULL (Phase 1 master brain): On before_prompt_build, call AtoCore
 *    /context/build and inject the returned context via prependContext.
 *    Every OpenClaw response is automatically grounded in what AtoCore
 *    knows (project state, memories, relevant chunks).
 *
 * Fail-open throughout: AtoCore unreachable = no injection, no capture,
 * never blocks the agent.
 */
import { definePluginEntry } from "openclaw/plugin-sdk/core";

const BASE_URL = process.env.ATOCORE_BASE_URL || "http://dalidou:8100";
const MIN_LEN = 15;
const MAX_RESP = 50000;
const CONTEXT_TIMEOUT_MS = 6000;
const CAPTURE_TIMEOUT_MS = 10000;

function trim(v) { return typeof v === "string" ? v.trim() : ""; }
function trunc(t, m) { return !t || t.length <= m ? t : t.slice(0, m) + "\n\n[truncated]"; }

function detectProject(prompt) {
  const lower = (prompt || "").toLowerCase();
  const hints = [
    ["p04", "p04-gigabit"],
    ["gigabit", "p04-gigabit"],
    ["p05", "p05-interferometer"],
    ["interferometer", "p05-interferometer"],
    ["p06", "p06-polisher"],
    ["polisher", "p06-polisher"],
    ["fullum", "p06-polisher"],
    ["abb", "abb-space"],
    ["atomizer", "atomizer-v2"],
    ["atocore", "atocore"],
  ];
  for (const [token, proj] of hints) {
    if (lower.includes(token)) return proj;
  }
  return "";
}

export default definePluginEntry({
  register(api) {
    const log = api.logger;
    let lastPrompt = null; // simple single-slot buffer

    // --- PULL: inject AtoCore context into every prompt ---
    api.on("before_prompt_build", async (event, ctx) => {
      if (process.env.ATOCORE_PULL_DISABLED === "1") return;
      const prompt = trim(event?.prompt || "");
      if (prompt.length < MIN_LEN) return;
      const project = detectProject(prompt);
      try {
        const res = await fetch(BASE_URL.replace(/\/$/, "") + "/context/build", {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ prompt, project }),
          signal: AbortSignal.timeout(CONTEXT_TIMEOUT_MS),
        });
        if (!res.ok) {
          log.info("atocore-pull:http_error", { status: res.status });
          return;
        }
        const data = await res.json();
        const contextPack = data.formatted_context || "";
        if (!contextPack.trim()) return;
        log.info("atocore-pull:injected", {
          project: project || "(none)",
          chars: contextPack.length,
        });
        return {
          prependContext:
            "--- AtoCore Context (auto-injected) ---\n" +
            contextPack +
            "\n--- End AtoCore Context ---\n",
        };
      } catch (err) {
        log.info("atocore-pull:error", { error: String(err).slice(0, 200) });
      }
    });

    // --- CAPTURE: buffer user prompts on agent start ---
    api.on("before_agent_start", async (event, ctx) => {
      const prompt = trim(event?.prompt || event?.cleanedBody || "");
      if (prompt.length < MIN_LEN || prompt.startsWith("<")) {
        lastPrompt = null;
        return;
      }
      lastPrompt = { text: prompt, sessionKey: ctx?.sessionKey || "", ts: Date.now() };
      log.info("atocore-capture:prompt_buffered", { len: prompt.length });
    });

    // --- CAPTURE: send completed turns to AtoCore ---
    api.on("llm_output", async (event, ctx) => {
      if (!lastPrompt) return;
      // Discard stale prompts (>5 min old)
      if (Date.now() - lastPrompt.ts > 300000) {
        lastPrompt = null;
        return;
      }
      const texts = Array.isArray(event?.assistantTexts) ? event.assistantTexts : [];
      const response = trunc(trim(texts.join("\n\n")), MAX_RESP);
      if (!response) return;
      const prompt = lastPrompt.text;
      const sessionKey = lastPrompt.sessionKey || ctx?.sessionKey || "";
      const project = detectProject(prompt);
      lastPrompt = null;
      log.info("atocore-capture:posting", {
        promptLen: prompt.length,
        responseLen: response.length,
        project: project || "(none)",
      });
      fetch(BASE_URL.replace(/\/$/, "") + "/interactions", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          prompt,
          response,
          client: "openclaw",
          session_id: sessionKey,
          project,
          reinforce: true,
        }),
        signal: AbortSignal.timeout(CAPTURE_TIMEOUT_MS),
      }).then(res => {
        log.info("atocore-capture:posted", { status: res.status });
      }).catch(err => {
        log.warn("atocore-capture:post_error", { error: String(err).slice(0, 200) });
      });
    });

    api.on("session_end", async () => {
      lastPrompt = null;
    });
  }
});

scripts/atocore_mcp.py

@@ -0,0 +1,479 @@
#!/usr/bin/env python3
"""AtoCore MCP server — stdio transport, stdlib-only.

Exposes the AtoCore HTTP API as MCP tools so any MCP-aware client
(Claude Desktop, Claude Code, Cursor, Zed, Windsurf) can pull
context + memories automatically at prompt time.

Design:
- stdlib only (no mcp SDK dep) — MCP protocol is simple JSON-RPC
  over stdio, and AtoCore's philosophy prefers stdlib.
- Thin wrapper: every tool is a direct pass-through to an HTTP
  endpoint. Zero business logic here — the AtoCore server is
  the single source of truth.
- Fail-open: if AtoCore is unreachable, tools return a graceful
  "unavailable" message rather than crashing the client.

Protocol: MCP 2024-11-05 / 2025-03-26 compatible
https://spec.modelcontextprotocol.io/specification/

Usage (standalone test):
    echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"0"}}}' | python atocore_mcp.py

Register with Claude Code:
    claude mcp add atocore -- python /path/to/atocore_mcp.py

Environment:
    ATOCORE_URL      base URL of the AtoCore HTTP API (default http://dalidou:8100)
    ATOCORE_TIMEOUT  per-request HTTP timeout seconds (default 10)
"""
from __future__ import annotations

import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request

# --- Configuration ---

ATOCORE_URL = os.environ.get("ATOCORE_URL", "http://dalidou:8100").rstrip("/")
HTTP_TIMEOUT = float(os.environ.get("ATOCORE_TIMEOUT", "10"))
SERVER_NAME = "atocore"
SERVER_VERSION = "0.1.0"
PROTOCOL_VERSION = "2024-11-05"

# --- stderr logging (stdout is reserved for JSON-RPC) ---

def log(msg: str) -> None:
    print(f"[atocore-mcp] {msg}", file=sys.stderr, flush=True)

# --- HTTP helpers ---

def http_get(path: str, params: dict | None = None) -> dict:
    """GET a JSON response from AtoCore. Raises on HTTP error."""
    url = ATOCORE_URL + path
    if params:
        # Drop empty params so the URL stays clean
        clean = {k: v for k, v in params.items() if v not in (None, "", [], {})}
        if clean:
            url += "?" + urllib.parse.urlencode(clean)
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
        return json.loads(resp.read().decode("utf-8"))

def http_post(path: str, body: dict) -> dict:
    url = ATOCORE_URL + path
    data = json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        url, data=data, method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
        return json.loads(resp.read().decode("utf-8"))

def safe_call(fn, *args, **kwargs) -> tuple[dict | None, str | None]:
    """Run an HTTP call, return (result, error_message_or_None)."""
    try:
        return fn(*args, **kwargs), None
    except urllib.error.HTTPError as e:
        try:
            body = e.read().decode("utf-8", errors="replace")
        except Exception:
            body = ""
        return None, f"AtoCore HTTP {e.code}: {body[:200]}"
    except urllib.error.URLError as e:
        return None, f"AtoCore unreachable at {ATOCORE_URL}: {e.reason}"
    except Exception as e:
        return None, f"AtoCore error: {type(e).__name__}: {str(e)[:200]}"

# --- Tool definitions ---
# Each tool: name, description, inputSchema (JSON Schema), handler

def _tool_context(args: dict) -> str:
    """Build a full context pack for a query — state + memories + retrieved chunks."""
    query = (args.get("query") or "").strip()
    project = args.get("project") or ""
    if not query:
        return "Error: 'query' is required."
    result, err = safe_call(http_post, "/context/build", {
        "prompt": query, "project": project,
    })
    if err:
        return f"AtoCore context unavailable: {err}"
    pack = result.get("formatted_context", "") or ""
    if not pack.strip():
        return "(AtoCore returned an empty context pack — no matching state, memories, or chunks.)"
    return pack

def _tool_search(args: dict) -> str:
    """Retrieval only — raw chunks ranked by semantic similarity."""
    query = (args.get("query") or "").strip()
    project = args.get("project") or ""
    top_k = int(args.get("top_k") or 5)
    if not query:
        return "Error: 'query' is required."
    result, err = safe_call(http_post, "/query", {
        "prompt": query, "project": project, "top_k": top_k,
    })
    if err:
        return f"AtoCore search unavailable: {err}"
    chunks = result.get("results", []) or []
    if not chunks:
        return "No results."
    lines = []
    for i, c in enumerate(chunks, 1):
        src = c.get("source_file") or c.get("title") or "unknown"
        heading = c.get("heading_path") or ""
        snippet = (c.get("content") or "")[:300]
        score = c.get("score", 0.0)
        head_str = f" ({heading})" if heading else ""
        lines.append(f"[{i}] score={score:.3f} source={src}{head_str}\n{snippet}")
    return "\n\n".join(lines)

def _tool_memory_list(args: dict) -> str:
    """List active memories, optionally filtered by project and type."""
    params = {
        "status": "active",
        "limit": int(args.get("limit") or 20),
    }
    if args.get("project"):
        params["project"] = args["project"]
    if args.get("memory_type"):
        params["memory_type"] = args["memory_type"]
    result, err = safe_call(http_get, "/memory", params=params)
    if err:
        return f"AtoCore memory list unavailable: {err}"
    memories = result.get("memories", []) or []
    if not memories:
        return "No memories match."
    lines = []
    for m in memories:
        mt = m.get("memory_type", "?")
        proj = m.get("project") or "(global)"
        conf = m.get("confidence", 0.0)
        refs = m.get("reference_count", 0)
        content = (m.get("content") or "")[:250]
        lines.append(f"[{mt}/{proj}] conf={conf:.2f} refs={refs}\n {content}")
    return "\n\n".join(lines)

def _tool_memory_create(args: dict) -> str:
    """Create a candidate memory (enters the triage queue)."""
    memory_type = (args.get("memory_type") or "").strip()
    content = (args.get("content") or "").strip()
    project = args.get("project") or ""
    confidence = float(args.get("confidence") or 0.5)
    if not memory_type or not content:
        return "Error: 'memory_type' and 'content' are required."
    valid_types = ["identity", "preference", "project", "episodic", "knowledge", "adaptation"]
    if memory_type not in valid_types:
        return f"Error: memory_type must be one of {valid_types}."
    result, err = safe_call(http_post, "/memory", {
        "memory_type": memory_type,
        "content": content,
        "project": project,
        "confidence": confidence,
        "status": "candidate",
    })
    if err:
        return f"AtoCore memory create failed: {err}"
    mid = result.get("id", "?")
    return f"Candidate memory created: id={mid} type={memory_type} project={project or '(global)'}"

def _tool_project_state(args: dict) -> str:
    """Get Trusted Project State entries for a project."""
    project = (args.get("project") or "").strip()
    category = args.get("category") or ""
    if not project:
        return "Error: 'project' is required."
    path = f"/project/state/{urllib.parse.quote(project)}"
    params = {"category": category} if category else None
    result, err = safe_call(http_get, path, params=params)
    if err:
        return f"AtoCore project state unavailable: {err}"
    entries = result.get("entries", []) or result.get("state", []) or []
    if not entries:
        return f"No state entries for project '{project}'."
    lines = []
    for e in entries:
        cat = e.get("category", "?")
        key = e.get("key", "?")
        value = (e.get("value") or "")[:300]
        src = e.get("source") or ""
        lines.append(f"[{cat}/{key}] (source: {src})\n {value}")
    return "\n\n".join(lines)

def _tool_projects(args: dict) -> str:
    """List registered AtoCore projects."""
    result, err = safe_call(http_get, "/projects")
    if err:
        return f"AtoCore projects unavailable: {err}"
    projects = result.get("projects", []) or []
    if not projects:
        return "No projects registered."
    lines = []
    for p in projects:
        pid = p.get("project_id") or p.get("id") or p.get("name") or "?"
        aliases = p.get("aliases", []) or []
        alias_str = f" (aliases: {', '.join(aliases)})" if aliases else ""
        lines.append(f"- {pid}{alias_str}")
    return "\n".join(lines)

def _tool_health(args: dict) -> str:
    """Check AtoCore service health."""
    result, err = safe_call(http_get, "/health")
    if err:
        return f"AtoCore unreachable: {err}"
    sha = result.get("build_sha", "?")[:8]
    vectors = result.get("vectors_count", "?")
    env = result.get("env", "?")
    return f"AtoCore healthy: sha={sha} vectors={vectors} env={env}"

TOOLS = [
    {
        "name": "atocore_context",
        "description": (
            "Get the full AtoCore context pack for a user query. Returns "
            "Trusted Project State (high trust), relevant memories, and "
            "retrieved source chunks formatted for prompt injection. "
            "Use this FIRST on any project-related query to ground the "
            "conversation in what AtoCore already knows."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "The user's question or task"},
                "project": {"type": "string", "description": "Project hint (e.g. 'p04-gigabit'); optional"},
            },
            "required": ["query"],
        },
        "handler": _tool_context,
    },
    {
        "name": "atocore_search",
        "description": (
            "Semantic search over AtoCore's ingested source documents. "
            "Returns top-K ranked chunks. Use this when you need raw "
            "references rather than a full context pack."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "project": {"type": "string", "description": "optional project filter"},
                "top_k": {"type": "integer", "minimum": 1, "maximum": 20, "default": 5},
            },
            "required": ["query"],
        },
        "handler": _tool_search,
    },
    {
        "name": "atocore_memory_list",
        "description": (
            "List active memories (curated facts, decisions, preferences). "
            "Filter by project and/or memory_type. Use this to inspect what "
            "AtoCore currently remembers about a topic."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "project": {"type": "string"},
                "memory_type": {
                    "type": "string",
                    "enum": ["identity", "preference", "project", "episodic", "knowledge", "adaptation"],
                },
                "limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 20},
            },
        },
        "handler": _tool_memory_list,
    },
    {
        "name": "atocore_memory_create",
        "description": (
            "Propose a new memory for AtoCore. Creates a CANDIDATE that "
            "enters the triage queue for human/auto review — not immediately "
            "active. Use this to capture durable facts/decisions that "
            "should persist across sessions. Do NOT use for transient state "
            "or session-specific notes."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "memory_type": {
                    "type": "string",
                    "enum": ["identity", "preference", "project", "episodic", "knowledge", "adaptation"],
                },
                "content": {"type": "string", "description": "The fact/decision/preference to remember"},
                "project": {"type": "string", "description": "project id if project-scoped; empty for global"},
                "confidence": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.5},
            },
            "required": ["memory_type", "content"],
        },
        "handler": _tool_memory_create,
    },
    {
        "name": "atocore_project_state",
        "description": (
            "Get Trusted Project State entries for a given project — the "
            "highest-trust tier with curated decisions, requirements, "
            "facts, contacts, milestones. Use this to look up authoritative "
            "project info."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "project": {"type": "string"},
                "category": {
                    "type": "string",
                    "enum": ["status", "decision", "requirement", "contact", "milestone", "fact", "config"],
                },
            },
            "required": ["project"],
        },
        "handler": _tool_project_state,
    },
    {
        "name": "atocore_projects",
        "description": "List all registered AtoCore projects (id + aliases).",
        "inputSchema": {"type": "object", "properties": {}},
        "handler": _tool_projects,
    },
    {
        "name": "atocore_health",
        "description": "Check AtoCore service health (build SHA, vector count, env).",
        "inputSchema": {"type": "object", "properties": {}},
        "handler": _tool_health,
    },
]

# --- JSON-RPC handlers ---

def handle_initialize(params: dict) -> dict:
    return {
        "protocolVersion": PROTOCOL_VERSION,
        "capabilities": {
            "tools": {"listChanged": False},
        },
        "serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
    }

def handle_tools_list(params: dict) -> dict:
    return {
        "tools": [
            {"name": t["name"], "description": t["description"], "inputSchema": t["inputSchema"]}
            for t in TOOLS
        ]
    }

def handle_tools_call(params: dict) -> dict:
    tool_name = params.get("name", "")
    args = params.get("arguments", {}) or {}
    tool = next((t for t in TOOLS if t["name"] == tool_name), None)
    if tool is None:
        return {
            "content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
            "isError": True,
        }
    try:
        text = tool["handler"](args)
    except Exception as e:
        log(f"tool {tool_name} raised: {e}")
        return {
            "content": [{"type": "text", "text": f"Tool error: {type(e).__name__}: {e}"}],
            "isError": True,
        }
    return {"content": [{"type": "text", "text": text}]}

def handle_ping(params: dict) -> dict:
    return {}

METHODS = {
    "initialize": handle_initialize,
    "tools/list": handle_tools_list,
    "tools/call": handle_tools_call,
    "ping": handle_ping,
}

# --- stdio main loop ---

def send(obj: dict) -> None:
    """Write a single-line JSON message to stdout and flush."""
    sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n")
    sys.stdout.flush()

def make_response(req_id, result=None, error=None) -> dict:
    resp = {"jsonrpc": "2.0", "id": req_id}
    if error is not None:
        resp["error"] = error
    else:
        resp["result"] = result if result is not None else {}
    return resp

def main() -> int:
    log(f"starting (AtoCore at {ATOCORE_URL})")
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        try:
            msg = json.loads(line)
        except json.JSONDecodeError as e:
            log(f"parse error: {e}")
            continue
        method = msg.get("method", "")
        req_id = msg.get("id")
        params = msg.get("params", {}) or {}
        # Notifications (no id) don't need a response
        if req_id is None:
            if method == "notifications/initialized":
                log("client initialized")
            continue
        handler = METHODS.get(method)
        if handler is None:
            send(make_response(req_id, error={
                "code": -32601,
                "message": f"Method not found: {method}",
            }))
            continue
        try:
            result = handler(params)
            send(make_response(req_id, result=result))
        except Exception as e:
            log(f"handler {method} raised: {e}")
            send(make_response(req_id, error={
                "code": -32603,
                "message": f"Internal error: {type(e).__name__}: {e}",
            }))
    log("stdin closed, exiting")
    return 0

if __name__ == "__main__":
    sys.exit(main())

scripts/atocore_proxy.py

@@ -0,0 +1,321 @@
#!/usr/bin/env python3
"""AtoCore Proxy — OpenAI-compatible HTTP middleware.

Acts as a drop-in layer for any client that speaks the OpenAI Chat
Completions API (Codex, Ollama, LiteLLM, custom agents). Sits between
the client and the real model provider:

    client -> atocore_proxy -> real_provider (OpenAI, Ollama, Anthropic, ...)

For each chat completion request:
  1. Extract the user's last message as the "query"
  2. Call AtoCore /context/build to get a context pack
  3. Inject the pack as a system message (or prepend to existing system)
  4. Forward the enriched request to the real provider
  5. Capture the full interaction back to AtoCore /interactions

Fail-open: if AtoCore is unreachable, the request passes through
unchanged. If the real provider fails, the error is propagated to the
client as-is.

Configuration (env vars):
    ATOCORE_URL           AtoCore base URL (default http://dalidou:8100)
    ATOCORE_UPSTREAM      real provider base URL (e.g. http://localhost:11434/v1 for Ollama)
    ATOCORE_PROXY_PORT    port to listen on (default 11435)
    ATOCORE_PROXY_HOST    bind address (default 127.0.0.1)
    ATOCORE_CLIENT_LABEL  client id recorded in captures (default "proxy")
    ATOCORE_CAPTURE       "1" to capture interactions back (default "1")
    ATOCORE_INJECT        "1" to inject context (default "1")

Usage:
    # Proxy for Ollama:
    ATOCORE_UPSTREAM=http://localhost:11434/v1 python atocore_proxy.py

    # Then point your client at http://localhost:11435/v1 instead of the
    # real provider.

Stdlib only — deliberate to keep the dependency footprint at zero.
"""
from __future__ import annotations

import http.server
import json
import os
import socketserver
import sys
import threading
import urllib.error
import urllib.parse
import urllib.request
from typing import Any

ATOCORE_URL = os.environ.get("ATOCORE_URL", "http://dalidou:8100").rstrip("/")
UPSTREAM_URL = os.environ.get("ATOCORE_UPSTREAM", "").rstrip("/")
PROXY_PORT = int(os.environ.get("ATOCORE_PROXY_PORT", "11435"))
PROXY_HOST = os.environ.get("ATOCORE_PROXY_HOST", "127.0.0.1")
CLIENT_LABEL = os.environ.get("ATOCORE_CLIENT_LABEL", "proxy")
CAPTURE_ENABLED = os.environ.get("ATOCORE_CAPTURE", "1") == "1"
INJECT_ENABLED = os.environ.get("ATOCORE_INJECT", "1") == "1"
ATOCORE_TIMEOUT = float(os.environ.get("ATOCORE_TIMEOUT", "6"))
UPSTREAM_TIMEOUT = float(os.environ.get("ATOCORE_UPSTREAM_TIMEOUT", "300"))

PROJECT_HINTS = [
    ("p04-gigabit", ["p04", "gigabit"]),
    ("p05-interferometer", ["p05", "interferometer"]),
    ("p06-polisher", ["p06", "polisher", "fullum"]),
    ("abb-space", ["abb"]),
    ("atomizer-v2", ["atomizer"]),
    ("atocore", ["atocore", "dalidou"]),
]

def log(msg: str) -> None:
    print(f"[atocore-proxy] {msg}", file=sys.stderr, flush=True)

def detect_project(text: str) -> str:
    lower = (text or "").lower()
    for proj, tokens in PROJECT_HINTS:
        if any(t in lower for t in tokens):
            return proj
    return ""

def get_last_user_message(body: dict) -> str:
    messages = body.get("messages", []) or []
    for m in reversed(messages):
        if m.get("role") == "user":
            content = m.get("content", "")
            if isinstance(content, list):
                # OpenAI multi-part content: extract text parts
                parts = [p.get("text", "") for p in content if p.get("type") == "text"]
                return "\n".join(parts)
            return str(content)
    return ""

def get_assistant_text(response: dict) -> str:
    """Extract assistant text from an OpenAI-style completion response."""
    choices = response.get("choices", []) or []
    if not choices:
        return ""
    msg = choices[0].get("message", {}) or {}
    content = msg.get("content", "")
    if isinstance(content, list):
        parts = [p.get("text", "") for p in content if p.get("type") == "text"]
        return "\n".join(parts)
    return str(content)

def fetch_context(query: str, project: str) -> str:
    """Pull a context pack from AtoCore. Returns '' on any failure."""
    if not INJECT_ENABLED or not query:
        return ""
    try:
        data = json.dumps({"prompt": query, "project": project}).encode("utf-8")
        req = urllib.request.Request(
            ATOCORE_URL + "/context/build",
            data=data,
            method="POST",
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=ATOCORE_TIMEOUT) as resp:
            result = json.loads(resp.read().decode("utf-8"))
            return result.get("formatted_context", "") or ""
    except Exception as e:
        log(f"context fetch failed: {type(e).__name__}: {e}")
        return ""

def capture_interaction(prompt: str, response: str, project: str) -> None:
    """POST the completed turn back to AtoCore. Fire-and-forget."""
    if not CAPTURE_ENABLED or not prompt or not response:
        return

    def _post():
        try:
            data = json.dumps({
                "prompt": prompt,
                "response": response,
                "client": CLIENT_LABEL,
                "project": project,
                "reinforce": True,
            }).encode("utf-8")
            req = urllib.request.Request(
                ATOCORE_URL + "/interactions",
                data=data,
                method="POST",
                headers={"Content-Type": "application/json"},
            )
            urllib.request.urlopen(req, timeout=ATOCORE_TIMEOUT)
        except Exception as e:
            log(f"capture failed: {type(e).__name__}: {e}")

    threading.Thread(target=_post, daemon=True).start()

def inject_context(body: dict, context_pack: str) -> dict:
    """Prepend the AtoCore context as a system message, or augment existing."""
    if not context_pack.strip():
        return body
    header = "--- AtoCore Context (auto-injected) ---\n"
    footer = "\n--- End AtoCore Context ---\n"
    injection = header + context_pack + footer
    messages = list(body.get("messages", []) or [])
    if messages and messages[0].get("role") == "system":
        # Augment existing system message
        existing = messages[0].get("content", "") or ""
        if isinstance(existing, list):
            # multi-part: prepend a text part
            messages[0]["content"] = [{"type": "text", "text": injection}] + existing
        else:
            messages[0]["content"] = injection + "\n" + str(existing)
    else:
        messages.insert(0, {"role": "system", "content": injection})
    body["messages"] = messages
    return body

def forward_to_upstream(body: dict, headers: dict[str, str], path: str) -> tuple[int, dict]:
    """Forward the enriched body to the upstream provider. Returns (status, response_dict)."""
    if not UPSTREAM_URL:
        return 503, {"error": {"message": "ATOCORE_UPSTREAM not configured"}}
    url = UPSTREAM_URL + path
    data = json.dumps(body).encode("utf-8")
    # Strip hop-by-hop / host-specific headers
    fwd_headers = {"Content-Type": "application/json"}
    for k, v in headers.items():
        lk = k.lower()
        if lk in ("authorization", "x-api-key", "anthropic-version"):
            fwd_headers[k] = v
    req = urllib.request.Request(url, data=data, method="POST", headers=fwd_headers)
    try:
        with urllib.request.urlopen(req, timeout=UPSTREAM_TIMEOUT) as resp:
            return resp.status, json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        try:
            body_bytes = e.read()
            payload = json.loads(body_bytes.decode("utf-8"))
        except Exception:
            payload = {"error": {"message": f"upstream HTTP {e.code}"}}
        return e.code, payload
    except Exception as e:
        log(f"upstream error: {e}")
        return 502, {"error": {"message": f"upstream unreachable: {e}"}}

class ProxyHandler(http.server.BaseHTTPRequestHandler):
    # Silence default request logging (we log what matters ourselves)
    def log_message(self, format: str, *args: Any) -> None:
        pass

    def _read_body(self) -> dict:
        length = int(self.headers.get("Content-Length", "0") or "0")
        if length <= 0:
            return {}
        raw = self.rfile.read(length)
        try:
            return json.loads(raw.decode("utf-8"))
        except Exception:
            return {}

    def _send_json(self, status: int, payload: dict) -> None:
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def do_OPTIONS(self) -> None:  # CORS preflight
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-API-Key")
        self.end_headers()

    def do_GET(self) -> None:
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path == "/healthz":
            self._send_json(200, {
                "status": "ok",
                "atocore": ATOCORE_URL,
                "upstream": UPSTREAM_URL or "(not configured)",
                "inject": INJECT_ENABLED,
                "capture": CAPTURE_ENABLED,
            })
            return
        # Pass through GET to upstream (model listing etc)
        if not UPSTREAM_URL:
            self._send_json(503, {"error": {"message": "ATOCORE_UPSTREAM not configured"}})
            return
        try:
            req = urllib.request.Request(UPSTREAM_URL + parsed.path + (f"?{parsed.query}" if parsed.query else ""))
            for k in ("Authorization", "X-API-Key"):
                v = self.headers.get(k)
                if v:
                    req.add_header(k, v)
            with urllib.request.urlopen(req, timeout=UPSTREAM_TIMEOUT) as resp:
                data = resp.read()
                self.send_response(resp.status)
                self.send_header("Content-Type", resp.headers.get("Content-Type", "application/json"))
                self.send_header("Content-Length", str(len(data)))
                self.end_headers()
                self.wfile.write(data)
        except Exception as e:
            self._send_json(502, {"error": {"message": f"upstream error: {e}"}})

    def do_POST(self) -> None:
        parsed = urllib.parse.urlparse(self.path)
        body = self._read_body()
        # Only enrich chat completions; other endpoints pass through
        if parsed.path.endswith("/chat/completions") or parsed.path == "/v1/chat/completions":
            prompt = get_last_user_message(body)
            project = detect_project(prompt)
            context = fetch_context(prompt, project) if prompt else ""
            if context:
                log(f"inject: project={project or '(none)'} chars={len(context)}")
                body = inject_context(body, context)
            status, response = forward_to_upstream(body, dict(self.headers), parsed.path)
            self._send_json(status, response)
            if status == 200:
                assistant_text = get_assistant_text(response)
                capture_interaction(prompt, assistant_text, project)
        else:
            # Non-chat endpoints (embeddings, completions, etc.) — pure passthrough
            status, response = forward_to_upstream(body, dict(self.headers), parsed.path)
            self._send_json(status, response)

class ThreadedServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    daemon_threads = True
    allow_reuse_address = True

def main() -> int:
    if not UPSTREAM_URL:
        log("WARNING: ATOCORE_UPSTREAM not set. Chat completions will fail.")
        log("Example: ATOCORE_UPSTREAM=http://localhost:11434/v1 for Ollama")
    server = ThreadedServer((PROXY_HOST, PROXY_PORT), ProxyHandler)
    log(f"listening on {PROXY_HOST}:{PROXY_PORT}")
    log(f"AtoCore: {ATOCORE_URL} inject={INJECT_ENABLED} capture={CAPTURE_ENABLED}")
    log(f"Upstream: {UPSTREAM_URL or '(not configured)'}")
    log(f"Client label: {CLIENT_LABEL}")
    log("Ready. Point your OpenAI-compatible client at /v1/chat/completions")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        log("stopping")
        server.server_close()
    return 0

if __name__ == "__main__":
    sys.exit(main())