Merge hardening sprint: reinforcement matcher + backup ops

- Task A: token-overlap reinforcement matcher (fixes broken substring matching)
- Task B: automatic post-backup validation
- Task C: backup retention cleanup with CLI subcommand

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-11 10:02:35 -04:00
9 changed files with 1008 additions and 36 deletions

View File

@@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""Claude Code Stop hook: capture interaction to AtoCore.
Reads the Stop hook JSON from stdin, extracts the last user prompt
from the transcript JSONL, and POSTs to the AtoCore /interactions
endpoint in conservative mode (reinforce=false, no extraction).
Fail-open: always exits 0, logs errors to stderr only.
Environment variables:
ATOCORE_URL Base URL of the AtoCore instance (default: http://dalidou:8100)
ATOCORE_CAPTURE_DISABLED Set to "1" to disable capture (kill switch)
Usage in ~/.claude/settings.json:
"Stop": [{
"matcher": "",
"hooks": [{
"type": "command",
"command": "python /path/to/capture_stop.py",
"timeout": 15
}]
}]
"""
from __future__ import annotations
import json
import os
import sys
import urllib.error
import urllib.request
ATOCORE_URL = os.environ.get("ATOCORE_URL", "http://dalidou:8100")
TIMEOUT_SECONDS = 10
# Minimum prompt length to bother capturing. Single-word acks,
# slash commands, and empty lines aren't useful interactions.
MIN_PROMPT_LENGTH = 15
# Maximum response length to capture. Truncate very long assistant
# responses to keep the interactions table manageable.
MAX_RESPONSE_LENGTH = 50_000
def main() -> None:
"""Entry point. Always exits 0."""
try:
_capture()
except Exception as exc:
print(f"capture_stop: {exc}", file=sys.stderr)
def _capture() -> None:
if os.environ.get("ATOCORE_CAPTURE_DISABLED") == "1":
return
raw = sys.stdin.read()
if not raw.strip():
return
hook_data = json.loads(raw)
session_id = hook_data.get("session_id", "")
assistant_message = hook_data.get("last_assistant_message", "")
transcript_path = hook_data.get("transcript_path", "")
cwd = hook_data.get("cwd", "")
prompt = _extract_last_user_prompt(transcript_path)
if not prompt or len(prompt.strip()) < MIN_PROMPT_LENGTH:
return
response = assistant_message or ""
if len(response) > MAX_RESPONSE_LENGTH:
response = response[:MAX_RESPONSE_LENGTH] + "\n\n[truncated]"
project = _infer_project(cwd)
payload = {
"prompt": prompt,
"response": response,
"client": "claude-code",
"session_id": session_id,
"project": project,
"reinforce": False,
}
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
req = urllib.request.Request(
f"{ATOCORE_URL}/interactions",
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
resp = urllib.request.urlopen(req, timeout=TIMEOUT_SECONDS)
result = json.loads(resp.read().decode("utf-8"))
print(
f"capture_stop: recorded interaction {result.get('id', '?')} "
f"(project={project or 'none'}, prompt_chars={len(prompt)}, "
f"response_chars={len(response)})",
file=sys.stderr,
)
def _extract_last_user_prompt(transcript_path: str) -> str:
"""Read the JSONL transcript and return the last real user prompt.
Skips meta messages (isMeta=True) and system/command messages
(content starting with '<').
"""
if not transcript_path:
return ""
# Normalize path for the current OS
path = os.path.normpath(transcript_path)
if not os.path.isfile(path):
return ""
last_prompt = ""
try:
with open(path, encoding="utf-8", errors="replace") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if entry.get("type") != "user":
continue
if entry.get("isMeta", False):
continue
msg = entry.get("message", {})
if not isinstance(msg, dict):
continue
content = msg.get("content", "")
if isinstance(content, str):
text = content.strip()
elif isinstance(content, list):
# Content blocks: extract text blocks
parts = []
for block in content:
if isinstance(block, str):
parts.append(block)
elif isinstance(block, dict) and block.get("type") == "text":
parts.append(block.get("text", ""))
text = "\n".join(parts).strip()
else:
continue
# Skip system/command XML and very short messages
if text.startswith("<") or len(text) < MIN_PROMPT_LENGTH:
continue
last_prompt = text
except OSError:
pass
return last_prompt
# Project inference from working directory.
# Maps known repo paths to AtoCore project IDs. The user can extend
# this table or replace it with a registry lookup later.
_PROJECT_PATH_MAP: dict[str, str] = {
# Add mappings as needed, e.g.:
# "C:\\Users\\antoi\\gigabit": "p04-gigabit",
# "C:\\Users\\antoi\\interferometer": "p05-interferometer",
}
def _infer_project(cwd: str) -> str:
"""Try to map the working directory to an AtoCore project."""
if not cwd:
return ""
norm = os.path.normpath(cwd).lower()
for path_prefix, project_id in _PROJECT_PATH_MAP.items():
if norm.startswith(os.path.normpath(path_prefix).lower()):
return project_id
return ""
if __name__ == "__main__":
main()

View File

@@ -247,6 +247,18 @@ for i in 1 2 3 4 5 6 7 8 9 10; do
done
```
**Note on build_sha after restore:** The one-shot `docker compose run`
container does not carry the build provenance env vars that `deploy.sh`
exports at deploy time. After a restore, `/health` will report
`build_sha: "unknown"` until you re-run `deploy.sh` or manually
re-deploy. This is cosmetic — the data is correctly restored — but if
you need `build_sha` to be accurate, run a redeploy after the restore:
```bash
cd /srv/storage/atocore/app
bash deploy/dalidou/deploy.sh
```
### Post-restore verification
```bash

View File

@@ -244,12 +244,16 @@ This separation is healthy:
## Immediate Next Focus
1. Re-run the full backup/restore drill on Dalidou with the
Chroma bind-mount fix in place (end-to-end green, not the
partial pass from 2026-04-09)
2. Turn on auto-capture of Claude Code sessions in conservative
mode now that the restore path is trustworthy
3. Use the new T420-side organic routing layer in real OpenClaw workflows
1. ~~Re-run the full backup/restore drill~~ — DONE 2026-04-11,
full pass (db, registry, chroma, integrity all true)
2. ~~Turn on auto-capture of Claude Code sessions in conservative
mode~~ — DONE 2026-04-11, Stop hook wired via
`deploy/hooks/capture_stop.py``POST /interactions`
with `reinforce=false`; kill switch via
`ATOCORE_CAPTURE_DISABLED=1`
3. Run a short real-use pilot with auto-capture on, verify
interactions are landing in Dalidou, review quality
4. Use the new T420-side organic routing layer in real OpenClaw workflows
4. Tighten retrieval quality for the now fully ingested active project corpora
5. Move to Wave 2 trusted-operational ingestion instead of blindly widening raw corpus further
6. Keep the new engineering-knowledge architecture docs as implementation guidance while avoiding premature schema work

View File

@@ -20,24 +20,14 @@ This working list should be read alongside:
## Immediate Next Steps
1. Re-run the backup/restore drill on Dalidou with the Chroma
bind-mount fix in place
- the 2026-04-09 drill was a PARTIAL PASS: db restore + marker
reversal worked cleanly, but the Chroma step failed with
`OSError [Errno 16] Device or resource busy` because
`shutil.rmtree` cannot unlink a Docker bind-mounted volume
- fix landed immediately after: `restore_runtime_backup()` now
clears the destination's CONTENTS and uses
`copytree(dirs_exist_ok=True)`, and the regression test
`test_restore_chroma_does_not_unlink_destination_directory`
asserts the destination inode is stable
- need a green end-to-end run with `--chroma` actually
working in-container before enabling write-path automation
2. Turn on auto-capture of Claude Code sessions once the drill
re-run is clean
- conservative mode: Stop hook posts to `/interactions`,
no auto-extraction into review queue without review cadence
in place
1. ~~Re-run the backup/restore drill~~ — DONE 2026-04-11, full pass
2. ~~Turn on auto-capture of Claude Code sessions~~ — DONE 2026-04-11,
Stop hook via `deploy/hooks/capture_stop.py``POST /interactions`
with `reinforce=false`; kill switch: `ATOCORE_CAPTURE_DISABLED=1`
2a. Run a short real-use pilot with auto-capture on
- verify interactions are landing in Dalidou
- check prompt/response quality and truncation
- confirm fail-open: no user-visible impact when Dalidou is down
3. Use the T420 `atocore-context` skill and the new organic routing layer in
real OpenClaw workflows
- confirm `auto-context` feels natural

View File

@@ -8,10 +8,11 @@ given memory, without ever promoting anything new into trusted state.
Design notes
------------
- Matching is intentionally simple and explainable:
* normalize both sides (lowercase, collapse whitespace)
* require the normalized memory content (or its first 80 chars) to
appear as a substring in the normalized response
- Matching uses token-overlap: tokenize both sides (lowercase, stem,
drop stop words), then check whether >= 70 % of the memory's content
tokens appear in the response token set. This handles natural
paraphrases (e.g. "prefers" vs "prefer", "because history" vs
"because the history") that substring matching missed.
- Candidates and invalidated memories are NEVER considered — reinforcement
must not revive history.
- Reinforcement is capped at 1.0 and monotonically non-decreasing.
@@ -43,9 +44,12 @@ log = get_logger("reinforcement")
# memories like "prefers Python".
_MIN_MEMORY_CONTENT_LENGTH = 12
# When a memory's content is very long, match on its leading window only
# to avoid punishing small paraphrases further into the body.
_MATCH_WINDOW_CHARS = 80
# Token-overlap matching constants.
_STOP_WORDS: frozenset[str] = frozenset({
"the", "a", "an", "and", "or", "of", "to", "is", "was",
"that", "this", "with", "for", "from", "into",
})
_MATCH_THRESHOLD = 0.70
DEFAULT_CONFIDENCE_DELTA = 0.02
@@ -144,12 +148,58 @@ def _normalize(text: str) -> str:
return collapsed.strip()
def _stem(word: str) -> str:
"""Aggressive suffix-folding so inflected forms collapse.
Handles trailing ``ing``, ``ed``, and ``s`` — good enough for
reinforcement matching without pulling in nltk/snowball.
"""
# Order matters: try longest suffix first.
if word.endswith("ing") and len(word) >= 6:
return word[:-3]
if word.endswith("ed") and len(word) > 4:
stem = word[:-2]
# "preferred" → "preferr" → "prefer" (doubled consonant before -ed)
if len(stem) >= 3 and stem[-1] == stem[-2]:
stem = stem[:-1]
return stem
if word.endswith("s") and len(word) > 3:
return word[:-1]
return word
def _tokenize(text: str) -> set[str]:
"""Split normalized text into a stemmed token set.
Strips punctuation, drops words shorter than 3 chars and stop words.
"""
tokens: set[str] = set()
for raw in text.split():
# Strip leading/trailing punctuation (commas, periods, quotes, etc.)
word = raw.strip(".,;:!?\"'()[]{}-/")
if len(word) < 3:
continue
if word in _STOP_WORDS:
continue
tokens.add(_stem(word))
return tokens
def _memory_matches(memory_content: str, normalized_response: str) -> bool:
"""Return True if the memory content appears in the response."""
"""Return True if enough of the memory's tokens appear in the response.
Uses token-overlap: tokenize both sides (lowercase, stem, drop stop
words), then check whether >= 70 % of the memory's content tokens
appear in the response token set.
"""
if not memory_content:
return False
normalized_memory = _normalize(memory_content)
if len(normalized_memory) < _MIN_MEMORY_CONTENT_LENGTH:
return False
window = normalized_memory[:_MATCH_WINDOW_CHARS]
return window in normalized_response
memory_tokens = _tokenize(normalized_memory)
if not memory_tokens:
return False
response_tokens = _tokenize(normalized_response)
overlap = memory_tokens & response_tokens
return len(overlap) / len(memory_tokens) >= _MATCH_THRESHOLD

View File

@@ -103,12 +103,27 @@ def create_runtime_backup(
encoding="utf-8",
)
# Automatic post-backup validation. Failures log a warning but do
# not raise — the backup files are still on disk and may be useful.
validation = validate_backup(stamp)
validated = validation.get("valid", False)
validation_errors = validation.get("errors", [])
if not validated:
log.warning(
"post_backup_validation_failed",
backup_root=str(backup_root),
errors=validation_errors,
)
metadata["validated"] = validated
metadata["validation_errors"] = validation_errors
log.info(
"runtime_backup_created",
backup_root=str(backup_root),
db_snapshot=str(db_snapshot_path),
chroma_included=include_chroma,
chroma_bytes=chroma_bytes_copied,
validated=validated,
)
return metadata
@@ -389,6 +404,113 @@ def restore_runtime_backup(
return result
def cleanup_old_backups(*, confirm: bool = False) -> dict:
"""Apply retention policy and remove old snapshots.
Retention keeps:
- Last 7 daily snapshots (most recent per calendar day)
- Last 4 weekly snapshots (most recent on each Sunday)
- Last 6 monthly snapshots (most recent on the 1st of each month)
All other snapshots are candidates for deletion. Runs as dry-run by
default; pass ``confirm=True`` to actually delete.
Returns a dict with kept/deleted counts and any errors.
"""
snapshots_root = _config.settings.resolved_backup_dir / "snapshots"
if not snapshots_root.exists() or not snapshots_root.is_dir():
return {"kept": 0, "deleted": 0, "would_delete": 0, "dry_run": not confirm, "errors": []}
# Parse all stamp directories into (datetime, dir_path) pairs.
stamps: list[tuple[datetime, Path]] = []
unparseable: list[str] = []
for entry in sorted(snapshots_root.iterdir()):
if not entry.is_dir():
continue
try:
dt = datetime.strptime(entry.name, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC)
stamps.append((dt, entry))
except ValueError:
unparseable.append(entry.name)
if not stamps:
return {
"kept": 0, "deleted": 0, "would_delete": 0,
"dry_run": not confirm, "errors": [],
"unparseable": unparseable,
}
# Sort newest first so "most recent per bucket" is a simple first-seen.
stamps.sort(key=lambda t: t[0], reverse=True)
keep_set: set[Path] = set()
# Last 7 daily: most recent snapshot per calendar day.
seen_days: set[str] = set()
for dt, path in stamps:
day_key = dt.strftime("%Y-%m-%d")
if day_key not in seen_days:
seen_days.add(day_key)
keep_set.add(path)
if len(seen_days) >= 7:
break
# Last 4 weekly: most recent snapshot that falls on a Sunday.
seen_weeks: set[str] = set()
for dt, path in stamps:
if dt.weekday() == 6: # Sunday
week_key = dt.strftime("%Y-W%W")
if week_key not in seen_weeks:
seen_weeks.add(week_key)
keep_set.add(path)
if len(seen_weeks) >= 4:
break
# Last 6 monthly: most recent snapshot on the 1st of a month.
seen_months: set[str] = set()
for dt, path in stamps:
if dt.day == 1:
month_key = dt.strftime("%Y-%m")
if month_key not in seen_months:
seen_months.add(month_key)
keep_set.add(path)
if len(seen_months) >= 6:
break
to_delete = [path for _, path in stamps if path not in keep_set]
errors: list[str] = []
deleted_count = 0
if confirm:
for path in to_delete:
try:
shutil.rmtree(path)
deleted_count += 1
except OSError as exc:
errors.append(f"{path.name}: {exc}")
result: dict = {
"kept": len(keep_set),
"dry_run": not confirm,
"errors": errors,
}
if confirm:
result["deleted"] = deleted_count
else:
result["would_delete"] = len(to_delete)
if unparseable:
result["unparseable"] = unparseable
log.info(
"cleanup_old_backups",
kept=len(keep_set),
deleted=deleted_count if confirm else 0,
would_delete=len(to_delete) if not confirm else 0,
dry_run=not confirm,
)
return result
def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
source_conn = sqlite3.connect(str(source_path))
dest_conn = sqlite3.connect(str(dest_path))
@@ -448,6 +570,13 @@ def main() -> None:
p_validate = sub.add_parser("validate", help="validate a snapshot by stamp")
p_validate.add_argument("stamp", help="snapshot stamp (e.g. 20260409T010203Z)")
p_cleanup = sub.add_parser("cleanup", help="remove old snapshots per retention policy")
p_cleanup.add_argument(
"--confirm",
action="store_true",
help="actually delete (default is dry-run)",
)
p_restore = sub.add_parser(
"restore",
help="restore a snapshot by stamp (service must be stopped)",
@@ -488,6 +617,8 @@ def main() -> None:
result = {"backups": list_runtime_backups()}
elif command == "validate":
result = validate_backup(args.stamp)
elif command == "cleanup":
result = cleanup_old_backups(confirm=getattr(args, "confirm", False))
elif command == "restore":
result = restore_runtime_backup(
args.stamp,

View File

@@ -1,14 +1,15 @@
"""Tests for runtime backup creation and restore."""
"""Tests for runtime backup creation, restore, and retention cleanup."""
import json
import sqlite3
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
import pytest
import atocore.config as config
from atocore.models.database import init_db
from atocore.ops.backup import (
cleanup_old_backups,
create_runtime_backup,
list_runtime_backups,
restore_runtime_backup,
@@ -413,6 +414,56 @@ def test_restore_skips_pre_snapshot_when_requested(tmp_path, monkeypatch):
config.settings = original_settings
def test_create_backup_includes_validation_fields(tmp_path, monkeypatch):
"""Task B: create_runtime_backup auto-validates and reports result."""
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
monkeypatch.setenv(
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
)
original_settings = config.settings
try:
config.settings = config.Settings()
init_db()
result = create_runtime_backup(datetime(2026, 4, 11, 10, 0, 0, tzinfo=UTC))
finally:
config.settings = original_settings
assert "validated" in result
assert "validation_errors" in result
assert result["validated"] is True
assert result["validation_errors"] == []
def test_create_backup_validation_failure_does_not_raise(tmp_path, monkeypatch):
"""Task B: if post-backup validation fails, backup still returns metadata."""
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
monkeypatch.setenv(
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
)
def _broken_validate(stamp):
return {"valid": False, "errors": ["db_missing", "metadata_missing"]}
original_settings = config.settings
try:
config.settings = config.Settings()
init_db()
monkeypatch.setattr("atocore.ops.backup.validate_backup", _broken_validate)
result = create_runtime_backup(datetime(2026, 4, 11, 11, 0, 0, tzinfo=UTC))
finally:
config.settings = original_settings
# Should NOT have raised — backup still returned metadata
assert result["validated"] is False
assert result["validation_errors"] == ["db_missing", "metadata_missing"]
# Core backup fields still present
assert "db_snapshot_path" in result
assert "created_at" in result
def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
"""Stale WAL/SHM sidecars must not carry bytes past the restore.
@@ -457,3 +508,183 @@ def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
)
finally:
config.settings = original_settings
# ---------------------------------------------------------------------------
# Task C: Backup retention cleanup
# ---------------------------------------------------------------------------
def _setup_cleanup_env(tmp_path, monkeypatch):
"""Helper: configure env, init db, return snapshots_root."""
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
monkeypatch.setenv(
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
)
original = config.settings
config.settings = config.Settings()
init_db()
snapshots_root = config.settings.resolved_backup_dir / "snapshots"
snapshots_root.mkdir(parents=True, exist_ok=True)
return original, snapshots_root
def _seed_snapshots(snapshots_root, dates):
"""Create minimal valid snapshot dirs for the given datetimes."""
for dt in dates:
stamp = dt.strftime("%Y%m%dT%H%M%SZ")
snap_dir = snapshots_root / stamp
db_dir = snap_dir / "db"
db_dir.mkdir(parents=True, exist_ok=True)
db_path = db_dir / "atocore.db"
conn = sqlite3.connect(str(db_path))
conn.execute("CREATE TABLE IF NOT EXISTS _marker (id INTEGER)")
conn.close()
metadata = {
"created_at": dt.isoformat(),
"backup_root": str(snap_dir),
"db_snapshot_path": str(db_path),
"db_size_bytes": db_path.stat().st_size,
"registry_snapshot_path": "",
"chroma_snapshot_path": "",
"chroma_snapshot_bytes": 0,
"chroma_snapshot_files": 0,
"chroma_snapshot_included": False,
"vector_store_note": "",
}
(snap_dir / "backup-metadata.json").write_text(
json.dumps(metadata, indent=2) + "\n", encoding="utf-8"
)
def test_cleanup_empty_dir(tmp_path, monkeypatch):
original, _ = _setup_cleanup_env(tmp_path, monkeypatch)
try:
result = cleanup_old_backups()
assert result["kept"] == 0
assert result["would_delete"] == 0
assert result["dry_run"] is True
finally:
config.settings = original
def test_cleanup_dry_run_identifies_old_snapshots(tmp_path, monkeypatch):
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
try:
# 10 daily snapshots Apr 2-11 (avoiding Apr 1 which is monthly).
base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
dates = [base + timedelta(days=i) for i in range(10)]
_seed_snapshots(snapshots_root, dates)
result = cleanup_old_backups()
assert result["dry_run"] is True
# 7 daily kept + Apr 5 is a Sunday (weekly) but already in daily.
# Apr 2, 3, 4 are oldest. Apr 5 is Sunday → kept as weekly.
# So: 7 daily (Apr 5-11) + 1 weekly (Apr 5 already counted) = 7 daily.
# But Apr 5 is the 8th newest day from Apr 11... wait.
# Newest 7 days: Apr 11,10,9,8,7,6,5 → all kept as daily.
# Remaining: Apr 4,3,2. Apr 5 is already in daily.
# None of Apr 4,3,2 are Sunday or 1st → all 3 deleted.
assert result["kept"] == 7
assert result["would_delete"] == 3
assert len(list(snapshots_root.iterdir())) == 10
finally:
config.settings = original
def test_cleanup_confirm_deletes(tmp_path, monkeypatch):
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
try:
base = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
dates = [base + timedelta(days=i) for i in range(10)]
_seed_snapshots(snapshots_root, dates)
result = cleanup_old_backups(confirm=True)
assert result["dry_run"] is False
assert result["deleted"] == 3
assert result["kept"] == 7
assert len(list(snapshots_root.iterdir())) == 7
finally:
config.settings = original
def test_cleanup_keeps_last_7_daily(tmp_path, monkeypatch):
"""Exactly 7 snapshots on different days → all kept."""
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
try:
base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
dates = [base + timedelta(days=i) for i in range(7)]
_seed_snapshots(snapshots_root, dates)
result = cleanup_old_backups()
assert result["kept"] == 7
assert result["would_delete"] == 0
finally:
config.settings = original
def test_cleanup_keeps_sunday_weekly(tmp_path, monkeypatch):
"""Snapshots on Sundays outside the 7-day window are kept as weekly."""
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
try:
# 7 daily snapshots covering Apr 5-11
base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
daily = [base + timedelta(days=i) for i in range(7)]
# 2 older Sunday snapshots
sun1 = datetime(2026, 3, 29, 12, 0, 0, tzinfo=UTC) # Sunday
sun2 = datetime(2026, 3, 22, 12, 0, 0, tzinfo=UTC) # Sunday
# A non-Sunday old snapshot that should be deleted
wed = datetime(2026, 3, 25, 12, 0, 0, tzinfo=UTC) # Wednesday
_seed_snapshots(snapshots_root, daily + [sun1, sun2, wed])
result = cleanup_old_backups()
# 7 daily + 2 Sunday weekly = 9 kept, 1 Wednesday deleted
assert result["kept"] == 9
assert result["would_delete"] == 1
finally:
config.settings = original
def test_cleanup_keeps_monthly_first(tmp_path, monkeypatch):
"""Snapshots on the 1st of a month outside daily+weekly are kept as monthly."""
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
try:
# 7 daily in April 2026
base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
daily = [base + timedelta(days=i) for i in range(7)]
# Old monthly 1st snapshots
m1 = datetime(2026, 1, 1, 12, 0, 0, tzinfo=UTC)
m2 = datetime(2025, 12, 1, 12, 0, 0, tzinfo=UTC)
# Old non-1st, non-Sunday snapshot — should be deleted
old = datetime(2026, 1, 15, 12, 0, 0, tzinfo=UTC)
_seed_snapshots(snapshots_root, daily + [m1, m2, old])
result = cleanup_old_backups()
# 7 daily + 2 monthly = 9 kept, 1 deleted
assert result["kept"] == 9
assert result["would_delete"] == 1
finally:
config.settings = original
def test_cleanup_unparseable_stamp_skipped(tmp_path, monkeypatch):
"""Directories with unparseable names are ignored, not deleted."""
original, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
try:
base = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
_seed_snapshots(snapshots_root, [base])
bad_dir = snapshots_root / "not-a-timestamp"
bad_dir.mkdir()
result = cleanup_old_backups(confirm=True)
assert result.get("unparseable") == ["not-a-timestamp"]
assert bad_dir.exists()
assert result["kept"] == 1
finally:
config.settings = original

