diff --git a/deploy/hooks/capture_stop.py b/deploy/hooks/capture_stop.py new file mode 100644 index 0000000..e1a419b --- /dev/null +++ b/deploy/hooks/capture_stop.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +"""Claude Code Stop hook: capture interaction to AtoCore. + +Reads the Stop hook JSON from stdin, extracts the last user prompt +from the transcript JSONL, and POSTs to the AtoCore /interactions +endpoint in conservative mode (reinforce=false, no extraction). + +Fail-open: always exits 0, logs errors to stderr only. + +Environment variables: + ATOCORE_URL Base URL of the AtoCore instance (default: http://dalidou:8100) + ATOCORE_CAPTURE_DISABLED Set to "1" to disable capture (kill switch) + +Usage in ~/.claude/settings.json: + "Stop": [{ + "matcher": "", + "hooks": [{ + "type": "command", + "command": "python /path/to/capture_stop.py", + "timeout": 15 + }] + }] +""" + +from __future__ import annotations + +import json +import os +import sys +import urllib.error +import urllib.request + +ATOCORE_URL = os.environ.get("ATOCORE_URL", "http://dalidou:8100") +TIMEOUT_SECONDS = 10 + +# Minimum prompt length to bother capturing. Single-word acks, +# slash commands, and empty lines aren't useful interactions. +MIN_PROMPT_LENGTH = 15 + +# Maximum response length to capture. Truncate very long assistant +# responses to keep the interactions table manageable. +MAX_RESPONSE_LENGTH = 50_000 + + +def main() -> None: + """Entry point. Always exits 0.""" + try: + _capture() + except Exception as exc: + print(f"capture_stop: {exc}", file=sys.stderr) + + +def _capture() -> None: + if os.environ.get("ATOCORE_CAPTURE_DISABLED") == "1": + return + + raw = sys.stdin.read() + if not raw.strip(): + return + + hook_data = json.loads(raw) + session_id = hook_data.get("session_id", "") + assistant_message = hook_data.get("assistant_message", "") + transcript_path = hook_data.get("transcript_path", "") + cwd = hook_data.get("cwd", "") + + prompt = _extract_last_user_prompt(transcript_path) + if not prompt or len(prompt.strip()) < MIN_PROMPT_LENGTH: + return + + response = assistant_message or "" + if len(response) > MAX_RESPONSE_LENGTH: + response = response[:MAX_RESPONSE_LENGTH] + "\n\n[truncated]" + + project = _infer_project(cwd) + + payload = { + "prompt": prompt, + "response": response, + "client": "claude-code", + "session_id": session_id, + "project": project, + "reinforce": False, + } + + body = json.dumps(payload, ensure_ascii=True).encode("utf-8") + req = urllib.request.Request( + f"{ATOCORE_URL}/interactions", + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + resp = urllib.request.urlopen(req, timeout=TIMEOUT_SECONDS) + result = json.loads(resp.read().decode("utf-8")) + print( + f"capture_stop: recorded interaction {result.get('id', '?')} " + f"(project={project or 'none'}, prompt_chars={len(prompt)}, " + f"response_chars={len(response)})", + file=sys.stderr, + ) + + +def _extract_last_user_prompt(transcript_path: str) -> str: + """Read the JSONL transcript and return the last real user prompt. + + Skips meta messages (isMeta=True) and system/command messages + (content starting with '<'). + """ + if not transcript_path: + return "" + + # Normalize path for the current OS + path = os.path.normpath(transcript_path) + if not os.path.isfile(path): + return "" + + last_prompt = "" + try: + with open(path, encoding="utf-8", errors="replace") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + except json.JSONDecodeError: + continue + + if entry.get("type") != "user": + continue + if entry.get("isMeta", False): + continue + + msg = entry.get("message", {}) + if not isinstance(msg, dict): + continue + + content = msg.get("content", "") + + if isinstance(content, str): + text = content.strip() + elif isinstance(content, list): + # Content blocks: extract text blocks + parts = [] + for block in content: + if isinstance(block, str): + parts.append(block) + elif isinstance(block, dict) and block.get("type") == "text": + parts.append(block.get("text", "")) + text = "\n".join(parts).strip() + else: + continue + + # Skip system/command XML and very short messages + if text.startswith("<") or len(text) < MIN_PROMPT_LENGTH: + continue + + last_prompt = text + except OSError: + pass + + return last_prompt + + +# Project inference from working directory. +# Maps known repo paths to AtoCore project IDs. The user can extend +# this table or replace it with a registry lookup later. +_PROJECT_PATH_MAP: dict[str, str] = { + # Add mappings as needed, e.g.: + # "C:\\Users\\antoi\\gigabit": "p04-gigabit", + # "C:\\Users\\antoi\\interferometer": "p05-interferometer", +} + + +def _infer_project(cwd: str) -> str: + """Try to map the working directory to an AtoCore project.""" + if not cwd: + return "" + norm = os.path.normpath(cwd).lower() + for path_prefix, project_id in _PROJECT_PATH_MAP.items(): + if norm.startswith(os.path.normpath(path_prefix).lower()): + return project_id + return "" + + +if __name__ == "__main__": + main() diff --git a/docs/backup-restore-procedure.md b/docs/backup-restore-procedure.md index 032d5a8..53b70f0 100644 --- a/docs/backup-restore-procedure.md +++ b/docs/backup-restore-procedure.md @@ -247,6 +247,18 @@ for i in 1 2 3 4 5 6 7 8 9 10; do done ``` +**Note on build_sha after restore:** The one-shot `docker compose run` +container does not carry the build provenance env vars that `deploy.sh` +exports at deploy time. After a restore, `/health` will report +`build_sha: "unknown"` until you re-run `deploy.sh` or manually +re-deploy. This is cosmetic — the data is correctly restored — but if +you need `build_sha` to be accurate, run a redeploy after the restore: + +```bash +cd /srv/storage/atocore/app +bash deploy/dalidou/deploy.sh +``` + ### Post-restore verification ```bash diff --git a/docs/current-state.md b/docs/current-state.md index 61b22fe..96a398b 100644 --- a/docs/current-state.md +++ b/docs/current-state.md @@ -244,12 +244,16 @@ This separation is healthy: ## Immediate Next Focus -1. Re-run the full backup/restore drill on Dalidou with the - Chroma bind-mount fix in place (end-to-end green, not the - partial pass from 2026-04-09) -2. Turn on auto-capture of Claude Code sessions in conservative - mode now that the restore path is trustworthy -3. Use the new T420-side organic routing layer in real OpenClaw workflows +1. ~~Re-run the full backup/restore drill~~ — DONE 2026-04-11, + full pass (db, registry, chroma, integrity all true) +2. ~~Turn on auto-capture of Claude Code sessions in conservative + mode~~ — DONE 2026-04-11, Stop hook wired via + `deploy/hooks/capture_stop.py` → `POST /interactions` + with `reinforce=false`; kill switch via + `ATOCORE_CAPTURE_DISABLED=1` +3. Run a short real-use pilot with auto-capture on, verify + interactions are landing in Dalidou, review quality +4. Use the new T420-side organic routing layer in real OpenClaw workflows 4. Tighten retrieval quality for the now fully ingested active project corpora 5. Move to Wave 2 trusted-operational ingestion instead of blindly widening raw corpus further 6. Keep the new engineering-knowledge architecture docs as implementation guidance while avoiding premature schema work diff --git a/docs/next-steps.md b/docs/next-steps.md index ac33f1a..a30a5cd 100644 --- a/docs/next-steps.md +++ b/docs/next-steps.md @@ -20,24 +20,14 @@ This working list should be read alongside: ## Immediate Next Steps -1. Re-run the backup/restore drill on Dalidou with the Chroma - bind-mount fix in place - - the 2026-04-09 drill was a PARTIAL PASS: db restore + marker - reversal worked cleanly, but the Chroma step failed with - `OSError [Errno 16] Device or resource busy` because - `shutil.rmtree` cannot unlink a Docker bind-mounted volume - - fix landed immediately after: `restore_runtime_backup()` now - clears the destination's CONTENTS and uses - `copytree(dirs_exist_ok=True)`, and the regression test - `test_restore_chroma_does_not_unlink_destination_directory` - asserts the destination inode is stable - - need a green end-to-end run with `--chroma` actually - working in-container before enabling write-path automation -2. Turn on auto-capture of Claude Code sessions once the drill - re-run is clean - - conservative mode: Stop hook posts to `/interactions`, - no auto-extraction into review queue without review cadence - in place +1. ~~Re-run the backup/restore drill~~ — DONE 2026-04-11, full pass +2. ~~Turn on auto-capture of Claude Code sessions~~ — DONE 2026-04-11, + Stop hook via `deploy/hooks/capture_stop.py` → `POST /interactions` + with `reinforce=false`; kill switch: `ATOCORE_CAPTURE_DISABLED=1` +2a. Run a short real-use pilot with auto-capture on + - verify interactions are landing in Dalidou + - check prompt/response quality and truncation + - confirm fail-open: no user-visible impact when Dalidou is down 3. Use the T420 `atocore-context` skill and the new organic routing layer in real OpenClaw workflows - confirm `auto-context` feels natural diff --git a/tests/test_capture_stop.py b/tests/test_capture_stop.py new file mode 100644 index 0000000..7adfea9 --- /dev/null +++ b/tests/test_capture_stop.py @@ -0,0 +1,249 @@ +"""Tests for deploy/hooks/capture_stop.py — Claude Code Stop hook.""" + +from __future__ import annotations + +import json +import os +import sys +import tempfile +import textwrap +from io import StringIO +from pathlib import Path +from unittest import mock + +import pytest + +# The hook script lives outside of the normal package tree, so import +# it by manipulating sys.path. +_HOOK_DIR = str(Path(__file__).resolve().parent.parent / "deploy" / "hooks") +if _HOOK_DIR not in sys.path: + sys.path.insert(0, _HOOK_DIR) + +import capture_stop # noqa: E402 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _write_transcript(tmp: Path, entries: list[dict]) -> str: + """Write a JSONL transcript and return the path.""" + path = tmp / "transcript.jsonl" + with open(path, "w", encoding="utf-8") as f: + for entry in entries: + f.write(json.dumps(entry, ensure_ascii=False) + "\n") + return str(path) + + +def _user_entry(content: str, *, is_meta: bool = False) -> dict: + return { + "type": "user", + "isMeta": is_meta, + "message": {"role": "user", "content": content}, + } + + +def _assistant_entry() -> dict: + return { + "type": "assistant", + "message": { + "role": "assistant", + "content": [{"type": "text", "text": "Sure, here's the answer."}], + }, + } + + +def _system_entry() -> dict: + return {"type": "system", "message": {"role": "system", "content": "system init"}} + + +# --------------------------------------------------------------------------- +# _extract_last_user_prompt +# --------------------------------------------------------------------------- + +class TestExtractLastUserPrompt: + def test_returns_last_real_prompt(self, tmp_path): + path = _write_transcript(tmp_path, [ + _user_entry("First prompt that is long enough to capture"), + _assistant_entry(), + _user_entry("Second prompt that should be the one we capture"), + _assistant_entry(), + ]) + result = capture_stop._extract_last_user_prompt(path) + assert result == "Second prompt that should be the one we capture" + + def test_skips_meta_messages(self, tmp_path): + path = _write_transcript(tmp_path, [ + _user_entry("Real prompt that is definitely long enough"), + _user_entry("some system stuff"), + _user_entry("Meta message that looks real enough", is_meta=True), + ]) + result = capture_stop._extract_last_user_prompt(path) + assert result == "Real prompt that is definitely long enough" + + def test_skips_xml_content(self, tmp_path): + path = _write_transcript(tmp_path, [ + _user_entry("Actual prompt from a real human user"), + _user_entry("/help"), + ]) + result = capture_stop._extract_last_user_prompt(path) + assert result == "Actual prompt from a real human user" + + def test_skips_short_messages(self, tmp_path): + path = _write_transcript(tmp_path, [ + _user_entry("This prompt is long enough to be captured"), + _user_entry("yes"), # too short + ]) + result = capture_stop._extract_last_user_prompt(path) + assert result == "This prompt is long enough to be captured" + + def test_handles_content_blocks(self, tmp_path): + entry = { + "type": "user", + "message": { + "role": "user", + "content": [ + {"type": "text", "text": "First paragraph of the prompt."}, + {"type": "text", "text": "Second paragraph continues here."}, + ], + }, + } + path = _write_transcript(tmp_path, [entry]) + result = capture_stop._extract_last_user_prompt(path) + assert "First paragraph" in result + assert "Second paragraph" in result + + def test_empty_transcript(self, tmp_path): + path = _write_transcript(tmp_path, []) + result = capture_stop._extract_last_user_prompt(path) + assert result == "" + + def test_missing_file(self): + result = capture_stop._extract_last_user_prompt("/nonexistent/path.jsonl") + assert result == "" + + def test_empty_path(self): + result = capture_stop._extract_last_user_prompt("") + assert result == "" + + +# --------------------------------------------------------------------------- +# _infer_project +# --------------------------------------------------------------------------- + +class TestInferProject: + def test_empty_cwd(self): + assert capture_stop._infer_project("") == "" + + def test_unknown_path(self): + assert capture_stop._infer_project("C:\\Users\\antoi\\random") == "" + + def test_mapped_path(self): + with mock.patch.dict(capture_stop._PROJECT_PATH_MAP, { + "C:\\Users\\antoi\\gigabit": "p04-gigabit", + }): + result = capture_stop._infer_project("C:\\Users\\antoi\\gigabit\\src") + assert result == "p04-gigabit" + + +# --------------------------------------------------------------------------- +# _capture (integration-style, mocking HTTP) +# --------------------------------------------------------------------------- + +class TestCapture: + def _hook_input(self, *, transcript_path: str = "", **overrides) -> str: + data = { + "session_id": "test-session-123", + "transcript_path": transcript_path, + "cwd": "C:\\Users\\antoi\\ATOCore", + "permission_mode": "default", + "hook_event_name": "Stop", + "assistant_message": "Here is the answer to your question about the code.", + "turn_number": 3, + } + data.update(overrides) + return json.dumps(data) + + @mock.patch("capture_stop.urllib.request.urlopen") + def test_posts_to_atocore(self, mock_urlopen, tmp_path): + transcript = _write_transcript(tmp_path, [ + _user_entry("Please explain how the backup system works in detail"), + _assistant_entry(), + ]) + mock_resp = mock.MagicMock() + mock_resp.read.return_value = json.dumps({"id": "int-001", "status": "recorded"}).encode() + mock_urlopen.return_value = mock_resp + + with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))): + capture_stop._capture() + + mock_urlopen.assert_called_once() + req = mock_urlopen.call_args[0][0] + body = json.loads(req.data.decode()) + assert body["prompt"] == "Please explain how the backup system works in detail" + assert body["client"] == "claude-code" + assert body["session_id"] == "test-session-123" + assert body["reinforce"] is False + + @mock.patch("capture_stop.urllib.request.urlopen") + def test_skips_when_disabled(self, mock_urlopen, tmp_path): + transcript = _write_transcript(tmp_path, [ + _user_entry("A prompt that would normally be captured"), + ]) + with mock.patch.dict(os.environ, {"ATOCORE_CAPTURE_DISABLED": "1"}): + with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))): + capture_stop._capture() + mock_urlopen.assert_not_called() + + @mock.patch("capture_stop.urllib.request.urlopen") + def test_skips_short_prompt(self, mock_urlopen, tmp_path): + transcript = _write_transcript(tmp_path, [ + _user_entry("yes"), + ]) + with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))): + capture_stop._capture() + mock_urlopen.assert_not_called() + + @mock.patch("capture_stop.urllib.request.urlopen") + def test_truncates_long_response(self, mock_urlopen, tmp_path): + transcript = _write_transcript(tmp_path, [ + _user_entry("Tell me everything about the entire codebase architecture"), + ]) + long_response = "x" * 60_000 + mock_resp = mock.MagicMock() + mock_resp.read.return_value = json.dumps({"id": "int-002"}).encode() + mock_urlopen.return_value = mock_resp + + with mock.patch("sys.stdin", StringIO( + self._hook_input(transcript_path=transcript, assistant_message=long_response) + )): + capture_stop._capture() + + req = mock_urlopen.call_args[0][0] + body = json.loads(req.data.decode()) + assert len(body["response"]) <= capture_stop.MAX_RESPONSE_LENGTH + 20 + assert body["response"].endswith("[truncated]") + + def test_main_never_raises(self): + """main() must always exit 0, even on garbage input.""" + with mock.patch("sys.stdin", StringIO("not json at all")): + # Should not raise + capture_stop.main() + + @mock.patch("capture_stop.urllib.request.urlopen") + def test_uses_atocore_url_env(self, mock_urlopen, tmp_path): + transcript = _write_transcript(tmp_path, [ + _user_entry("Please help me with this particular problem in the code"), + ]) + mock_resp = mock.MagicMock() + mock_resp.read.return_value = json.dumps({"id": "int-003"}).encode() + mock_urlopen.return_value = mock_resp + + with mock.patch.dict(os.environ, {"ATOCORE_URL": "http://localhost:9999"}): + # Re-read the env var + with mock.patch.object(capture_stop, "ATOCORE_URL", "http://localhost:9999"): + with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))): + capture_stop._capture() + + req = mock_urlopen.call_args[0][0] + assert req.full_url == "http://localhost:9999/interactions"