From 2d911909f8a9f5ccc1f25e407cf0ea4f033acb6f Mon Sep 17 00:00:00 2001 From: Anto01 Date: Sat, 11 Apr 2026 09:00:42 -0400 Subject: [PATCH 1/4] feat: auto-capture Claude Code sessions via Stop hook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add deploy/hooks/capture_stop.py — a Claude Code Stop hook that reads the transcript JSONL, extracts the last user prompt, and POSTs to the AtoCore /interactions endpoint in conservative mode (reinforce=false). Conservative mode means: capture only, no automatic reinforcement or extraction into the review queue. Kill switch: ATOCORE_CAPTURE_DISABLED=1. Also: note build_sha cosmetic issue after restore in runbook, update project status docs to reflect drill pass and auto-capture wiring. 17 new tests (243 total, all passing). Co-Authored-By: Claude Opus 4.6 --- deploy/hooks/capture_stop.py | 187 +++++++++++++++++++++++ docs/backup-restore-procedure.md | 12 ++ docs/current-state.md | 16 +- docs/next-steps.md | 26 +--- tests/test_capture_stop.py | 249 +++++++++++++++++++++++++++++++ 5 files changed, 466 insertions(+), 24 deletions(-) create mode 100644 deploy/hooks/capture_stop.py create mode 100644 tests/test_capture_stop.py diff --git a/deploy/hooks/capture_stop.py b/deploy/hooks/capture_stop.py new file mode 100644 index 0000000..e1a419b --- /dev/null +++ b/deploy/hooks/capture_stop.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +"""Claude Code Stop hook: capture interaction to AtoCore. + +Reads the Stop hook JSON from stdin, extracts the last user prompt +from the transcript JSONL, and POSTs to the AtoCore /interactions +endpoint in conservative mode (reinforce=false, no extraction). + +Fail-open: always exits 0, logs errors to stderr only. + +Environment variables: + ATOCORE_URL Base URL of the AtoCore instance (default: http://dalidou:8100) + ATOCORE_CAPTURE_DISABLED Set to "1" to disable capture (kill switch) + +Usage in ~/.claude/settings.json: + "Stop": [{ + "matcher": "", + "hooks": [{ + "type": "command", + "command": "python /path/to/capture_stop.py", + "timeout": 15 + }] + }] +""" + +from __future__ import annotations + +import json +import os +import sys +import urllib.error +import urllib.request + +ATOCORE_URL = os.environ.get("ATOCORE_URL", "http://dalidou:8100") +TIMEOUT_SECONDS = 10 + +# Minimum prompt length to bother capturing. Single-word acks, +# slash commands, and empty lines aren't useful interactions. +MIN_PROMPT_LENGTH = 15 + +# Maximum response length to capture. Truncate very long assistant +# responses to keep the interactions table manageable. +MAX_RESPONSE_LENGTH = 50_000 + + +def main() -> None: + """Entry point. 
Always exits 0.""" + try: + _capture() + except Exception as exc: + print(f"capture_stop: {exc}", file=sys.stderr) + + +def _capture() -> None: + if os.environ.get("ATOCORE_CAPTURE_DISABLED") == "1": + return + + raw = sys.stdin.read() + if not raw.strip(): + return + + hook_data = json.loads(raw) + session_id = hook_data.get("session_id", "") + assistant_message = hook_data.get("assistant_message", "") + transcript_path = hook_data.get("transcript_path", "") + cwd = hook_data.get("cwd", "") + + prompt = _extract_last_user_prompt(transcript_path) + if not prompt or len(prompt.strip()) < MIN_PROMPT_LENGTH: + return + + response = assistant_message or "" + if len(response) > MAX_RESPONSE_LENGTH: + response = response[:MAX_RESPONSE_LENGTH] + "\n\n[truncated]" + + project = _infer_project(cwd) + + payload = { + "prompt": prompt, + "response": response, + "client": "claude-code", + "session_id": session_id, + "project": project, + "reinforce": False, + } + + body = json.dumps(payload, ensure_ascii=True).encode("utf-8") + req = urllib.request.Request( + f"{ATOCORE_URL}/interactions", + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + resp = urllib.request.urlopen(req, timeout=TIMEOUT_SECONDS) + result = json.loads(resp.read().decode("utf-8")) + print( + f"capture_stop: recorded interaction {result.get('id', '?')} " + f"(project={project or 'none'}, prompt_chars={len(prompt)}, " + f"response_chars={len(response)})", + file=sys.stderr, + ) + + +def _extract_last_user_prompt(transcript_path: str) -> str: + """Read the JSONL transcript and return the last real user prompt. + + Skips meta messages (isMeta=True) and system/command messages + (content starting with '<'). + """ + if not transcript_path: + return "" + + # Normalize path for the current OS + path = os.path.normpath(transcript_path) + if not os.path.isfile(path): + return "" + + last_prompt = "" + try: + with open(path, encoding="utf-8", errors="replace") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + except json.JSONDecodeError: + continue + + if entry.get("type") != "user": + continue + if entry.get("isMeta", False): + continue + + msg = entry.get("message", {}) + if not isinstance(msg, dict): + continue + + content = msg.get("content", "") + + if isinstance(content, str): + text = content.strip() + elif isinstance(content, list): + # Content blocks: extract text blocks + parts = [] + for block in content: + if isinstance(block, str): + parts.append(block) + elif isinstance(block, dict) and block.get("type") == "text": + parts.append(block.get("text", "")) + text = "\n".join(parts).strip() + else: + continue + + # Skip system/command XML and very short messages + if text.startswith("<") or len(text) < MIN_PROMPT_LENGTH: + continue + + last_prompt = text + except OSError: + pass + + return last_prompt + + +# Project inference from working directory. +# Maps known repo paths to AtoCore project IDs. The user can extend +# this table or replace it with a registry lookup later. 
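+#
+# Example (hypothetical mapping, shown for illustration only): with
+#     {"C:\\Users\\antoi\\atocore": "p01-atocore"}
+# in the table, _infer_project("C:\\Users\\antoi\\ATOCore\\src")
+# returns "p01-atocore". Matching is case-insensitive and prefix-based,
+# after os.path.normpath on both sides.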
+_PROJECT_PATH_MAP: dict[str, str] = {
+    # Add mappings as needed, e.g.:
+    # "C:\\Users\\antoi\\gigabit": "p04-gigabit",
+    # "C:\\Users\\antoi\\interferometer": "p05-interferometer",
+}
+
+
+def _infer_project(cwd: str) -> str:
+    """Try to map the working directory to an AtoCore project."""
+    if not cwd:
+        return ""
+    norm = os.path.normpath(cwd).lower()
+    for path_prefix, project_id in _PROJECT_PATH_MAP.items():
+        if norm.startswith(os.path.normpath(path_prefix).lower()):
+            return project_id
+    return ""
+
+
+if __name__ == "__main__":
+    main()
diff --git a/docs/backup-restore-procedure.md b/docs/backup-restore-procedure.md
index 032d5a8..53b70f0 100644
--- a/docs/backup-restore-procedure.md
+++ b/docs/backup-restore-procedure.md
@@ -247,6 +247,18 @@ for i in 1 2 3 4 5 6 7 8 9 10; do
 done
 ```
 
+**Note on build_sha after restore:** The one-shot `docker compose run`
+container does not carry the build provenance env vars that `deploy.sh`
+exports at deploy time. After a restore, `/health` will report
+`build_sha: "unknown"` until you re-run `deploy.sh` or manually
+re-deploy. This is cosmetic — the data is correctly restored — but if
+you need `build_sha` to be accurate, run a redeploy after the restore:
+
+```bash
+cd /srv/storage/atocore/app
+bash deploy/dalidou/deploy.sh
+```
+
 ### Post-restore verification
 
 ```bash
diff --git a/docs/current-state.md b/docs/current-state.md
index 61b22fe..96a398b 100644
--- a/docs/current-state.md
+++ b/docs/current-state.md
@@ -244,12 +244,16 @@ This separation is healthy:
 
 ## Immediate Next Focus
 
-1. Re-run the full backup/restore drill on Dalidou with the
-   Chroma bind-mount fix in place (end-to-end green, not the
-   partial pass from 2026-04-09)
-2. Turn on auto-capture of Claude Code sessions in conservative
-   mode now that the restore path is trustworthy
-3. Use the new T420-side organic routing layer in real OpenClaw workflows
+1. ~~Re-run the full backup/restore drill~~ — DONE 2026-04-11,
+   full pass (db, registry, chroma, integrity all true)
+2. ~~Turn on auto-capture of Claude Code sessions in conservative
+   mode~~ — DONE 2026-04-11, Stop hook wired via
+   `deploy/hooks/capture_stop.py` → `POST /interactions`
+   with `reinforce=false`; kill switch via
+   `ATOCORE_CAPTURE_DISABLED=1`
+2a. Run a short real-use pilot with auto-capture on, verify
+   interactions are landing in Dalidou, review quality
+3. Use the new T420-side organic routing layer in real OpenClaw workflows
 4. Tighten retrieval quality for the now fully ingested active project corpora
 5. Move to Wave 2 trusted-operational ingestion instead of blindly widening raw corpus further
 6. Keep the new engineering-knowledge architecture docs as implementation guidance while avoiding premature schema work
diff --git a/docs/next-steps.md b/docs/next-steps.md
index ac33f1a..a30a5cd 100644
--- a/docs/next-steps.md
+++ b/docs/next-steps.md
@@ -20,24 +20,14 @@ This working list should be read alongside:
 
 ## Immediate Next Steps
 
-1. 
Re-run the backup/restore drill on Dalidou with the Chroma - bind-mount fix in place - - the 2026-04-09 drill was a PARTIAL PASS: db restore + marker - reversal worked cleanly, but the Chroma step failed with - `OSError [Errno 16] Device or resource busy` because - `shutil.rmtree` cannot unlink a Docker bind-mounted volume - - fix landed immediately after: `restore_runtime_backup()` now - clears the destination's CONTENTS and uses - `copytree(dirs_exist_ok=True)`, and the regression test - `test_restore_chroma_does_not_unlink_destination_directory` - asserts the destination inode is stable - - need a green end-to-end run with `--chroma` actually - working in-container before enabling write-path automation -2. Turn on auto-capture of Claude Code sessions once the drill - re-run is clean - - conservative mode: Stop hook posts to `/interactions`, - no auto-extraction into review queue without review cadence - in place +1. ~~Re-run the backup/restore drill~~ — DONE 2026-04-11, full pass +2. ~~Turn on auto-capture of Claude Code sessions~~ — DONE 2026-04-11, + Stop hook via `deploy/hooks/capture_stop.py` → `POST /interactions` + with `reinforce=false`; kill switch: `ATOCORE_CAPTURE_DISABLED=1` +2a. Run a short real-use pilot with auto-capture on + - verify interactions are landing in Dalidou + - check prompt/response quality and truncation + - confirm fail-open: no user-visible impact when Dalidou is down 3. Use the T420 `atocore-context` skill and the new organic routing layer in real OpenClaw workflows - confirm `auto-context` feels natural diff --git a/tests/test_capture_stop.py b/tests/test_capture_stop.py new file mode 100644 index 0000000..7adfea9 --- /dev/null +++ b/tests/test_capture_stop.py @@ -0,0 +1,249 @@ +"""Tests for deploy/hooks/capture_stop.py — Claude Code Stop hook.""" + +from __future__ import annotations + +import json +import os +import sys +import tempfile +import textwrap +from io import StringIO +from pathlib import Path +from unittest import mock + +import pytest + +# The hook script lives outside of the normal package tree, so import +# it by manipulating sys.path. 
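+# The hook is deliberately a standalone, stdlib-only script (Claude Code
+# invokes it by path, per the settings.json snippet in its docstring),
+# so the tests load it the same way rather than as a package import.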
+_HOOK_DIR = str(Path(__file__).resolve().parent.parent / "deploy" / "hooks") +if _HOOK_DIR not in sys.path: + sys.path.insert(0, _HOOK_DIR) + +import capture_stop # noqa: E402 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _write_transcript(tmp: Path, entries: list[dict]) -> str: + """Write a JSONL transcript and return the path.""" + path = tmp / "transcript.jsonl" + with open(path, "w", encoding="utf-8") as f: + for entry in entries: + f.write(json.dumps(entry, ensure_ascii=False) + "\n") + return str(path) + + +def _user_entry(content: str, *, is_meta: bool = False) -> dict: + return { + "type": "user", + "isMeta": is_meta, + "message": {"role": "user", "content": content}, + } + + +def _assistant_entry() -> dict: + return { + "type": "assistant", + "message": { + "role": "assistant", + "content": [{"type": "text", "text": "Sure, here's the answer."}], + }, + } + + +def _system_entry() -> dict: + return {"type": "system", "message": {"role": "system", "content": "system init"}} + + +# --------------------------------------------------------------------------- +# _extract_last_user_prompt +# --------------------------------------------------------------------------- + +class TestExtractLastUserPrompt: + def test_returns_last_real_prompt(self, tmp_path): + path = _write_transcript(tmp_path, [ + _user_entry("First prompt that is long enough to capture"), + _assistant_entry(), + _user_entry("Second prompt that should be the one we capture"), + _assistant_entry(), + ]) + result = capture_stop._extract_last_user_prompt(path) + assert result == "Second prompt that should be the one we capture" + + def test_skips_meta_messages(self, tmp_path): + path = _write_transcript(tmp_path, [ + _user_entry("Real prompt that is definitely long enough"), + _user_entry("some system stuff"), + _user_entry("Meta message that looks real enough", is_meta=True), + ]) + result = capture_stop._extract_last_user_prompt(path) + assert result == "Real prompt that is definitely long enough" + + def test_skips_xml_content(self, tmp_path): + path = _write_transcript(tmp_path, [ + _user_entry("Actual prompt from a real human user"), + _user_entry("/help"), + ]) + result = capture_stop._extract_last_user_prompt(path) + assert result == "Actual prompt from a real human user" + + def test_skips_short_messages(self, tmp_path): + path = _write_transcript(tmp_path, [ + _user_entry("This prompt is long enough to be captured"), + _user_entry("yes"), # too short + ]) + result = capture_stop._extract_last_user_prompt(path) + assert result == "This prompt is long enough to be captured" + + def test_handles_content_blocks(self, tmp_path): + entry = { + "type": "user", + "message": { + "role": "user", + "content": [ + {"type": "text", "text": "First paragraph of the prompt."}, + {"type": "text", "text": "Second paragraph continues here."}, + ], + }, + } + path = _write_transcript(tmp_path, [entry]) + result = capture_stop._extract_last_user_prompt(path) + assert "First paragraph" in result + assert "Second paragraph" in result + + def test_empty_transcript(self, tmp_path): + path = _write_transcript(tmp_path, []) + result = capture_stop._extract_last_user_prompt(path) + assert result == "" + + def test_missing_file(self): + result = capture_stop._extract_last_user_prompt("/nonexistent/path.jsonl") + assert result == "" + + def test_empty_path(self): + result = capture_stop._extract_last_user_prompt("") 
+ assert result == "" + + +# --------------------------------------------------------------------------- +# _infer_project +# --------------------------------------------------------------------------- + +class TestInferProject: + def test_empty_cwd(self): + assert capture_stop._infer_project("") == "" + + def test_unknown_path(self): + assert capture_stop._infer_project("C:\\Users\\antoi\\random") == "" + + def test_mapped_path(self): + with mock.patch.dict(capture_stop._PROJECT_PATH_MAP, { + "C:\\Users\\antoi\\gigabit": "p04-gigabit", + }): + result = capture_stop._infer_project("C:\\Users\\antoi\\gigabit\\src") + assert result == "p04-gigabit" + + +# --------------------------------------------------------------------------- +# _capture (integration-style, mocking HTTP) +# --------------------------------------------------------------------------- + +class TestCapture: + def _hook_input(self, *, transcript_path: str = "", **overrides) -> str: + data = { + "session_id": "test-session-123", + "transcript_path": transcript_path, + "cwd": "C:\\Users\\antoi\\ATOCore", + "permission_mode": "default", + "hook_event_name": "Stop", + "assistant_message": "Here is the answer to your question about the code.", + "turn_number": 3, + } + data.update(overrides) + return json.dumps(data) + + @mock.patch("capture_stop.urllib.request.urlopen") + def test_posts_to_atocore(self, mock_urlopen, tmp_path): + transcript = _write_transcript(tmp_path, [ + _user_entry("Please explain how the backup system works in detail"), + _assistant_entry(), + ]) + mock_resp = mock.MagicMock() + mock_resp.read.return_value = json.dumps({"id": "int-001", "status": "recorded"}).encode() + mock_urlopen.return_value = mock_resp + + with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))): + capture_stop._capture() + + mock_urlopen.assert_called_once() + req = mock_urlopen.call_args[0][0] + body = json.loads(req.data.decode()) + assert body["prompt"] == "Please explain how the backup system works in detail" + assert body["client"] == "claude-code" + assert body["session_id"] == "test-session-123" + assert body["reinforce"] is False + + @mock.patch("capture_stop.urllib.request.urlopen") + def test_skips_when_disabled(self, mock_urlopen, tmp_path): + transcript = _write_transcript(tmp_path, [ + _user_entry("A prompt that would normally be captured"), + ]) + with mock.patch.dict(os.environ, {"ATOCORE_CAPTURE_DISABLED": "1"}): + with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))): + capture_stop._capture() + mock_urlopen.assert_not_called() + + @mock.patch("capture_stop.urllib.request.urlopen") + def test_skips_short_prompt(self, mock_urlopen, tmp_path): + transcript = _write_transcript(tmp_path, [ + _user_entry("yes"), + ]) + with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))): + capture_stop._capture() + mock_urlopen.assert_not_called() + + @mock.patch("capture_stop.urllib.request.urlopen") + def test_truncates_long_response(self, mock_urlopen, tmp_path): + transcript = _write_transcript(tmp_path, [ + _user_entry("Tell me everything about the entire codebase architecture"), + ]) + long_response = "x" * 60_000 + mock_resp = mock.MagicMock() + mock_resp.read.return_value = json.dumps({"id": "int-002"}).encode() + mock_urlopen.return_value = mock_resp + + with mock.patch("sys.stdin", StringIO( + self._hook_input(transcript_path=transcript, assistant_message=long_response) + )): + capture_stop._capture() + + req = mock_urlopen.call_args[0][0] 
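+        # call_args[0][0] is the urllib.request.Request that _capture()
+        # passed to urlopen; req.data carries the POSTed JSON body.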
+ body = json.loads(req.data.decode()) + assert len(body["response"]) <= capture_stop.MAX_RESPONSE_LENGTH + 20 + assert body["response"].endswith("[truncated]") + + def test_main_never_raises(self): + """main() must always exit 0, even on garbage input.""" + with mock.patch("sys.stdin", StringIO("not json at all")): + # Should not raise + capture_stop.main() + + @mock.patch("capture_stop.urllib.request.urlopen") + def test_uses_atocore_url_env(self, mock_urlopen, tmp_path): + transcript = _write_transcript(tmp_path, [ + _user_entry("Please help me with this particular problem in the code"), + ]) + mock_resp = mock.MagicMock() + mock_resp.read.return_value = json.dumps({"id": "int-003"}).encode() + mock_urlopen.return_value = mock_resp + + with mock.patch.dict(os.environ, {"ATOCORE_URL": "http://localhost:9999"}): + # Re-read the env var + with mock.patch.object(capture_stop, "ATOCORE_URL", "http://localhost:9999"): + with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))): + capture_stop._capture() + + req = mock_urlopen.call_args[0][0] + assert req.full_url == "http://localhost:9999/interactions" From 92fc250b547746ab26de39f8de84615759f42cd5 Mon Sep 17 00:00:00 2001 From: Anto01 Date: Sat, 11 Apr 2026 09:17:21 -0400 Subject: [PATCH 2/4] fix: use correct hook field name last_assistant_message The Claude Code Stop hook sends `last_assistant_message`, not `assistant_message`. This was causing response_chars=0 on all captured interactions. Also removes the temporary debug log block. Co-Authored-By: Claude Opus 4.6 --- deploy/hooks/capture_stop.py | 3 ++- tests/test_capture_stop.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/deploy/hooks/capture_stop.py b/deploy/hooks/capture_stop.py index e1a419b..5661681 100644 --- a/deploy/hooks/capture_stop.py +++ b/deploy/hooks/capture_stop.py @@ -59,8 +59,9 @@ def _capture() -> None: return hook_data = json.loads(raw) + session_id = hook_data.get("session_id", "") - assistant_message = hook_data.get("assistant_message", "") + assistant_message = hook_data.get("last_assistant_message", "") transcript_path = hook_data.get("transcript_path", "") cwd = hook_data.get("cwd", "") diff --git a/tests/test_capture_stop.py b/tests/test_capture_stop.py index 7adfea9..2fcd481 100644 --- a/tests/test_capture_stop.py +++ b/tests/test_capture_stop.py @@ -158,7 +158,7 @@ class TestCapture: "cwd": "C:\\Users\\antoi\\ATOCore", "permission_mode": "default", "hook_event_name": "Stop", - "assistant_message": "Here is the answer to your question about the code.", + "last_assistant_message": "Here is the answer to your question about the code.", "turn_number": 3, } data.update(overrides) @@ -215,7 +215,7 @@ class TestCapture: mock_urlopen.return_value = mock_resp with mock.patch("sys.stdin", StringIO( - self._hook_input(transcript_path=transcript, assistant_message=long_response) + self._hook_input(transcript_path=transcript, last_assistant_message=long_response) )): capture_stop._capture() From a34a7a995f22a4e7901ac4963ae31626c1e648f6 Mon Sep 17 00:00:00 2001 From: Anto01 Date: Sat, 11 Apr 2026 09:40:05 -0400 Subject: [PATCH 3/4] fix: token-overlap matcher for reinforcement (Phase 9B) Replace the substring-based _memory_matches() with a token-overlap matcher that tokenizes both memory content and response, applies lightweight stemming (trailing s/ed/ing) and stop-word removal, then checks whether >= 70% of the memory's tokens appear in the response. 
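
A worked example (illustrative; _tokenize and _stem apply the exact
rules):

    memory   = "always uses Python for data processing scripts"
    tokens   = {alway, use, python, data, process, script}
               ("for" is dropped as a stop word; the naive trailing-s
               strip yields "alway" on both sides, so it still matches)
    response = "I always use Python when writing data processing scripts"
    overlap  = 6/6 = 1.00 >= 0.70  ->  reinforced
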
This fixes the paraphrase blindness that prevented reinforcement from ever firing on natural responses ("prefers" vs "prefer", "because history" vs "because the history"). 7 new tests (26 total reinforcement tests, all passing). Co-Authored-By: Claude Opus 4.6 --- src/atocore/memory/reinforcement.py | 70 ++++++++++++++--- tests/test_reinforcement.py | 117 ++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 10 deletions(-) diff --git a/src/atocore/memory/reinforcement.py b/src/atocore/memory/reinforcement.py index b7acd84..fc6ee7a 100644 --- a/src/atocore/memory/reinforcement.py +++ b/src/atocore/memory/reinforcement.py @@ -8,10 +8,11 @@ given memory, without ever promoting anything new into trusted state. Design notes ------------ -- Matching is intentionally simple and explainable: - * normalize both sides (lowercase, collapse whitespace) - * require the normalized memory content (or its first 80 chars) to - appear as a substring in the normalized response +- Matching uses token-overlap: tokenize both sides (lowercase, stem, + drop stop words), then check whether >= 70 % of the memory's content + tokens appear in the response token set. This handles natural + paraphrases (e.g. "prefers" vs "prefer", "because history" vs + "because the history") that substring matching missed. - Candidates and invalidated memories are NEVER considered — reinforcement must not revive history. - Reinforcement is capped at 1.0 and monotonically non-decreasing. @@ -43,9 +44,12 @@ log = get_logger("reinforcement") # memories like "prefers Python". _MIN_MEMORY_CONTENT_LENGTH = 12 -# When a memory's content is very long, match on its leading window only -# to avoid punishing small paraphrases further into the body. -_MATCH_WINDOW_CHARS = 80 +# Token-overlap matching constants. +_STOP_WORDS: frozenset[str] = frozenset({ + "the", "a", "an", "and", "or", "of", "to", "is", "was", + "that", "this", "with", "for", "from", "into", +}) +_MATCH_THRESHOLD = 0.70 DEFAULT_CONFIDENCE_DELTA = 0.02 @@ -144,12 +148,58 @@ def _normalize(text: str) -> str: return collapsed.strip() +def _stem(word: str) -> str: + """Aggressive suffix-folding so inflected forms collapse. + + Handles trailing ``ing``, ``ed``, and ``s`` — good enough for + reinforcement matching without pulling in nltk/snowball. + """ + # Order matters: try longest suffix first. + if word.endswith("ing") and len(word) >= 6: + return word[:-3] + if word.endswith("ed") and len(word) > 4: + stem = word[:-2] + # "preferred" → "preferr" → "prefer" (doubled consonant before -ed) + if len(stem) >= 3 and stem[-1] == stem[-2]: + stem = stem[:-1] + return stem + if word.endswith("s") and len(word) > 3: + return word[:-1] + return word + + +def _tokenize(text: str) -> set[str]: + """Split normalized text into a stemmed token set. + + Strips punctuation, drops words shorter than 3 chars and stop words. + """ + tokens: set[str] = set() + for raw in text.split(): + # Strip leading/trailing punctuation (commas, periods, quotes, etc.) + word = raw.strip(".,;:!?\"'()[]{}-/") + if len(word) < 3: + continue + if word in _STOP_WORDS: + continue + tokens.add(_stem(word)) + return tokens + + def _memory_matches(memory_content: str, normalized_response: str) -> bool: - """Return True if the memory content appears in the response.""" + """Return True if enough of the memory's tokens appear in the response. + + Uses token-overlap: tokenize both sides (lowercase, stem, drop stop + words), then check whether >= 70 % of the memory's content tokens + appear in the response token set. 
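+
+    Example (mirrors the test suite): a memory with ten content tokens
+    matches a response echoing seven of them (0.70, exactly at the
+    threshold) and does not match one echoing only six (0.60).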
+ """ if not memory_content: return False normalized_memory = _normalize(memory_content) if len(normalized_memory) < _MIN_MEMORY_CONTENT_LENGTH: return False - window = normalized_memory[:_MATCH_WINDOW_CHARS] - return window in normalized_response + memory_tokens = _tokenize(normalized_memory) + if not memory_tokens: + return False + response_tokens = _tokenize(normalized_response) + overlap = memory_tokens & response_tokens + return len(overlap) / len(memory_tokens) >= _MATCH_THRESHOLD diff --git a/tests/test_reinforcement.py b/tests/test_reinforcement.py index 7537fa4..9d3832b 100644 --- a/tests/test_reinforcement.py +++ b/tests/test_reinforcement.py @@ -6,6 +6,8 @@ from atocore.interactions.service import record_interaction from atocore.main import app from atocore.memory.reinforcement import ( DEFAULT_CONFIDENCE_DELTA, + _stem, + _tokenize, reinforce_from_interaction, ) from atocore.memory.service import ( @@ -373,3 +375,118 @@ def test_get_memories_filter_by_alias(project_registry): assert len(via_alias) == 2 assert len(via_canonical) == 2 assert {m.content for m in via_alias} == {"m1", "m2"} + + +# --- token-overlap matcher: unit tests ------------------------------------- + + +def test_stem_folds_s_ed_ing(): + assert _stem("prefers") == "prefer" + assert _stem("preferred") == "prefer" + assert _stem("services") == "service" + assert _stem("processing") == "process" + # Short words must not be over-stripped + assert _stem("red") == "red" # 3 chars, don't strip "ed" + assert _stem("bus") == "bus" # 3 chars, don't strip "s" + assert _stem("sing") == "sing" # 4 chars, don't strip "ing" + assert _stem("being") == "being" # 5 chars, "ing" strip leaves "be" (2) — too short + + +def test_tokenize_removes_stop_words(): + tokens = _tokenize("the quick brown fox jumps over the lazy dog") + assert "the" not in tokens + assert "quick" in tokens + assert "brown" in tokens + assert "fox" in tokens + assert "dog" in tokens + # "over" has len 4, not a stop word → kept (stemmed: "over") + assert "over" in tokens + + +# --- token-overlap matcher: paraphrase matching ---------------------------- + + +def test_reinforce_matches_paraphrase_prefers_vs_prefer(tmp_data_dir): + """The canonical rebase case from phase9-first-real-use.md.""" + init_db() + mem = create_memory( + memory_type="preference", + content="prefers rebase-based workflows because history stays linear", + confidence=0.5, + ) + interaction = _make_interaction( + response=( + "I prefer rebase-based workflows because the history stays " + "linear and reviewers have an easier time." + ), + ) + results = reinforce_from_interaction(interaction) + assert any(r.memory_id == mem.id for r in results) + + +def test_reinforce_matches_paraphrase_with_articles_and_ed(tmp_data_dir): + init_db() + mem = create_memory( + memory_type="preference", + content="preferred structured logging across all backend services", + confidence=0.5, + ) + interaction = _make_interaction( + response=( + "I set up structured logging across all the backend services, " + "which the team prefers for consistency." + ), + ) + results = reinforce_from_interaction(interaction) + assert any(r.memory_id == mem.id for r in results) + + +def test_reinforce_rejects_low_overlap(tmp_data_dir): + init_db() + mem = create_memory( + memory_type="preference", + content="always uses Python for data processing scripts", + confidence=0.5, + ) + interaction = _make_interaction( + response=( + "The CI pipeline runs on Node.js and deploys to Kubernetes " + "using Helm charts." 
+ ), + ) + results = reinforce_from_interaction(interaction) + assert all(r.memory_id != mem.id for r in results) + + +def test_reinforce_matches_at_70_percent_threshold(tmp_data_dir): + """Exactly 7 of 10 content tokens present → should match.""" + init_db() + # After stop-word removal and stemming, this has 10 tokens: + # alpha, bravo, charlie, delta, echo, foxtrot, golf, hotel, india, juliet + mem = create_memory( + memory_type="preference", + content="alpha bravo charlie delta echo foxtrot golf hotel india juliet", + confidence=0.5, + ) + # Echo 7 of 10 tokens (70%) plus some noise + interaction = _make_interaction( + response="alpha bravo charlie delta echo foxtrot golf noise words here", + ) + results = reinforce_from_interaction(interaction) + assert any(r.memory_id == mem.id for r in results) + + +def test_reinforce_rejects_below_70_percent(tmp_data_dir): + """Only 6 of 10 content tokens present (60%) → should NOT match.""" + init_db() + mem = create_memory( + memory_type="preference", + content="alpha bravo charlie delta echo foxtrot golf hotel india juliet", + confidence=0.5, + ) + # Echo 6 of 10 tokens (60%) plus noise + interaction = _make_interaction( + response="alpha bravo charlie delta echo foxtrot noise words here only", + ) + results = reinforce_from_interaction(interaction) + assert all(r.memory_id != mem.id for r in results) From 58c744fd2f70c52dc9e503ddc9fa9bfcbec4817c Mon Sep 17 00:00:00 2001 From: Anto01 Date: Sat, 11 Apr 2026 09:46:46 -0400 Subject: [PATCH 4/4] feat: post-backup validation + retention cleanup (Tasks B & C) - create_runtime_backup() now auto-validates its output and includes validated/validation_errors fields in returned metadata - New cleanup_old_backups() with retention policy: 7 daily, 4 weekly (Sundays), 6 monthly (1st of month), dry-run by default - CLI `cleanup` subcommand added to backup module - 9 new tests (2 validation + 7 retention), 259 total passing Co-Authored-By: Claude Opus 4.6 --- src/atocore/ops/backup.py | 131 +++++++++++++++++++++ tests/test_backup.py | 235 +++++++++++++++++++++++++++++++++++++- 2 files changed, 364 insertions(+), 2 deletions(-) diff --git a/src/atocore/ops/backup.py b/src/atocore/ops/backup.py index 0c2e885..bb2c131 100644 --- a/src/atocore/ops/backup.py +++ b/src/atocore/ops/backup.py @@ -103,12 +103,27 @@ def create_runtime_backup( encoding="utf-8", ) + # Automatic post-backup validation. Failures log a warning but do + # not raise — the backup files are still on disk and may be useful. + validation = validate_backup(stamp) + validated = validation.get("valid", False) + validation_errors = validation.get("errors", []) + if not validated: + log.warning( + "post_backup_validation_failed", + backup_root=str(backup_root), + errors=validation_errors, + ) + metadata["validated"] = validated + metadata["validation_errors"] = validation_errors + log.info( "runtime_backup_created", backup_root=str(backup_root), db_snapshot=str(db_snapshot_path), chroma_included=include_chroma, chroma_bytes=chroma_bytes_copied, + validated=validated, ) return metadata @@ -389,6 +404,113 @@ def restore_runtime_backup( return result +def cleanup_old_backups(*, confirm: bool = False) -> dict: + """Apply retention policy and remove old snapshots. + + Retention keeps: + - Last 7 daily snapshots (most recent per calendar day) + - Last 4 weekly snapshots (most recent on each Sunday) + - Last 6 monthly snapshots (most recent on the 1st of each month) + + All other snapshots are candidates for deletion. 
Runs as dry-run by + default; pass ``confirm=True`` to actually delete. + + Returns a dict with kept/deleted counts and any errors. + """ + snapshots_root = _config.settings.resolved_backup_dir / "snapshots" + if not snapshots_root.exists() or not snapshots_root.is_dir(): + return {"kept": 0, "deleted": 0, "would_delete": 0, "dry_run": not confirm, "errors": []} + + # Parse all stamp directories into (datetime, dir_path) pairs. + stamps: list[tuple[datetime, Path]] = [] + unparseable: list[str] = [] + for entry in sorted(snapshots_root.iterdir()): + if not entry.is_dir(): + continue + try: + dt = datetime.strptime(entry.name, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC) + stamps.append((dt, entry)) + except ValueError: + unparseable.append(entry.name) + + if not stamps: + return { + "kept": 0, "deleted": 0, "would_delete": 0, + "dry_run": not confirm, "errors": [], + "unparseable": unparseable, + } + + # Sort newest first so "most recent per bucket" is a simple first-seen. + stamps.sort(key=lambda t: t[0], reverse=True) + + keep_set: set[Path] = set() + + # Last 7 daily: most recent snapshot per calendar day. + seen_days: set[str] = set() + for dt, path in stamps: + day_key = dt.strftime("%Y-%m-%d") + if day_key not in seen_days: + seen_days.add(day_key) + keep_set.add(path) + if len(seen_days) >= 7: + break + + # Last 4 weekly: most recent snapshot that falls on a Sunday. + seen_weeks: set[str] = set() + for dt, path in stamps: + if dt.weekday() == 6: # Sunday + week_key = dt.strftime("%Y-W%W") + if week_key not in seen_weeks: + seen_weeks.add(week_key) + keep_set.add(path) + if len(seen_weeks) >= 4: + break + + # Last 6 monthly: most recent snapshot on the 1st of a month. + seen_months: set[str] = set() + for dt, path in stamps: + if dt.day == 1: + month_key = dt.strftime("%Y-%m") + if month_key not in seen_months: + seen_months.add(month_key) + keep_set.add(path) + if len(seen_months) >= 6: + break + + to_delete = [path for _, path in stamps if path not in keep_set] + + errors: list[str] = [] + deleted_count = 0 + if confirm: + for path in to_delete: + try: + shutil.rmtree(path) + deleted_count += 1 + except OSError as exc: + errors.append(f"{path.name}: {exc}") + + result: dict = { + "kept": len(keep_set), + "dry_run": not confirm, + "errors": errors, + } + if confirm: + result["deleted"] = deleted_count + else: + result["would_delete"] = len(to_delete) + if unparseable: + result["unparseable"] = unparseable + + log.info( + "cleanup_old_backups", + kept=len(keep_set), + deleted=deleted_count if confirm else 0, + would_delete=len(to_delete) if not confirm else 0, + dry_run=not confirm, + ) + return result + + def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None: source_conn = sqlite3.connect(str(source_path)) dest_conn = sqlite3.connect(str(dest_path)) @@ -448,6 +570,13 @@ def main() -> None: p_validate = sub.add_parser("validate", help="validate a snapshot by stamp") p_validate.add_argument("stamp", help="snapshot stamp (e.g. 
20260409T010203Z)") + p_cleanup = sub.add_parser("cleanup", help="remove old snapshots per retention policy") + p_cleanup.add_argument( + "--confirm", + action="store_true", + help="actually delete (default is dry-run)", + ) + p_restore = sub.add_parser( "restore", help="restore a snapshot by stamp (service must be stopped)", @@ -488,6 +617,8 @@ def main() -> None: result = {"backups": list_runtime_backups()} elif command == "validate": result = validate_backup(args.stamp) + elif command == "cleanup": + result = cleanup_old_backups(confirm=getattr(args, "confirm", False)) elif command == "restore": result = restore_runtime_backup( args.stamp, diff --git a/tests/test_backup.py b/tests/test_backup.py index c617f16..db50a41 100644 --- a/tests/test_backup.py +++ b/tests/test_backup.py @@ -1,14 +1,15 @@ -"""Tests for runtime backup creation and restore.""" +"""Tests for runtime backup creation, restore, and retention cleanup.""" import json import sqlite3 -from datetime import UTC, datetime +from datetime import UTC, datetime, timedelta import pytest import atocore.config as config from atocore.models.database import init_db from atocore.ops.backup import ( + cleanup_old_backups, create_runtime_backup, list_runtime_backups, restore_runtime_backup, @@ -413,6 +414,56 @@ def test_restore_skips_pre_snapshot_when_requested(tmp_path, monkeypatch): config.settings = original_settings +def test_create_backup_includes_validation_fields(tmp_path, monkeypatch): + """Task B: create_runtime_backup auto-validates and reports result.""" + monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) + monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) + monkeypatch.setenv( + "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") + ) + + original_settings = config.settings + try: + config.settings = config.Settings() + init_db() + result = create_runtime_backup(datetime(2026, 4, 11, 10, 0, 0, tzinfo=UTC)) + finally: + config.settings = original_settings + + assert "validated" in result + assert "validation_errors" in result + assert result["validated"] is True + assert result["validation_errors"] == [] + + +def test_create_backup_validation_failure_does_not_raise(tmp_path, monkeypatch): + """Task B: if post-backup validation fails, backup still returns metadata.""" + monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data")) + monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups")) + monkeypatch.setenv( + "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json") + ) + + def _broken_validate(stamp): + return {"valid": False, "errors": ["db_missing", "metadata_missing"]} + + original_settings = config.settings + try: + config.settings = config.Settings() + init_db() + monkeypatch.setattr("atocore.ops.backup.validate_backup", _broken_validate) + result = create_runtime_backup(datetime(2026, 4, 11, 11, 0, 0, tzinfo=UTC)) + finally: + config.settings = original_settings + + # Should NOT have raised — backup still returned metadata + assert result["validated"] is False + assert result["validation_errors"] == ["db_missing", "metadata_missing"] + # Core backup fields still present + assert "db_snapshot_path" in result + assert "created_at" in result + + def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch): """Stale WAL/SHM sidecars must not carry bytes past the restore. 
@@ -457,3 +508,183 @@
         )
     finally:
         config.settings = original_settings
+
+
+# ---------------------------------------------------------------------------
+# Task C: Backup retention cleanup
+# ---------------------------------------------------------------------------
+
+
+def _setup_cleanup_env(tmp_path, monkeypatch):
+    """Helper: configure env, init db, return snapshots_root."""
+    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
+    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
+    monkeypatch.setenv(
+        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
+    )
+    original = config.settings
+    config.settings = config.Settings()
+    init_db()
+    snapshots_root = config.settings.resolved_backup_dir / "snapshots"
+    snapshots_root.mkdir(parents=True, exist_ok=True)
+    return original, snapshots_root
+
+
+def _seed_snapshots(snapshots_root, dates):
+    """Create minimal valid snapshot dirs for the given datetimes."""
+    for dt in dates:
+        stamp = dt.strftime("%Y%m%dT%H%M%SZ")
+        snap_dir = snapshots_root / stamp
+        db_dir = snap_dir / "db"
+        db_dir.mkdir(parents=True, exist_ok=True)
+        db_path = db_dir / "atocore.db"
+        conn = sqlite3.connect(str(db_path))
+        conn.execute("CREATE TABLE IF NOT EXISTS _marker (id INTEGER)")
+        conn.close()
+        metadata = {
+            "created_at": dt.isoformat(),
+            "backup_root": str(snap_dir),
+            "db_snapshot_path": str(db_path),
+            "db_size_bytes": db_path.stat().st_size,
+            "registry_snapshot_path": "",
+            "chroma_snapshot_path": "",
+            "chroma_snapshot_bytes": 0,
+            "chroma_snapshot_files": 0,
+            "chroma_snapshot_included": False,
+            "vector_store_note": "",
+        }
+        (snap_dir / "backup-metadata.json").write_text(
+            json.dumps(metadata, indent=2) + "\n", encoding="utf-8"
+        )
+
+
+def test_cleanup_empty_dir(tmp_path, monkeypatch):
+    original, _ = _setup_cleanup_env(tmp_path, monkeypatch)
+    try:
+        result = cleanup_old_backups()
+        assert result["kept"] == 0
+        assert result["would_delete"] == 0
+        assert result["dry_run"] is True
+    finally:
+        config.settings = original
+
+
+def test_cleanup_dry_run_identifies_old_snapshots(tmp_path, monkeypatch):
+    original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
+    try:
+        # 10 daily snapshots Apr 2-11 (avoiding Apr 1 which is monthly).
+        base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
+        dates = [base + timedelta(days=i) for i in range(10)]
+        _seed_snapshots(snapshots_root, dates)
+
+        result = cleanup_old_backups()
+        assert result["dry_run"] is True
+        # Newest 7 calendar days (Apr 5-11) are kept as daily. Apr 5 is
+        # also a Sunday, so the weekly rule re-selects it, but it is
+        # already in the keep set and adds nothing new.
+        # The remaining snapshots (Apr 2, 3, 4) are neither Sundays nor
+        # 1st-of-month, so none qualifies as weekly or monthly.
+        # Result: 7 kept, 3 flagged for deletion; the dry run must not
+        # remove anything from disk.
+ assert result["kept"] == 7 + assert result["would_delete"] == 3 + assert len(list(snapshots_root.iterdir())) == 10 + finally: + config.settings = original + + +def test_cleanup_confirm_deletes(tmp_path, monkeypatch): + original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) + try: + base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC) + dates = [base + timedelta(days=i) for i in range(10)] + _seed_snapshots(snapshots_root, dates) + + result = cleanup_old_backups(confirm=True) + assert result["dry_run"] is False + assert result["deleted"] == 3 + assert result["kept"] == 7 + assert len(list(snapshots_root.iterdir())) == 7 + finally: + config.settings = original + + +def test_cleanup_keeps_last_7_daily(tmp_path, monkeypatch): + """Exactly 7 snapshots on different days → all kept.""" + original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) + try: + base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC) + dates = [base + timedelta(days=i) for i in range(7)] + _seed_snapshots(snapshots_root, dates) + + result = cleanup_old_backups() + assert result["kept"] == 7 + assert result["would_delete"] == 0 + finally: + config.settings = original + + +def test_cleanup_keeps_sunday_weekly(tmp_path, monkeypatch): + """Snapshots on Sundays outside the 7-day window are kept as weekly.""" + original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) + try: + # 7 daily snapshots covering Apr 5-11 + base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC) + daily = [base + timedelta(days=i) for i in range(7)] + + # 2 older Sunday snapshots + sun1 = datetime(2026, 3, 29, 12, 0, 0, tzinfo=UTC) # Sunday + sun2 = datetime(2026, 3, 22, 12, 0, 0, tzinfo=UTC) # Sunday + # A non-Sunday old snapshot that should be deleted + wed = datetime(2026, 3, 25, 12, 0, 0, tzinfo=UTC) # Wednesday + + _seed_snapshots(snapshots_root, daily + [sun1, sun2, wed]) + + result = cleanup_old_backups() + # 7 daily + 2 Sunday weekly = 9 kept, 1 Wednesday deleted + assert result["kept"] == 9 + assert result["would_delete"] == 1 + finally: + config.settings = original + + +def test_cleanup_keeps_monthly_first(tmp_path, monkeypatch): + """Snapshots on the 1st of a month outside daily+weekly are kept as monthly.""" + original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) + try: + # 7 daily in April 2026 + base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC) + daily = [base + timedelta(days=i) for i in range(7)] + + # Old monthly 1st snapshots + m1 = datetime(2026, 1, 1, 12, 0, 0, tzinfo=UTC) + m2 = datetime(2025, 12, 1, 12, 0, 0, tzinfo=UTC) + # Old non-1st, non-Sunday snapshot — should be deleted + old = datetime(2026, 1, 15, 12, 0, 0, tzinfo=UTC) + + _seed_snapshots(snapshots_root, daily + [m1, m2, old]) + + result = cleanup_old_backups() + # 7 daily + 2 monthly = 9 kept, 1 deleted + assert result["kept"] == 9 + assert result["would_delete"] == 1 + finally: + config.settings = original + + +def test_cleanup_unparseable_stamp_skipped(tmp_path, monkeypatch): + """Directories with unparseable names are ignored, not deleted.""" + original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch) + try: + base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC) + _seed_snapshots(snapshots_root, [base]) + + bad_dir = snapshots_root / "not-a-timestamp" + bad_dir.mkdir() + + result = cleanup_old_backups(confirm=True) + assert result.get("unparseable") == ["not-a-timestamp"] + assert bad_dir.exists() + assert result["kept"] == 1 + finally: + config.settings = original