diff --git a/deploy/hooks/capture_stop.py b/deploy/hooks/capture_stop.py
new file mode 100644
index 0000000..5661681
--- /dev/null
+++ b/deploy/hooks/capture_stop.py
@@ -0,0 +1,188 @@
+#!/usr/bin/env python3
+"""Claude Code Stop hook: capture interaction to AtoCore.
+
+Reads the Stop hook JSON from stdin, extracts the last user prompt
+from the transcript JSONL, and POSTs to the AtoCore /interactions
+endpoint in conservative mode (reinforce=false, no extraction).
+
+Fail-open: always exits 0, logs errors to stderr only.
+
+Environment variables:
+ ATOCORE_URL Base URL of the AtoCore instance (default: http://dalidou:8100)
+ ATOCORE_CAPTURE_DISABLED Set to "1" to disable capture (kill switch)
+
+Usage in ~/.claude/settings.json:
+ "Stop": [{
+ "matcher": "",
+ "hooks": [{
+ "type": "command",
+ "command": "python /path/to/capture_stop.py",
+ "timeout": 15
+ }]
+ }]
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import sys
+import urllib.error
+import urllib.request
+
+ATOCORE_URL = os.environ.get("ATOCORE_URL", "http://dalidou:8100")
+TIMEOUT_SECONDS = 10
+
+# Minimum prompt length to bother capturing. Single-word acks,
+# slash commands, and empty lines aren't useful interactions.
+MIN_PROMPT_LENGTH = 15
+
+# Maximum response length to capture. Truncate very long assistant
+# responses to keep the interactions table manageable.
+MAX_RESPONSE_LENGTH = 50_000
+
+
+def main() -> None:
+ """Entry point. Always exits 0."""
+ try:
+ _capture()
+ except Exception as exc:
+ print(f"capture_stop: {exc}", file=sys.stderr)
+
+
+def _capture() -> None:
+ if os.environ.get("ATOCORE_CAPTURE_DISABLED") == "1":
+ return
+
+ raw = sys.stdin.read()
+ if not raw.strip():
+ return
+
+ hook_data = json.loads(raw)
+
+ session_id = hook_data.get("session_id", "")
+ assistant_message = hook_data.get("last_assistant_message", "")
+ transcript_path = hook_data.get("transcript_path", "")
+ cwd = hook_data.get("cwd", "")
+
+ prompt = _extract_last_user_prompt(transcript_path)
+ if not prompt or len(prompt.strip()) < MIN_PROMPT_LENGTH:
+ return
+
+ response = assistant_message or ""
+ if len(response) > MAX_RESPONSE_LENGTH:
+ response = response[:MAX_RESPONSE_LENGTH] + "\n\n[truncated]"
+
+ project = _infer_project(cwd)
+
+ payload = {
+ "prompt": prompt,
+ "response": response,
+ "client": "claude-code",
+ "session_id": session_id,
+ "project": project,
+ "reinforce": False,
+ }
+
+ body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
+ req = urllib.request.Request(
+ f"{ATOCORE_URL}/interactions",
+ data=body,
+ headers={"Content-Type": "application/json"},
+ method="POST",
+ )
+ resp = urllib.request.urlopen(req, timeout=TIMEOUT_SECONDS)
+ result = json.loads(resp.read().decode("utf-8"))
+ print(
+ f"capture_stop: recorded interaction {result.get('id', '?')} "
+ f"(project={project or 'none'}, prompt_chars={len(prompt)}, "
+ f"response_chars={len(response)})",
+ file=sys.stderr,
+ )
+
+
+def _extract_last_user_prompt(transcript_path: str) -> str:
+ """Read the JSONL transcript and return the last real user prompt.
+
+ Skips meta messages (isMeta=True) and system/command messages
+ (content starting with '<').
+ """
+ if not transcript_path:
+ return ""
+
+ # Normalize path for the current OS
+ path = os.path.normpath(transcript_path)
+ if not os.path.isfile(path):
+ return ""
+
+ last_prompt = ""
+ try:
+ with open(path, encoding="utf-8", errors="replace") as f:
+ for line in f:
+ line = line.strip()
+ if not line:
+ continue
+ try:
+ entry = json.loads(line)
+ except json.JSONDecodeError:
+ continue
+
+ if entry.get("type") != "user":
+ continue
+ if entry.get("isMeta", False):
+ continue
+
+ msg = entry.get("message", {})
+ if not isinstance(msg, dict):
+ continue
+
+ content = msg.get("content", "")
+
+ if isinstance(content, str):
+ text = content.strip()
+ elif isinstance(content, list):
+ # Content blocks: extract text blocks
+ parts = []
+ for block in content:
+ if isinstance(block, str):
+ parts.append(block)
+ elif isinstance(block, dict) and block.get("type") == "text":
+ parts.append(block.get("text", ""))
+ text = "\n".join(parts).strip()
+ else:
+ continue
+
+ # Skip system/command XML and very short messages
+ if text.startswith("<") or len(text) < MIN_PROMPT_LENGTH:
+ continue
+
+ last_prompt = text
+ except OSError:
+ pass
+
+ return last_prompt
+
+
+# Project inference from working directory.
+# Maps known repo paths to AtoCore project IDs. The user can extend
+# this table or replace it with a registry lookup later.
+_PROJECT_PATH_MAP: dict[str, str] = {
+ # Add mappings as needed, e.g.:
+ # "C:\\Users\\antoi\\gigabit": "p04-gigabit",
+ # "C:\\Users\\antoi\\interferometer": "p05-interferometer",
+}
+
+
+def _infer_project(cwd: str) -> str:
+ """Try to map the working directory to an AtoCore project."""
+ if not cwd:
+ return ""
+ norm = os.path.normpath(cwd).lower()
+ for path_prefix, project_id in _PROJECT_PATH_MAP.items():
+ if norm.startswith(os.path.normpath(path_prefix).lower()):
+ return project_id
+ return ""
+
+
+if __name__ == "__main__":
+ main()
diff --git a/docs/backup-restore-procedure.md b/docs/backup-restore-procedure.md
index 032d5a8..53b70f0 100644
--- a/docs/backup-restore-procedure.md
+++ b/docs/backup-restore-procedure.md
@@ -247,6 +247,18 @@ for i in 1 2 3 4 5 6 7 8 9 10; do
done
```
+**Note on build_sha after restore:** The one-shot `docker compose run`
+container does not carry the build provenance env vars that `deploy.sh`
+exports at deploy time. After a restore, `/health` will report
+`build_sha: "unknown"` until you re-run `deploy.sh` or manually
+re-deploy. This is cosmetic — the data is correctly restored — but if
+you need `build_sha` to be accurate, run a redeploy after the restore:
+
+```bash
+cd /srv/storage/atocore/app
+bash deploy/dalidou/deploy.sh
+```
+
### Post-restore verification
```bash
diff --git a/docs/current-state.md b/docs/current-state.md
index 61b22fe..96a398b 100644
--- a/docs/current-state.md
+++ b/docs/current-state.md
@@ -244,12 +244,16 @@ This separation is healthy:
## Immediate Next Focus
-1. Re-run the full backup/restore drill on Dalidou with the
- Chroma bind-mount fix in place (end-to-end green, not the
- partial pass from 2026-04-09)
-2. Turn on auto-capture of Claude Code sessions in conservative
- mode now that the restore path is trustworthy
-3. Use the new T420-side organic routing layer in real OpenClaw workflows
+1. ~~Re-run the full backup/restore drill~~ — DONE 2026-04-11,
+ full pass (db, registry, chroma, integrity all true)
+2. ~~Turn on auto-capture of Claude Code sessions in conservative
+ mode~~ — DONE 2026-04-11, Stop hook wired via
+ `deploy/hooks/capture_stop.py` → `POST /interactions`
+ with `reinforce=false`; kill switch via
+ `ATOCORE_CAPTURE_DISABLED=1`
+3. Run a short real-use pilot with auto-capture on, verify
+ interactions are landing in Dalidou, review quality
+4. Use the new T420-side organic routing layer in real OpenClaw workflows
4. Tighten retrieval quality for the now fully ingested active project corpora
5. Move to Wave 2 trusted-operational ingestion instead of blindly widening raw corpus further
6. Keep the new engineering-knowledge architecture docs as implementation guidance while avoiding premature schema work
diff --git a/docs/next-steps.md b/docs/next-steps.md
index ac33f1a..a30a5cd 100644
--- a/docs/next-steps.md
+++ b/docs/next-steps.md
@@ -20,24 +20,14 @@ This working list should be read alongside:
## Immediate Next Steps
-1. Re-run the backup/restore drill on Dalidou with the Chroma
- bind-mount fix in place
- - the 2026-04-09 drill was a PARTIAL PASS: db restore + marker
- reversal worked cleanly, but the Chroma step failed with
- `OSError [Errno 16] Device or resource busy` because
- `shutil.rmtree` cannot unlink a Docker bind-mounted volume
- - fix landed immediately after: `restore_runtime_backup()` now
- clears the destination's CONTENTS and uses
- `copytree(dirs_exist_ok=True)`, and the regression test
- `test_restore_chroma_does_not_unlink_destination_directory`
- asserts the destination inode is stable
- - need a green end-to-end run with `--chroma` actually
- working in-container before enabling write-path automation
-2. Turn on auto-capture of Claude Code sessions once the drill
- re-run is clean
- - conservative mode: Stop hook posts to `/interactions`,
- no auto-extraction into review queue without review cadence
- in place
+1. ~~Re-run the backup/restore drill~~ — DONE 2026-04-11, full pass
+2. ~~Turn on auto-capture of Claude Code sessions~~ — DONE 2026-04-11,
+ Stop hook via `deploy/hooks/capture_stop.py` → `POST /interactions`
+ with `reinforce=false`; kill switch: `ATOCORE_CAPTURE_DISABLED=1`
+2a. Run a short real-use pilot with auto-capture on
+ - verify interactions are landing in Dalidou
+ - check prompt/response quality and truncation
+ - confirm fail-open: no user-visible impact when Dalidou is down
3. Use the T420 `atocore-context` skill and the new organic routing layer in
real OpenClaw workflows
- confirm `auto-context` feels natural
diff --git a/src/atocore/memory/reinforcement.py b/src/atocore/memory/reinforcement.py
index b7acd84..fc6ee7a 100644
--- a/src/atocore/memory/reinforcement.py
+++ b/src/atocore/memory/reinforcement.py
@@ -8,10 +8,11 @@ given memory, without ever promoting anything new into trusted state.
Design notes
------------
-- Matching is intentionally simple and explainable:
- * normalize both sides (lowercase, collapse whitespace)
- * require the normalized memory content (or its first 80 chars) to
- appear as a substring in the normalized response
+- Matching uses token-overlap: tokenize both sides (lowercase, stem,
+ drop stop words), then check whether >= 70 % of the memory's content
+ tokens appear in the response token set. This handles natural
+ paraphrases (e.g. "prefers" vs "prefer", "because history" vs
+ "because the history") that substring matching missed.
- Candidates and invalidated memories are NEVER considered — reinforcement
must not revive history.
- Reinforcement is capped at 1.0 and monotonically non-decreasing.
@@ -43,9 +44,12 @@ log = get_logger("reinforcement")
# memories like "prefers Python".
_MIN_MEMORY_CONTENT_LENGTH = 12
-# When a memory's content is very long, match on its leading window only
-# to avoid punishing small paraphrases further into the body.
-_MATCH_WINDOW_CHARS = 80
+# Token-overlap matching constants.
+_STOP_WORDS: frozenset[str] = frozenset({
+ "the", "a", "an", "and", "or", "of", "to", "is", "was",
+ "that", "this", "with", "for", "from", "into",
+})
+_MATCH_THRESHOLD = 0.70
DEFAULT_CONFIDENCE_DELTA = 0.02
@@ -144,12 +148,58 @@ def _normalize(text: str) -> str:
return collapsed.strip()
+def _stem(word: str) -> str:
+ """Aggressive suffix-folding so inflected forms collapse.
+
+ Handles trailing ``ing``, ``ed``, and ``s`` — good enough for
+ reinforcement matching without pulling in nltk/snowball.
+ """
+ # Order matters: try longest suffix first.
+ if word.endswith("ing") and len(word) >= 6:
+ return word[:-3]
+ if word.endswith("ed") and len(word) > 4:
+ stem = word[:-2]
+ # "preferred" → "preferr" → "prefer" (doubled consonant before -ed)
+ if len(stem) >= 3 and stem[-1] == stem[-2]:
+ stem = stem[:-1]
+ return stem
+ if word.endswith("s") and len(word) > 3:
+ return word[:-1]
+ return word
+
+
+def _tokenize(text: str) -> set[str]:
+ """Split normalized text into a stemmed token set.
+
+ Strips punctuation, drops words shorter than 3 chars and stop words.
+ """
+ tokens: set[str] = set()
+ for raw in text.split():
+ # Strip leading/trailing punctuation (commas, periods, quotes, etc.)
+ word = raw.strip(".,;:!?\"'()[]{}-/")
+ if len(word) < 3:
+ continue
+ if word in _STOP_WORDS:
+ continue
+ tokens.add(_stem(word))
+ return tokens
+
+
def _memory_matches(memory_content: str, normalized_response: str) -> bool:
- """Return True if the memory content appears in the response."""
+ """Return True if enough of the memory's tokens appear in the response.
+
+ Uses token-overlap: tokenize both sides (lowercase, stem, drop stop
+ words), then check whether >= 70 % of the memory's content tokens
+ appear in the response token set.
+ """
if not memory_content:
return False
normalized_memory = _normalize(memory_content)
if len(normalized_memory) < _MIN_MEMORY_CONTENT_LENGTH:
return False
- window = normalized_memory[:_MATCH_WINDOW_CHARS]
- return window in normalized_response
+ memory_tokens = _tokenize(normalized_memory)
+ if not memory_tokens:
+ return False
+ response_tokens = _tokenize(normalized_response)
+ overlap = memory_tokens & response_tokens
+ return len(overlap) / len(memory_tokens) >= _MATCH_THRESHOLD
diff --git a/src/atocore/ops/backup.py b/src/atocore/ops/backup.py
index 0c2e885..bb2c131 100644
--- a/src/atocore/ops/backup.py
+++ b/src/atocore/ops/backup.py
@@ -103,12 +103,27 @@ def create_runtime_backup(
encoding="utf-8",
)
+ # Automatic post-backup validation. Failures log a warning but do
+ # not raise — the backup files are still on disk and may be useful.
+ validation = validate_backup(stamp)
+ validated = validation.get("valid", False)
+ validation_errors = validation.get("errors", [])
+ if not validated:
+ log.warning(
+ "post_backup_validation_failed",
+ backup_root=str(backup_root),
+ errors=validation_errors,
+ )
+ metadata["validated"] = validated
+ metadata["validation_errors"] = validation_errors
+
log.info(
"runtime_backup_created",
backup_root=str(backup_root),
db_snapshot=str(db_snapshot_path),
chroma_included=include_chroma,
chroma_bytes=chroma_bytes_copied,
+ validated=validated,
)
return metadata
@@ -389,6 +404,113 @@ def restore_runtime_backup(
return result
+def cleanup_old_backups(*, confirm: bool = False) -> dict:
+ """Apply retention policy and remove old snapshots.
+
+ Retention keeps:
+ - Last 7 daily snapshots (most recent per calendar day)
+ - Last 4 weekly snapshots (most recent on each Sunday)
+ - Last 6 monthly snapshots (most recent on the 1st of each month)
+
+ All other snapshots are candidates for deletion. Runs as dry-run by
+ default; pass ``confirm=True`` to actually delete.
+
+ Returns a dict with kept/deleted counts and any errors.
+ """
+ snapshots_root = _config.settings.resolved_backup_dir / "snapshots"
+ if not snapshots_root.exists() or not snapshots_root.is_dir():
+ return {"kept": 0, "deleted": 0, "would_delete": 0, "dry_run": not confirm, "errors": []}
+
+ # Parse all stamp directories into (datetime, dir_path) pairs.
+ stamps: list[tuple[datetime, Path]] = []
+ unparseable: list[str] = []
+ for entry in sorted(snapshots_root.iterdir()):
+ if not entry.is_dir():
+ continue
+ try:
+ dt = datetime.strptime(entry.name, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC)
+ stamps.append((dt, entry))
+ except ValueError:
+ unparseable.append(entry.name)
+
+ if not stamps:
+ return {
+ "kept": 0, "deleted": 0, "would_delete": 0,
+ "dry_run": not confirm, "errors": [],
+ "unparseable": unparseable,
+ }
+
+ # Sort newest first so "most recent per bucket" is a simple first-seen.
+ stamps.sort(key=lambda t: t[0], reverse=True)
+
+ keep_set: set[Path] = set()
+
+ # Last 7 daily: most recent snapshot per calendar day.
+ seen_days: set[str] = set()
+ for dt, path in stamps:
+ day_key = dt.strftime("%Y-%m-%d")
+ if day_key not in seen_days:
+ seen_days.add(day_key)
+ keep_set.add(path)
+ if len(seen_days) >= 7:
+ break
+
+ # Last 4 weekly: most recent snapshot that falls on a Sunday.
+ seen_weeks: set[str] = set()
+ for dt, path in stamps:
+ if dt.weekday() == 6: # Sunday
+ week_key = dt.strftime("%Y-W%W")
+ if week_key not in seen_weeks:
+ seen_weeks.add(week_key)
+ keep_set.add(path)
+ if len(seen_weeks) >= 4:
+ break
+
+ # Last 6 monthly: most recent snapshot on the 1st of a month.
+ seen_months: set[str] = set()
+ for dt, path in stamps:
+ if dt.day == 1:
+ month_key = dt.strftime("%Y-%m")
+ if month_key not in seen_months:
+ seen_months.add(month_key)
+ keep_set.add(path)
+ if len(seen_months) >= 6:
+ break
+
+ to_delete = [path for _, path in stamps if path not in keep_set]
+
+ errors: list[str] = []
+ deleted_count = 0
+ if confirm:
+ for path in to_delete:
+ try:
+ shutil.rmtree(path)
+ deleted_count += 1
+ except OSError as exc:
+ errors.append(f"{path.name}: {exc}")
+
+ result: dict = {
+ "kept": len(keep_set),
+ "dry_run": not confirm,
+ "errors": errors,
+ }
+ if confirm:
+ result["deleted"] = deleted_count
+ else:
+ result["would_delete"] = len(to_delete)
+ if unparseable:
+ result["unparseable"] = unparseable
+
+ log.info(
+ "cleanup_old_backups",
+ kept=len(keep_set),
+ deleted=deleted_count if confirm else 0,
+ would_delete=len(to_delete) if not confirm else 0,
+ dry_run=not confirm,
+ )
+ return result
+
+
def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
source_conn = sqlite3.connect(str(source_path))
dest_conn = sqlite3.connect(str(dest_path))
@@ -448,6 +570,13 @@ def main() -> None:
p_validate = sub.add_parser("validate", help="validate a snapshot by stamp")
p_validate.add_argument("stamp", help="snapshot stamp (e.g. 20260409T010203Z)")
+ p_cleanup = sub.add_parser("cleanup", help="remove old snapshots per retention policy")
+ p_cleanup.add_argument(
+ "--confirm",
+ action="store_true",
+ help="actually delete (default is dry-run)",
+ )
+
p_restore = sub.add_parser(
"restore",
help="restore a snapshot by stamp (service must be stopped)",
@@ -488,6 +617,8 @@ def main() -> None:
result = {"backups": list_runtime_backups()}
elif command == "validate":
result = validate_backup(args.stamp)
+ elif command == "cleanup":
+ result = cleanup_old_backups(confirm=getattr(args, "confirm", False))
elif command == "restore":
result = restore_runtime_backup(
args.stamp,
diff --git a/tests/test_backup.py b/tests/test_backup.py
index c617f16..db50a41 100644
--- a/tests/test_backup.py
+++ b/tests/test_backup.py
@@ -1,14 +1,15 @@
-"""Tests for runtime backup creation and restore."""
+"""Tests for runtime backup creation, restore, and retention cleanup."""
import json
import sqlite3
-from datetime import UTC, datetime
+from datetime import UTC, datetime, timedelta
import pytest
import atocore.config as config
from atocore.models.database import init_db
from atocore.ops.backup import (
+ cleanup_old_backups,
create_runtime_backup,
list_runtime_backups,
restore_runtime_backup,
@@ -413,6 +414,56 @@ def test_restore_skips_pre_snapshot_when_requested(tmp_path, monkeypatch):
config.settings = original_settings
+def test_create_backup_includes_validation_fields(tmp_path, monkeypatch):
+ """Task B: create_runtime_backup auto-validates and reports result."""
+ monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
+ monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
+ monkeypatch.setenv(
+ "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
+ )
+
+ original_settings = config.settings
+ try:
+ config.settings = config.Settings()
+ init_db()
+ result = create_runtime_backup(datetime(2026, 4, 11, 10, 0, 0, tzinfo=UTC))
+ finally:
+ config.settings = original_settings
+
+ assert "validated" in result
+ assert "validation_errors" in result
+ assert result["validated"] is True
+ assert result["validation_errors"] == []
+
+
+def test_create_backup_validation_failure_does_not_raise(tmp_path, monkeypatch):
+ """Task B: if post-backup validation fails, backup still returns metadata."""
+ monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
+ monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
+ monkeypatch.setenv(
+ "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
+ )
+
+ def _broken_validate(stamp):
+ return {"valid": False, "errors": ["db_missing", "metadata_missing"]}
+
+ original_settings = config.settings
+ try:
+ config.settings = config.Settings()
+ init_db()
+ monkeypatch.setattr("atocore.ops.backup.validate_backup", _broken_validate)
+ result = create_runtime_backup(datetime(2026, 4, 11, 11, 0, 0, tzinfo=UTC))
+ finally:
+ config.settings = original_settings
+
+ # Should NOT have raised — backup still returned metadata
+ assert result["validated"] is False
+ assert result["validation_errors"] == ["db_missing", "metadata_missing"]
+ # Core backup fields still present
+ assert "db_snapshot_path" in result
+ assert "created_at" in result
+
+
def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
"""Stale WAL/SHM sidecars must not carry bytes past the restore.
@@ -457,3 +508,183 @@ def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
)
finally:
config.settings = original_settings
+
+
+# ---------------------------------------------------------------------------
+# Task C: Backup retention cleanup
+# ---------------------------------------------------------------------------
+
+
+def _setup_cleanup_env(tmp_path, monkeypatch):
+ """Helper: configure env, init db, return snapshots_root."""
+ monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
+ monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
+ monkeypatch.setenv(
+ "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
+ )
+ original = config.settings
+ config.settings = config.Settings()
+ init_db()
+ snapshots_root = config.settings.resolved_backup_dir / "snapshots"
+ snapshots_root.mkdir(parents=True, exist_ok=True)
+ return original, snapshots_root
+
+
+def _seed_snapshots(snapshots_root, dates):
+ """Create minimal valid snapshot dirs for the given datetimes."""
+ for dt in dates:
+ stamp = dt.strftime("%Y%m%dT%H%M%SZ")
+ snap_dir = snapshots_root / stamp
+ db_dir = snap_dir / "db"
+ db_dir.mkdir(parents=True, exist_ok=True)
+ db_path = db_dir / "atocore.db"
+ conn = sqlite3.connect(str(db_path))
+ conn.execute("CREATE TABLE IF NOT EXISTS _marker (id INTEGER)")
+ conn.close()
+ metadata = {
+ "created_at": dt.isoformat(),
+ "backup_root": str(snap_dir),
+ "db_snapshot_path": str(db_path),
+ "db_size_bytes": db_path.stat().st_size,
+ "registry_snapshot_path": "",
+ "chroma_snapshot_path": "",
+ "chroma_snapshot_bytes": 0,
+ "chroma_snapshot_files": 0,
+ "chroma_snapshot_included": False,
+ "vector_store_note": "",
+ }
+ (snap_dir / "backup-metadata.json").write_text(
+ json.dumps(metadata, indent=2) + "\n", encoding="utf-8"
+ )
+
+
+def test_cleanup_empty_dir(tmp_path, monkeypatch):
+ original, _ = _setup_cleanup_env(tmp_path, monkeypatch)
+ try:
+ result = cleanup_old_backups()
+ assert result["kept"] == 0
+ assert result["would_delete"] == 0
+ assert result["dry_run"] is True
+ finally:
+ config.settings = original
+
+
+def test_cleanup_dry_run_identifies_old_snapshots(tmp_path, monkeypatch):
+ original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
+ try:
+ # 10 daily snapshots Apr 2-11 (avoiding Apr 1 which is monthly).
+ base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
+ dates = [base + timedelta(days=i) for i in range(10)]
+ _seed_snapshots(snapshots_root, dates)
+
+ result = cleanup_old_backups()
+ assert result["dry_run"] is True
+ # 7 daily kept + Apr 5 is a Sunday (weekly) but already in daily.
+ # Apr 2, 3, 4 are oldest. Apr 5 is Sunday → kept as weekly.
+ # So: 7 daily (Apr 5-11) + 1 weekly (Apr 5 already counted) = 7 daily.
+ # But Apr 5 is the 8th newest day from Apr 11... wait.
+ # Newest 7 days: Apr 11,10,9,8,7,6,5 → all kept as daily.
+ # Remaining: Apr 4,3,2. Apr 5 is already in daily.
+ # None of Apr 4,3,2 are Sunday or 1st → all 3 deleted.
+ assert result["kept"] == 7
+ assert result["would_delete"] == 3
+ assert len(list(snapshots_root.iterdir())) == 10
+ finally:
+ config.settings = original
+
+
+def test_cleanup_confirm_deletes(tmp_path, monkeypatch):
+ original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
+ try:
+ base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
+ dates = [base + timedelta(days=i) for i in range(10)]
+ _seed_snapshots(snapshots_root, dates)
+
+ result = cleanup_old_backups(confirm=True)
+ assert result["dry_run"] is False
+ assert result["deleted"] == 3
+ assert result["kept"] == 7
+ assert len(list(snapshots_root.iterdir())) == 7
+ finally:
+ config.settings = original
+
+
+def test_cleanup_keeps_last_7_daily(tmp_path, monkeypatch):
+ """Exactly 7 snapshots on different days → all kept."""
+ original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
+ try:
+ base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
+ dates = [base + timedelta(days=i) for i in range(7)]
+ _seed_snapshots(snapshots_root, dates)
+
+ result = cleanup_old_backups()
+ assert result["kept"] == 7
+ assert result["would_delete"] == 0
+ finally:
+ config.settings = original
+
+
+def test_cleanup_keeps_sunday_weekly(tmp_path, monkeypatch):
+ """Snapshots on Sundays outside the 7-day window are kept as weekly."""
+ original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
+ try:
+ # 7 daily snapshots covering Apr 5-11
+ base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
+ daily = [base + timedelta(days=i) for i in range(7)]
+
+ # 2 older Sunday snapshots
+ sun1 = datetime(2026, 3, 29, 12, 0, 0, tzinfo=UTC) # Sunday
+ sun2 = datetime(2026, 3, 22, 12, 0, 0, tzinfo=UTC) # Sunday
+ # A non-Sunday old snapshot that should be deleted
+ wed = datetime(2026, 3, 25, 12, 0, 0, tzinfo=UTC) # Wednesday
+
+ _seed_snapshots(snapshots_root, daily + [sun1, sun2, wed])
+
+ result = cleanup_old_backups()
+ # 7 daily + 2 Sunday weekly = 9 kept, 1 Wednesday deleted
+ assert result["kept"] == 9
+ assert result["would_delete"] == 1
+ finally:
+ config.settings = original
+
+
+def test_cleanup_keeps_monthly_first(tmp_path, monkeypatch):
+ """Snapshots on the 1st of a month outside daily+weekly are kept as monthly."""
+ original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
+ try:
+ # 7 daily in April 2026
+ base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
+ daily = [base + timedelta(days=i) for i in range(7)]
+
+ # Old monthly 1st snapshots
+ m1 = datetime(2026, 1, 1, 12, 0, 0, tzinfo=UTC)
+ m2 = datetime(2025, 12, 1, 12, 0, 0, tzinfo=UTC)
+ # Old non-1st, non-Sunday snapshot — should be deleted
+ old = datetime(2026, 1, 15, 12, 0, 0, tzinfo=UTC)
+
+ _seed_snapshots(snapshots_root, daily + [m1, m2, old])
+
+ result = cleanup_old_backups()
+ # 7 daily + 2 monthly = 9 kept, 1 deleted
+ assert result["kept"] == 9
+ assert result["would_delete"] == 1
+ finally:
+ config.settings = original
+
+
+def test_cleanup_unparseable_stamp_skipped(tmp_path, monkeypatch):
+ """Directories with unparseable names are ignored, not deleted."""
+ original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
+ try:
+ base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
+ _seed_snapshots(snapshots_root, [base])
+
+ bad_dir = snapshots_root / "not-a-timestamp"
+ bad_dir.mkdir()
+
+ result = cleanup_old_backups(confirm=True)
+ assert result.get("unparseable") == ["not-a-timestamp"]
+ assert bad_dir.exists()
+ assert result["kept"] == 1
+ finally:
+ config.settings = original
diff --git a/tests/test_capture_stop.py b/tests/test_capture_stop.py
new file mode 100644
index 0000000..2fcd481
--- /dev/null
+++ b/tests/test_capture_stop.py
@@ -0,0 +1,249 @@
+"""Tests for deploy/hooks/capture_stop.py — Claude Code Stop hook."""
+
+from __future__ import annotations
+
+import json
+import os
+import sys
+import tempfile
+import textwrap
+from io import StringIO
+from pathlib import Path
+from unittest import mock
+
+import pytest
+
+# The hook script lives outside of the normal package tree, so import
+# it by manipulating sys.path.
+_HOOK_DIR = str(Path(__file__).resolve().parent.parent / "deploy" / "hooks")
+if _HOOK_DIR not in sys.path:
+ sys.path.insert(0, _HOOK_DIR)
+
+import capture_stop # noqa: E402
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _write_transcript(tmp: Path, entries: list[dict]) -> str:
+ """Write a JSONL transcript and return the path."""
+ path = tmp / "transcript.jsonl"
+ with open(path, "w", encoding="utf-8") as f:
+ for entry in entries:
+ f.write(json.dumps(entry, ensure_ascii=False) + "\n")
+ return str(path)
+
+
+def _user_entry(content: str, *, is_meta: bool = False) -> dict:
+ return {
+ "type": "user",
+ "isMeta": is_meta,
+ "message": {"role": "user", "content": content},
+ }
+
+
+def _assistant_entry() -> dict:
+ return {
+ "type": "assistant",
+ "message": {
+ "role": "assistant",
+ "content": [{"type": "text", "text": "Sure, here's the answer."}],
+ },
+ }
+
+
+def _system_entry() -> dict:
+ return {"type": "system", "message": {"role": "system", "content": "system init"}}
+
+
+# ---------------------------------------------------------------------------
+# _extract_last_user_prompt
+# ---------------------------------------------------------------------------
+
+class TestExtractLastUserPrompt:
+ def test_returns_last_real_prompt(self, tmp_path):
+ path = _write_transcript(tmp_path, [
+ _user_entry("First prompt that is long enough to capture"),
+ _assistant_entry(),
+ _user_entry("Second prompt that should be the one we capture"),
+ _assistant_entry(),
+ ])
+ result = capture_stop._extract_last_user_prompt(path)
+ assert result == "Second prompt that should be the one we capture"
+
+ def test_skips_meta_messages(self, tmp_path):
+ path = _write_transcript(tmp_path, [
+ _user_entry("Real prompt that is definitely long enough"),
+ _user_entry("some system stuff"),
+ _user_entry("Meta message that looks real enough", is_meta=True),
+ ])
+ result = capture_stop._extract_last_user_prompt(path)
+ assert result == "Real prompt that is definitely long enough"
+
+ def test_skips_xml_content(self, tmp_path):
+ path = _write_transcript(tmp_path, [
+ _user_entry("Actual prompt from a real human user"),
+ _user_entry("/help"),
+ ])
+ result = capture_stop._extract_last_user_prompt(path)
+ assert result == "Actual prompt from a real human user"
+
+ def test_skips_short_messages(self, tmp_path):
+ path = _write_transcript(tmp_path, [
+ _user_entry("This prompt is long enough to be captured"),
+ _user_entry("yes"), # too short
+ ])
+ result = capture_stop._extract_last_user_prompt(path)
+ assert result == "This prompt is long enough to be captured"
+
+ def test_handles_content_blocks(self, tmp_path):
+ entry = {
+ "type": "user",
+ "message": {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": "First paragraph of the prompt."},
+ {"type": "text", "text": "Second paragraph continues here."},
+ ],
+ },
+ }
+ path = _write_transcript(tmp_path, [entry])
+ result = capture_stop._extract_last_user_prompt(path)
+ assert "First paragraph" in result
+ assert "Second paragraph" in result
+
+ def test_empty_transcript(self, tmp_path):
+ path = _write_transcript(tmp_path, [])
+ result = capture_stop._extract_last_user_prompt(path)
+ assert result == ""
+
+ def test_missing_file(self):
+ result = capture_stop._extract_last_user_prompt("/nonexistent/path.jsonl")
+ assert result == ""
+
+ def test_empty_path(self):
+ result = capture_stop._extract_last_user_prompt("")
+ assert result == ""
+
+
+# ---------------------------------------------------------------------------
+# _infer_project
+# ---------------------------------------------------------------------------
+
+class TestInferProject:
+ def test_empty_cwd(self):
+ assert capture_stop._infer_project("") == ""
+
+ def test_unknown_path(self):
+ assert capture_stop._infer_project("C:\\Users\\antoi\\random") == ""
+
+ def test_mapped_path(self):
+ with mock.patch.dict(capture_stop._PROJECT_PATH_MAP, {
+ "C:\\Users\\antoi\\gigabit": "p04-gigabit",
+ }):
+ result = capture_stop._infer_project("C:\\Users\\antoi\\gigabit\\src")
+ assert result == "p04-gigabit"
+
+
+# ---------------------------------------------------------------------------
+# _capture (integration-style, mocking HTTP)
+# ---------------------------------------------------------------------------
+
+class TestCapture:
+ def _hook_input(self, *, transcript_path: str = "", **overrides) -> str:
+ data = {
+ "session_id": "test-session-123",
+ "transcript_path": transcript_path,
+ "cwd": "C:\\Users\\antoi\\ATOCore",
+ "permission_mode": "default",
+ "hook_event_name": "Stop",
+ "last_assistant_message": "Here is the answer to your question about the code.",
+ "turn_number": 3,
+ }
+ data.update(overrides)
+ return json.dumps(data)
+
+ @mock.patch("capture_stop.urllib.request.urlopen")
+ def test_posts_to_atocore(self, mock_urlopen, tmp_path):
+ transcript = _write_transcript(tmp_path, [
+ _user_entry("Please explain how the backup system works in detail"),
+ _assistant_entry(),
+ ])
+ mock_resp = mock.MagicMock()
+ mock_resp.read.return_value = json.dumps({"id": "int-001", "status": "recorded"}).encode()
+ mock_urlopen.return_value = mock_resp
+
+ with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))):
+ capture_stop._capture()
+
+ mock_urlopen.assert_called_once()
+ req = mock_urlopen.call_args[0][0]
+ body = json.loads(req.data.decode())
+ assert body["prompt"] == "Please explain how the backup system works in detail"
+ assert body["client"] == "claude-code"
+ assert body["session_id"] == "test-session-123"
+ assert body["reinforce"] is False
+
+ @mock.patch("capture_stop.urllib.request.urlopen")
+ def test_skips_when_disabled(self, mock_urlopen, tmp_path):
+ transcript = _write_transcript(tmp_path, [
+ _user_entry("A prompt that would normally be captured"),
+ ])
+ with mock.patch.dict(os.environ, {"ATOCORE_CAPTURE_DISABLED": "1"}):
+ with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))):
+ capture_stop._capture()
+ mock_urlopen.assert_not_called()
+
+ @mock.patch("capture_stop.urllib.request.urlopen")
+ def test_skips_short_prompt(self, mock_urlopen, tmp_path):
+ transcript = _write_transcript(tmp_path, [
+ _user_entry("yes"),
+ ])
+ with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))):
+ capture_stop._capture()
+ mock_urlopen.assert_not_called()
+
+ @mock.patch("capture_stop.urllib.request.urlopen")
+ def test_truncates_long_response(self, mock_urlopen, tmp_path):
+ transcript = _write_transcript(tmp_path, [
+ _user_entry("Tell me everything about the entire codebase architecture"),
+ ])
+ long_response = "x" * 60_000
+ mock_resp = mock.MagicMock()
+ mock_resp.read.return_value = json.dumps({"id": "int-002"}).encode()
+ mock_urlopen.return_value = mock_resp
+
+ with mock.patch("sys.stdin", StringIO(
+ self._hook_input(transcript_path=transcript, last_assistant_message=long_response)
+ )):
+ capture_stop._capture()
+
+ req = mock_urlopen.call_args[0][0]
+ body = json.loads(req.data.decode())
+ assert len(body["response"]) <= capture_stop.MAX_RESPONSE_LENGTH + 20
+ assert body["response"].endswith("[truncated]")
+
+ def test_main_never_raises(self):
+ """main() must always exit 0, even on garbage input."""
+ with mock.patch("sys.stdin", StringIO("not json at all")):
+ # Should not raise
+ capture_stop.main()
+
+ @mock.patch("capture_stop.urllib.request.urlopen")
+ def test_uses_atocore_url_env(self, mock_urlopen, tmp_path):
+ transcript = _write_transcript(tmp_path, [
+ _user_entry("Please help me with this particular problem in the code"),
+ ])
+ mock_resp = mock.MagicMock()
+ mock_resp.read.return_value = json.dumps({"id": "int-003"}).encode()
+ mock_urlopen.return_value = mock_resp
+
+ with mock.patch.dict(os.environ, {"ATOCORE_URL": "http://localhost:9999"}):
+ # Re-read the env var
+ with mock.patch.object(capture_stop, "ATOCORE_URL", "http://localhost:9999"):
+ with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))):
+ capture_stop._capture()
+
+ req = mock_urlopen.call_args[0][0]
+ assert req.full_url == "http://localhost:9999/interactions"
diff --git a/tests/test_reinforcement.py b/tests/test_reinforcement.py
index 7537fa4..9d3832b 100644
--- a/tests/test_reinforcement.py
+++ b/tests/test_reinforcement.py
@@ -6,6 +6,8 @@ from atocore.interactions.service import record_interaction
from atocore.main import app
from atocore.memory.reinforcement import (
DEFAULT_CONFIDENCE_DELTA,
+ _stem,
+ _tokenize,
reinforce_from_interaction,
)
from atocore.memory.service import (
@@ -373,3 +375,118 @@ def test_get_memories_filter_by_alias(project_registry):
assert len(via_alias) == 2
assert len(via_canonical) == 2
assert {m.content for m in via_alias} == {"m1", "m2"}
+
+
+# --- token-overlap matcher: unit tests -------------------------------------
+
+
+def test_stem_folds_s_ed_ing():
+ assert _stem("prefers") == "prefer"
+ assert _stem("preferred") == "prefer"
+ assert _stem("services") == "service"
+ assert _stem("processing") == "process"
+ # Short words must not be over-stripped
+ assert _stem("red") == "red" # 3 chars, don't strip "ed"
+ assert _stem("bus") == "bus" # 3 chars, don't strip "s"
+ assert _stem("sing") == "sing" # 4 chars, don't strip "ing"
+ assert _stem("being") == "being" # 5 chars, "ing" strip leaves "be" (2) — too short
+
+
+def test_tokenize_removes_stop_words():
+ tokens = _tokenize("the quick brown fox jumps over the lazy dog")
+ assert "the" not in tokens
+ assert "quick" in tokens
+ assert "brown" in tokens
+ assert "fox" in tokens
+ assert "dog" in tokens
+ # "over" has len 4, not a stop word → kept (stemmed: "over")
+ assert "over" in tokens
+
+
+# --- token-overlap matcher: paraphrase matching ----------------------------
+
+
+def test_reinforce_matches_paraphrase_prefers_vs_prefer(tmp_data_dir):
+ """The canonical rebase case from phase9-first-real-use.md."""
+ init_db()
+ mem = create_memory(
+ memory_type="preference",
+ content="prefers rebase-based workflows because history stays linear",
+ confidence=0.5,
+ )
+ interaction = _make_interaction(
+ response=(
+ "I prefer rebase-based workflows because the history stays "
+ "linear and reviewers have an easier time."
+ ),
+ )
+ results = reinforce_from_interaction(interaction)
+ assert any(r.memory_id == mem.id for r in results)
+
+
+def test_reinforce_matches_paraphrase_with_articles_and_ed(tmp_data_dir):
+ init_db()
+ mem = create_memory(
+ memory_type="preference",
+ content="preferred structured logging across all backend services",
+ confidence=0.5,
+ )
+ interaction = _make_interaction(
+ response=(
+ "I set up structured logging across all the backend services, "
+ "which the team prefers for consistency."
+ ),
+ )
+ results = reinforce_from_interaction(interaction)
+ assert any(r.memory_id == mem.id for r in results)
+
+
+def test_reinforce_rejects_low_overlap(tmp_data_dir):
+ init_db()
+ mem = create_memory(
+ memory_type="preference",
+ content="always uses Python for data processing scripts",
+ confidence=0.5,
+ )
+ interaction = _make_interaction(
+ response=(
+ "The CI pipeline runs on Node.js and deploys to Kubernetes "
+ "using Helm charts."
+ ),
+ )
+ results = reinforce_from_interaction(interaction)
+ assert all(r.memory_id != mem.id for r in results)
+
+
+def test_reinforce_matches_at_70_percent_threshold(tmp_data_dir):
+ """Exactly 7 of 10 content tokens present → should match."""
+ init_db()
+ # After stop-word removal and stemming, this has 10 tokens:
+ # alpha, bravo, charlie, delta, echo, foxtrot, golf, hotel, india, juliet
+ mem = create_memory(
+ memory_type="preference",
+ content="alpha bravo charlie delta echo foxtrot golf hotel india juliet",
+ confidence=0.5,
+ )
+ # Echo 7 of 10 tokens (70%) plus some noise
+ interaction = _make_interaction(
+ response="alpha bravo charlie delta echo foxtrot golf noise words here",
+ )
+ results = reinforce_from_interaction(interaction)
+ assert any(r.memory_id == mem.id for r in results)
+
+
+def test_reinforce_rejects_below_70_percent(tmp_data_dir):
+ """Only 6 of 10 content tokens present (60%) → should NOT match."""
+ init_db()
+ mem = create_memory(
+ memory_type="preference",
+ content="alpha bravo charlie delta echo foxtrot golf hotel india juliet",
+ confidence=0.5,
+ )
+ # Echo 6 of 10 tokens (60%) plus noise
+ interaction = _make_interaction(
+ response="alpha bravo charlie delta echo foxtrot noise words here only",
+ )
+ results = reinforce_from_interaction(interaction)
+ assert all(r.memory_id != mem.id for r in results)