249
tests/test_capture_stop.py Normal file
View File

@@ -0,0 +1,249 @@
"""Tests for deploy/hooks/capture_stop.py — Claude Code Stop hook."""
from __future__ import annotations
import json
import os
import sys
import tempfile
import textwrap
from io import StringIO
from pathlib import Path
from unittest import mock
import pytest
# The hook script lives outside of the normal package tree, so import
# it by manipulating sys.path.
_HOOK_DIR = str(Path(__file__).resolve().parent.parent / "deploy" / "hooks")
if _HOOK_DIR not in sys.path:
sys.path.insert(0, _HOOK_DIR)
import capture_stop # noqa: E402
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _write_transcript(tmp: Path, entries: list[dict]) -> str:
"""Write a JSONL transcript and return the path."""
path = tmp / "transcript.jsonl"
with open(path, "w", encoding="utf-8") as f:
for entry in entries:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
return str(path)
def _user_entry(content: str, *, is_meta: bool = False) -> dict:
return {
"type": "user",
"isMeta": is_meta,
"message": {"role": "user", "content": content},
}
def _assistant_entry() -> dict:
return {
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": "Sure, here's the answer."}],
},
}
def _system_entry() -> dict:
return {"type": "system", "message": {"role": "system", "content": "system init"}}
# ---------------------------------------------------------------------------
# _extract_last_user_prompt
# ---------------------------------------------------------------------------
class TestExtractLastUserPrompt:
def test_returns_last_real_prompt(self, tmp_path):
path = _write_transcript(tmp_path, [
_user_entry("First prompt that is long enough to capture"),
_assistant_entry(),
_user_entry("Second prompt that should be the one we capture"),
_assistant_entry(),
])
result = capture_stop._extract_last_user_prompt(path)
assert result == "Second prompt that should be the one we capture"
def test_skips_meta_messages(self, tmp_path):
path = _write_transcript(tmp_path, [
_user_entry("Real prompt that is definitely long enough"),
_user_entry("<local-command>some system stuff</local-command>"),
_user_entry("Meta message that looks real enough", is_meta=True),
])
result = capture_stop._extract_last_user_prompt(path)
assert result == "Real prompt that is definitely long enough"
def test_skips_xml_content(self, tmp_path):
path = _write_transcript(tmp_path, [
_user_entry("Actual prompt from a real human user"),
_user_entry("<command-name>/help</command-name>"),
])
result = capture_stop._extract_last_user_prompt(path)
assert result == "Actual prompt from a real human user"
def test_skips_short_messages(self, tmp_path):
path = _write_transcript(tmp_path, [
_user_entry("This prompt is long enough to be captured"),
_user_entry("yes"), # too short
])
result = capture_stop._extract_last_user_prompt(path)
assert result == "This prompt is long enough to be captured"
def test_handles_content_blocks(self, tmp_path):
entry = {
"type": "user",
"message": {
"role": "user",
"content": [
{"type": "text", "text": "First paragraph of the prompt."},
{"type": "text", "text": "Second paragraph continues here."},
],
},
}
path = _write_transcript(tmp_path, [entry])
result = capture_stop._extract_last_user_prompt(path)
assert "First paragraph" in result
assert "Second paragraph" in result
def test_empty_transcript(self, tmp_path):
path = _write_transcript(tmp_path, [])
result = capture_stop._extract_last_user_prompt(path)
assert result == ""
def test_missing_file(self):
result = capture_stop._extract_last_user_prompt("/nonexistent/path.jsonl")
assert result == ""
def test_empty_path(self):
result = capture_stop._extract_last_user_prompt("")
assert result == ""
# ---------------------------------------------------------------------------
# _infer_project
# ---------------------------------------------------------------------------
class TestInferProject:
def test_empty_cwd(self):
assert capture_stop._infer_project("") == ""
def test_unknown_path(self):
assert capture_stop._infer_project("C:\\Users\\antoi\\random") == ""
def test_mapped_path(self):
with mock.patch.dict(capture_stop._PROJECT_PATH_MAP, {
"C:\\Users\\antoi\\gigabit": "p04-gigabit",
}):
result = capture_stop._infer_project("C:\\Users\\antoi\\gigabit\\src")
assert result == "p04-gigabit"
# ---------------------------------------------------------------------------
# _capture (integration-style, mocking HTTP)
# ---------------------------------------------------------------------------
class TestCapture:
def _hook_input(self, *, transcript_path: str = "", **overrides) -> str:
data = {
"session_id": "test-session-123",
"transcript_path": transcript_path,
"cwd": "C:\\Users\\antoi\\ATOCore",
"permission_mode": "default",
"hook_event_name": "Stop",
"last_assistant_message": "Here is the answer to your question about the code.",
"turn_number": 3,
}
data.update(overrides)
return json.dumps(data)
@mock.patch("capture_stop.urllib.request.urlopen")
def test_posts_to_atocore(self, mock_urlopen, tmp_path):
transcript = _write_transcript(tmp_path, [
_user_entry("Please explain how the backup system works in detail"),
_assistant_entry(),
])
mock_resp = mock.MagicMock()
mock_resp.read.return_value = json.dumps({"id": "int-001", "status": "recorded"}).encode()
mock_urlopen.return_value = mock_resp
with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))):
capture_stop._capture()
mock_urlopen.assert_called_once()
req = mock_urlopen.call_args[0][0]
body = json.loads(req.data.decode())
assert body["prompt"] == "Please explain how the backup system works in detail"
assert body["client"] == "claude-code"
assert body["session_id"] == "test-session-123"
assert body["reinforce"] is False
@mock.patch("capture_stop.urllib.request.urlopen")
def test_skips_when_disabled(self, mock_urlopen, tmp_path):
transcript = _write_transcript(tmp_path, [
_user_entry("A prompt that would normally be captured"),
])
with mock.patch.dict(os.environ, {"ATOCORE_CAPTURE_DISABLED": "1"}):
with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))):
capture_stop._capture()
mock_urlopen.assert_not_called()
@mock.patch("capture_stop.urllib.request.urlopen")
def test_skips_short_prompt(self, mock_urlopen, tmp_path):
transcript = _write_transcript(tmp_path, [
_user_entry("yes"),
])
with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))):
capture_stop._capture()
mock_urlopen.assert_not_called()
@mock.patch("capture_stop.urllib.request.urlopen")
def test_truncates_long_response(self, mock_urlopen, tmp_path):
transcript = _write_transcript(tmp_path, [
_user_entry("Tell me everything about the entire codebase architecture"),
])
long_response = "x" * 60_000
mock_resp = mock.MagicMock()
mock_resp.read.return_value = json.dumps({"id": "int-002"}).encode()
mock_urlopen.return_value = mock_resp
with mock.patch("sys.stdin", StringIO(
self._hook_input(transcript_path=transcript, last_assistant_message=long_response)
)):
capture_stop._capture()
req = mock_urlopen.call_args[0][0]
body = json.loads(req.data.decode())
assert len(body["response"]) <= capture_stop.MAX_RESPONSE_LENGTH + 20
assert body["response"].endswith("[truncated]")
def test_main_never_raises(self):
"""main() must always exit 0, even on garbage input."""
with mock.patch("sys.stdin", StringIO("not json at all")):
# Should not raise
capture_stop.main()
@mock.patch("capture_stop.urllib.request.urlopen")
def test_uses_atocore_url_env(self, mock_urlopen, tmp_path):
transcript = _write_transcript(tmp_path, [
_user_entry("Please help me with this particular problem in the code"),
])
mock_resp = mock.MagicMock()
mock_resp.read.return_value = json.dumps({"id": "int-003"}).encode()
mock_urlopen.return_value = mock_resp
with mock.patch.dict(os.environ, {"ATOCORE_URL": "http://localhost:9999"}):
# Re-read the env var
with mock.patch.object(capture_stop, "ATOCORE_URL", "http://localhost:9999"):
with mock.patch("sys.stdin", StringIO(self._hook_input(transcript_path=transcript))):
capture_stop._capture()
req = mock_urlopen.call_args[0][0]
assert req.full_url == "http://localhost:9999/interactions"

View File

@@ -6,6 +6,8 @@ from atocore.interactions.service import record_interaction
from atocore.main import app
from atocore.memory.reinforcement import (
DEFAULT_CONFIDENCE_DELTA,
_stem,
_tokenize,
reinforce_from_interaction,
)
from atocore.memory.service import (
@@ -373,3 +375,118 @@ def test_get_memories_filter_by_alias(project_registry):
assert len(via_alias) == 2
assert len(via_canonical) == 2
assert {m.content for m in via_alias} == {"m1", "m2"}
# --- token-overlap matcher: unit tests -------------------------------------
def test_stem_folds_s_ed_ing():
assert _stem("prefers") == "prefer"
assert _stem("preferred") == "prefer"
assert _stem("services") == "service"
assert _stem("processing") == "process"
# Short words must not be over-stripped
assert _stem("red") == "red" # 3 chars, don't strip "ed"
assert _stem("bus") == "bus" # 3 chars, don't strip "s"
assert _stem("sing") == "sing" # 4 chars, don't strip "ing"
assert _stem("being") == "being" # 5 chars, "ing" strip leaves "be" (2) — too short
def test_tokenize_removes_stop_words():
tokens = _tokenize("the quick brown fox jumps over the lazy dog")
assert "the" not in tokens
assert "quick" in tokens
assert "brown" in tokens
assert "fox" in tokens
assert "dog" in tokens
# "over" has len 4, not a stop word → kept (stemmed: "over")
assert "over" in tokens
# --- token-overlap matcher: paraphrase matching ----------------------------
def test_reinforce_matches_paraphrase_prefers_vs_prefer(tmp_data_dir):
"""The canonical rebase case from phase9-first-real-use.md."""
init_db()
mem = create_memory(
memory_type="preference",
content="prefers rebase-based workflows because history stays linear",
confidence=0.5,
)
interaction = _make_interaction(
response=(
"I prefer rebase-based workflows because the history stays "
"linear and reviewers have an easier time."
),
)
results = reinforce_from_interaction(interaction)
assert any(r.memory_id == mem.id for r in results)
def test_reinforce_matches_paraphrase_with_articles_and_ed(tmp_data_dir):
init_db()
mem = create_memory(
memory_type="preference",
content="preferred structured logging across all backend services",
confidence=0.5,
)
interaction = _make_interaction(
response=(
"I set up structured logging across all the backend services, "
"which the team prefers for consistency."
),
)
results = reinforce_from_interaction(interaction)
assert any(r.memory_id == mem.id for r in results)
def test_reinforce_rejects_low_overlap(tmp_data_dir):
init_db()
mem = create_memory(
memory_type="preference",
content="always uses Python for data processing scripts",
confidence=0.5,
)
interaction = _make_interaction(
response=(
"The CI pipeline runs on Node.js and deploys to Kubernetes "
"using Helm charts."
),
)
results = reinforce_from_interaction(interaction)
assert all(r.memory_id != mem.id for r in results)
def test_reinforce_matches_at_70_percent_threshold(tmp_data_dir):
"""Exactly 7 of 10 content tokens present → should match."""
init_db()
# After stop-word removal and stemming, this has 10 tokens:
# alpha, bravo, charlie, delta, echo, foxtrot, golf, hotel, india, juliet
mem = create_memory(
memory_type="preference",
content="alpha bravo charlie delta echo foxtrot golf hotel india juliet",
confidence=0.5,
)
# Echo 7 of 10 tokens (70%) plus some noise
interaction = _make_interaction(
response="alpha bravo charlie delta echo foxtrot golf noise words here",
)
results = reinforce_from_interaction(interaction)
assert any(r.memory_id == mem.id for r in results)
def test_reinforce_rejects_below_70_percent(tmp_data_dir):
"""Only 6 of 10 content tokens present (60%) → should NOT match."""
init_db()
mem = create_memory(
memory_type="preference",
content="alpha bravo charlie delta echo foxtrot golf hotel india juliet",
confidence=0.5,
)
# Echo 6 of 10 tokens (60%) plus noise
interaction = _make_interaction(
response="alpha bravo charlie delta echo foxtrot noise words here only",
)
results = reinforce_from_interaction(interaction)
assert all(r.memory_id != mem.id for r in results)