2 Commits

Author SHA1 Message Date
58c744fd2f feat: post-backup validation + retention cleanup (Tasks B & C)
- create_runtime_backup() now auto-validates its output and includes
  validated/validation_errors fields in returned metadata
- New cleanup_old_backups() with retention policy: 7 daily, 4 weekly
  (Sundays), 6 monthly (1st of month), dry-run by default
- CLI `cleanup` subcommand added to backup module
- 9 new tests (2 validation + 7 retention), 259 total passing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-11 09:46:46 -04:00
a34a7a995f fix: token-overlap matcher for reinforcement (Phase 9B)
Replace the substring-based _memory_matches() with a token-overlap
matcher that tokenizes both memory content and response, applies
lightweight stemming (trailing s/ed/ing) and stop-word removal, then
checks whether >= 70% of the memory's tokens appear in the response.

This fixes the paraphrase blindness that prevented reinforcement from
ever firing on natural responses ("prefers" vs "prefer", "because
history" vs "because the history").

7 new tests (26 total reinforcement tests, all passing).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-11 09:40:05 -04:00
4 changed files with 541 additions and 12 deletions

View File

@@ -8,10 +8,11 @@ given memory, without ever promoting anything new into trusted state.
Design notes
------------
- Matching is intentionally simple and explainable:
* normalize both sides (lowercase, collapse whitespace)
* require the normalized memory content (or its first 80 chars) to
appear as a substring in the normalized response
- Matching uses token-overlap: tokenize both sides (lowercase, stem,
drop stop words), then check whether >= 70 % of the memory's content
tokens appear in the response token set. This handles natural
paraphrases (e.g. "prefers" vs "prefer", "because history" vs
"because the history") that substring matching missed.
- Candidates and invalidated memories are NEVER considered — reinforcement
must not revive history.
- Reinforcement is capped at 1.0 and monotonically non-decreasing.
@@ -43,9 +44,12 @@ log = get_logger("reinforcement")
# memories like "prefers Python".
_MIN_MEMORY_CONTENT_LENGTH = 12
# When a memory's content is very long, match on its leading window only
# to avoid punishing small paraphrases further into the body.
_MATCH_WINDOW_CHARS = 80
# Token-overlap matching constants.
# Function words that ``_tokenize`` discards before stemming, so they never
# count toward the overlap ratio on either side of the comparison.
_STOP_WORDS: frozenset[str] = frozenset({
"the", "a", "an", "and", "or", "of", "to", "is", "was",
"that", "this", "with", "for", "from", "into",
})
# Minimum fraction of a memory's tokens that must also occur in the response
# for ``_memory_matches`` to report a match (the comparison is ``>=``).
_MATCH_THRESHOLD = 0.70
DEFAULT_CONFIDENCE_DELTA = 0.02
@@ -144,12 +148,58 @@ def _normalize(text: str) -> str:
return collapsed.strip()
def _stem(word: str) -> str:
    """Fold common English inflections so related word forms collapse.

    Handles trailing ``ing``, ``ed``, and ``s`` — good enough for
    reinforcement matching without pulling in nltk/snowball.

    A root-final ``ss`` is treated as part of the word rather than as a
    plural marker, so "process", "processes", "processed" and "processing"
    all reduce to "process".

    Args:
        word: A single token. Suffix checks are case-sensitive, so the token
            is expected to be lowercase already.

    Returns:
        The token with at most one suffix removed. Short words are returned
        unchanged so that e.g. "sing", "red" and "bus" are not over-stripped.
    """
    # Order matters: try longest suffix first.
    if word.endswith("ing") and len(word) >= 6:
        return word[:-3]
    if word.endswith("ed") and len(word) > 4:
        stem = word[:-2]
        # "preferred" -> "preferr" -> "prefer" (doubled consonant before -ed).
        # A doubled "s" is left alone: in "passed"/"processed" it belongs to
        # the root, and folding it would disagree with "pass"/"process".
        if len(stem) >= 3 and stem[-1] == stem[-2] and stem[-1] != "s":
            stem = stem[:-1]
        return stem
    # "processes" -> "process": the plural of an "ss" root adds "es".
    if word.endswith("sses"):
        return word[:-2]
    # Strip a plural/3rd-person "s", but never the second "s" of an "ss" root.
    if word.endswith("s") and not word.endswith("ss") and len(word) > 3:
        return word[:-1]
    return word
def _tokenize(text: str) -> set[str]:
    """Turn normalized text into a set of stemmed content tokens.

    Each whitespace-separated chunk has surrounding punctuation trimmed;
    chunks that end up shorter than 3 characters, or that are stop words,
    are discarded. Everything that survives is stemmed.
    """
    # Only leading/trailing punctuation is trimmed; characters inside a
    # word (e.g. the hyphen in "rebase-based") are kept.
    edge_punctuation = ".,;:!?\"'()[]{}-/"
    trimmed = (chunk.strip(edge_punctuation) for chunk in text.split())
    return {
        _stem(candidate)
        for candidate in trimmed
        if len(candidate) >= 3 and candidate not in _STOP_WORDS
    }
def _memory_matches(memory_content: str, normalized_response: str) -> bool:
    """Return True if enough of the memory's tokens appear in the response.

    Uses token-overlap: tokenize both sides (lowercase, stem, drop stop
    words), then check whether >= 70 % of the memory's content tokens
    appear in the response token set.

    Args:
        memory_content: Raw memory text; it is normalized here before
            tokenizing.
        normalized_response: Response text, tokenized as given. As the name
            indicates, the caller is expected to have normalized it already.

    Returns:
        ``False`` when the memory is empty, when its normalized form is
        shorter than ``_MIN_MEMORY_CONTENT_LENGTH``, or when it yields no
        tokens at all. Otherwise, whether the share of memory tokens found
        in the response reaches ``_MATCH_THRESHOLD``.
    """
    if not memory_content:
        return False
    normalized_memory = _normalize(memory_content)
    if len(normalized_memory) < _MIN_MEMORY_CONTENT_LENGTH:
        return False
    memory_tokens = _tokenize(normalized_memory)
    # A memory made only of stop words / very short words has nothing to
    # match on; bail out here, which also avoids dividing by zero below.
    if not memory_tokens:
        return False
    response_tokens = _tokenize(normalized_response)
    overlap = memory_tokens & response_tokens
    return len(overlap) / len(memory_tokens) >= _MATCH_THRESHOLD

View File

@@ -103,12 +103,27 @@ def create_runtime_backup(
encoding="utf-8",
)
# Automatic post-backup validation. Failures log a warning but do
# not raise — the backup files are still on disk and may be useful.
validation = validate_backup(stamp)
validated = validation.get("valid", False)
validation_errors = validation.get("errors", [])
if not validated:
log.warning(
"post_backup_validation_failed",
backup_root=str(backup_root),
errors=validation_errors,
)
metadata["validated"] = validated
metadata["validation_errors"] = validation_errors
log.info(
"runtime_backup_created",
backup_root=str(backup_root),
db_snapshot=str(db_snapshot_path),
chroma_included=include_chroma,
chroma_bytes=chroma_bytes_copied,
validated=validated,
)
return metadata
@@ -389,6 +404,113 @@ def restore_runtime_backup(
return result
def _newest_per_bucket(stamps, bucket_key, limit: int) -> set:
    """Pick the newest snapshot from each of the first ``limit`` buckets.

    Args:
        stamps: ``(datetime, path)`` pairs sorted newest first, so the first
            snapshot seen for a bucket is that bucket's most recent one.
        bucket_key: Callable mapping a snapshot datetime to a bucket label,
            or to ``None`` when the snapshot is not eligible for this tier.
        limit: Number of distinct buckets to retain.

    Returns:
        The set of snapshot paths to keep for this retention tier.
    """
    kept: set = set()
    seen: set = set()
    for dt, path in stamps:
        key = bucket_key(dt)
        if key is None or key in seen:
            continue
        seen.add(key)
        kept.add(path)
        if len(seen) >= limit:
            break
    return kept


def cleanup_old_backups(*, confirm: bool = False) -> dict:
    """Apply retention policy and remove old snapshots.

    Retention keeps:
    - Last 7 daily snapshots (most recent per calendar day)
    - Last 4 weekly snapshots (most recent on each Sunday)
    - Last 6 monthly snapshots (most recent on the 1st of each month)

    All other snapshots are candidates for deletion. Runs as dry-run by
    default; pass ``confirm=True`` to actually delete. Directories whose
    names do not parse as a ``%Y%m%dT%H%M%SZ`` stamp are never deleted; they
    are only reported.

    Args:
        confirm: When true, delete the candidates; otherwise only count them.

    Returns:
        A dict that always contains the same keys, whichever path is taken:
        ``kept``, ``deleted``, ``would_delete``, ``dry_run``, ``errors``
        (one ``"<name>: <reason>"`` string per failed deletion) and
        ``unparseable`` (names of skipped directories).
    """
    dry_run = not confirm
    result: dict = {
        "kept": 0,
        "deleted": 0,
        "would_delete": 0,
        "dry_run": dry_run,
        "errors": [],
        "unparseable": [],
    }

    snapshots_root = _config.settings.resolved_backup_dir / "snapshots"
    # is_dir() is already False for a missing path.
    if not snapshots_root.is_dir():
        return result

    # Parse all stamp directories into (datetime, dir_path) pairs.
    stamps: list[tuple[datetime, Path]] = []
    unparseable: list[str] = []
    for entry in sorted(snapshots_root.iterdir()):
        if not entry.is_dir():
            continue
        try:
            dt = datetime.strptime(entry.name, "%Y%m%dT%H%M%SZ").replace(tzinfo=UTC)
        except ValueError:
            unparseable.append(entry.name)
        else:
            stamps.append((dt, entry))
    result["unparseable"] = unparseable

    if not stamps:
        return result

    # Sort newest first so "most recent per bucket" is a simple first-seen.
    stamps.sort(key=lambda t: t[0], reverse=True)

    keep_set: set[Path] = (
        # Daily tier: every snapshot is eligible, bucketed by calendar day.
        _newest_per_bucket(stamps, lambda dt: dt.strftime("%Y-%m-%d"), 7)
        # Weekly tier: only snapshots taken on a Sunday (weekday() == 6).
        | _newest_per_bucket(
            stamps,
            lambda dt: dt.strftime("%Y-W%W") if dt.weekday() == 6 else None,
            4,
        )
        # Monthly tier: only snapshots taken on the 1st of a month.
        | _newest_per_bucket(
            stamps,
            lambda dt: dt.strftime("%Y-%m") if dt.day == 1 else None,
            6,
        )
    )

    to_delete = [path for _, path in stamps if path not in keep_set]

    errors: list[str] = []
    deleted_count = 0
    if confirm:
        for path in to_delete:
            try:
                shutil.rmtree(path)
            except OSError as exc:
                # Best effort: record the failure and keep going.
                errors.append(f"{path.name}: {exc}")
            else:
                deleted_count += 1

    would_delete = 0 if confirm else len(to_delete)
    result.update(
        kept=len(keep_set),
        deleted=deleted_count,
        would_delete=would_delete,
        errors=errors,
    )

    log.info(
        "cleanup_old_backups",
        kept=len(keep_set),
        deleted=deleted_count,
        would_delete=would_delete,
        dry_run=dry_run,
    )
    return result
def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
source_conn = sqlite3.connect(str(source_path))
dest_conn = sqlite3.connect(str(dest_path))
@@ -448,6 +570,13 @@ def main() -> None:
p_validate = sub.add_parser("validate", help="validate a snapshot by stamp")
p_validate.add_argument("stamp", help="snapshot stamp (e.g. 20260409T010203Z)")
p_cleanup = sub.add_parser("cleanup", help="remove old snapshots per retention policy")
p_cleanup.add_argument(
"--confirm",
action="store_true",
help="actually delete (default is dry-run)",
)
p_restore = sub.add_parser(
"restore",
help="restore a snapshot by stamp (service must be stopped)",
@@ -488,6 +617,8 @@ def main() -> None:
result = {"backups": list_runtime_backups()}
elif command == "validate":
result = validate_backup(args.stamp)
elif command == "cleanup":
result = cleanup_old_backups(confirm=getattr(args, "confirm", False))
elif command == "restore":
result = restore_runtime_backup(
args.stamp,

View File

@@ -1,14 +1,15 @@
"""Tests for runtime backup creation and restore."""
"""Tests for runtime backup creation, restore, and retention cleanup."""
import json
import sqlite3
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
import pytest
import atocore.config as config
from atocore.models.database import init_db
from atocore.ops.backup import (
cleanup_old_backups,
create_runtime_backup,
list_runtime_backups,
restore_runtime_backup,
@@ -413,6 +414,56 @@ def test_restore_skips_pre_snapshot_when_requested(tmp_path, monkeypatch):
config.settings = original_settings
def test_create_backup_includes_validation_fields(tmp_path, monkeypatch):
    """Task B: a fresh backup reports the outcome of its automatic validation."""
    env_paths = {
        "ATOCORE_DATA_DIR": tmp_path / "data",
        "ATOCORE_BACKUP_DIR": tmp_path / "backups",
        "ATOCORE_PROJECT_REGISTRY_PATH": tmp_path / "config" / "project-registry.json",
    }
    for name, location in env_paths.items():
        monkeypatch.setenv(name, str(location))

    saved_settings = config.settings
    try:
        # Rebuild settings so they pick up the environment set above.
        config.settings = config.Settings()
        init_db()
        taken_at = datetime(2026, 4, 11, 10, 0, 0, tzinfo=UTC)
        backup_info = create_runtime_backup(taken_at)
    finally:
        config.settings = saved_settings

    # Both validation keys must be present and describe a clean result.
    assert {"validated", "validation_errors"} <= backup_info.keys()
    assert backup_info["validated"] is True
    assert backup_info["validation_errors"] == []
def test_create_backup_validation_failure_does_not_raise(tmp_path, monkeypatch):
    """Task B: a failed post-backup validation is reported, never raised."""
    for name, location in (
        ("ATOCORE_DATA_DIR", tmp_path / "data"),
        ("ATOCORE_BACKUP_DIR", tmp_path / "backups"),
        ("ATOCORE_PROJECT_REGISTRY_PATH", tmp_path / "config" / "project-registry.json"),
    ):
        monkeypatch.setenv(name, str(location))

    def _always_invalid(stamp):
        # Stand-in validator that rejects every snapshot, whatever the stamp.
        return {"valid": False, "errors": ["db_missing", "metadata_missing"]}

    saved_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        monkeypatch.setattr("atocore.ops.backup.validate_backup", _always_invalid)
        taken_at = datetime(2026, 4, 11, 11, 0, 0, tzinfo=UTC)
        backup_info = create_runtime_backup(taken_at)
    finally:
        config.settings = saved_settings

    # Reaching this point means nothing was raised; the failure is carried
    # in the returned metadata instead.
    assert backup_info["validated"] is False
    assert backup_info["validation_errors"] == ["db_missing", "metadata_missing"]
    # The regular backup fields are still there.
    for core_field in ("db_snapshot_path", "created_at"):
        assert core_field in backup_info
def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
"""Stale WAL/SHM sidecars must not carry bytes past the restore.
@@ -457,3 +508,183 @@ def test_restore_cleans_stale_wal_sidecars(tmp_path, monkeypatch):
)
finally:
config.settings = original_settings
# ---------------------------------------------------------------------------
# Task C: Backup retention cleanup
# ---------------------------------------------------------------------------
def _setup_cleanup_env(tmp_path, monkeypatch):
    """Point the app at ``tmp_path`` and create an empty snapshots directory.

    Returns ``(previous_settings, snapshots_root)``. ``config.settings`` is
    replaced here and not restored, so the caller has to put
    ``previous_settings`` back when it is done.
    """
    env_paths = {
        "ATOCORE_DATA_DIR": tmp_path / "data",
        "ATOCORE_BACKUP_DIR": tmp_path / "backups",
        "ATOCORE_PROJECT_REGISTRY_PATH": tmp_path / "config" / "project-registry.json",
    }
    for name, location in env_paths.items():
        monkeypatch.setenv(name, str(location))

    previous_settings = config.settings
    config.settings = config.Settings()
    init_db()

    root = config.settings.resolved_backup_dir / "snapshots"
    root.mkdir(parents=True, exist_ok=True)
    return previous_settings, root
def _seed_snapshots(snapshots_root, dates):
    """Write one minimal snapshot directory per datetime in ``dates``.

    Each snapshot is named after its ``%Y%m%dT%H%M%SZ`` stamp and holds a
    tiny SQLite file at ``db/atocore.db`` plus a ``backup-metadata.json``
    describing it (no registry or Chroma data).
    """
    for moment in dates:
        snapshot_dir = snapshots_root / moment.strftime("%Y%m%dT%H%M%SZ")
        database_file = snapshot_dir / "db" / "atocore.db"
        database_file.parent.mkdir(parents=True, exist_ok=True)

        # A single marker table is enough to make this a real SQLite file.
        connection = sqlite3.connect(str(database_file))
        connection.execute("CREATE TABLE IF NOT EXISTS _marker (id INTEGER)")
        connection.close()

        manifest = {
            "created_at": moment.isoformat(),
            "backup_root": str(snapshot_dir),
            "db_snapshot_path": str(database_file),
            "db_size_bytes": database_file.stat().st_size,
            "registry_snapshot_path": "",
            "chroma_snapshot_path": "",
            "chroma_snapshot_bytes": 0,
            "chroma_snapshot_files": 0,
            "chroma_snapshot_included": False,
            "vector_store_note": "",
        }
        manifest_text = json.dumps(manifest, indent=2) + "\n"
        (snapshot_dir / "backup-metadata.json").write_text(manifest_text, encoding="utf-8")
def test_cleanup_empty_dir(tmp_path, monkeypatch):
    """With no snapshots at all, cleanup reports an all-zero dry run."""
    saved_settings, _snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        report = cleanup_old_backups()
        assert (report["kept"], report["would_delete"]) == (0, 0)
        assert report["dry_run"] is True
    finally:
        config.settings = saved_settings
def test_cleanup_dry_run_identifies_old_snapshots(tmp_path, monkeypatch):
    """A dry run counts what would be deleted but leaves every snapshot on disk."""
    saved_settings, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        # Ten consecutive days, Apr 2-11 2026. Apr 1 is left out on purpose
        # so the monthly rule has nothing to keep.
        first_day = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
        _seed_snapshots(
            snapshots_root,
            [first_day + timedelta(days=offset) for offset in range(10)],
        )

        report = cleanup_old_backups()

        assert report["dry_run"] is True
        # Daily rule keeps the newest seven days: Apr 5-11.
        # Weekly rule: the only Sunday is Apr 5, already kept as a daily.
        # Apr 2, 3 and 4 are neither a Sunday nor the 1st, so they go.
        assert report["kept"] == 7
        assert report["would_delete"] == 3
        # Nothing was actually removed.
        assert sum(1 for _ in snapshots_root.iterdir()) == 10
    finally:
        config.settings = saved_settings
def test_cleanup_confirm_deletes(tmp_path, monkeypatch):
    """With ``confirm=True`` the three oldest of ten daily snapshots are removed."""
    saved_settings, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        first_day = datetime(2026, 4, 2, 12, 0, 0, tzinfo=UTC)
        _seed_snapshots(
            snapshots_root,
            [first_day + timedelta(days=offset) for offset in range(10)],
        )

        report = cleanup_old_backups(confirm=True)

        assert report["dry_run"] is False
        assert report["deleted"] == 3
        assert report["kept"] == 7
        # The directory listing agrees with the report.
        assert sum(1 for _ in snapshots_root.iterdir()) == 7
    finally:
        config.settings = saved_settings
def test_cleanup_keeps_last_7_daily(tmp_path, monkeypatch):
    """Seven snapshots on seven distinct days are all retained."""
    saved_settings, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        first_day = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
        one_week = [first_day + timedelta(days=offset) for offset in range(7)]
        _seed_snapshots(snapshots_root, one_week)

        report = cleanup_old_backups()

        assert report["kept"] == 7
        assert report["would_delete"] == 0
    finally:
        config.settings = saved_settings
def test_cleanup_keeps_sunday_weekly(tmp_path, monkeypatch):
    """Sunday snapshots older than the daily window survive via the weekly rule."""
    saved_settings, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        # The daily window: Apr 5-11 2026.
        first_day = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
        daily_window = [first_day + timedelta(days=offset) for offset in range(7)]
        # Two earlier Sundays, which the weekly rule should keep.
        older_sundays = [
            datetime(2026, 3, 29, 12, 0, 0, tzinfo=UTC),
            datetime(2026, 3, 22, 12, 0, 0, tzinfo=UTC),
        ]
        # An earlier Wednesday, which no rule covers.
        midweek = datetime(2026, 3, 25, 12, 0, 0, tzinfo=UTC)
        _seed_snapshots(snapshots_root, [*daily_window, *older_sundays, midweek])

        report = cleanup_old_backups()

        # 7 daily + 2 weekly Sundays kept; only the Wednesday is a candidate.
        assert report["kept"] == 9
        assert report["would_delete"] == 1
    finally:
        config.settings = saved_settings
def test_cleanup_keeps_monthly_first(tmp_path, monkeypatch):
    """First-of-month snapshots outside the daily/weekly windows survive as monthly."""
    saved_settings, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        # The daily window: seven days in April 2026.
        first_day = datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)
        daily_window = [first_day + timedelta(days=offset) for offset in range(7)]
        # Two old snapshots taken on the 1st of a month.
        month_starts = [
            datetime(2026, 1, 1, 12, 0, 0, tzinfo=UTC),
            datetime(2025, 12, 1, 12, 0, 0, tzinfo=UTC),
        ]
        # Old, not on the 1st and not a Sunday: no rule keeps it.
        stray = datetime(2026, 1, 15, 12, 0, 0, tzinfo=UTC)
        _seed_snapshots(snapshots_root, [*daily_window, *month_starts, stray])

        report = cleanup_old_backups()

        # 7 daily + 2 monthly kept; the mid-January snapshot is a candidate.
        assert report["kept"] == 9
        assert report["would_delete"] == 1
    finally:
        config.settings = saved_settings
def test_cleanup_unparseable_stamp_skipped(tmp_path, monkeypatch):
    """A directory whose name is not a stamp is reported and left untouched."""
    saved_settings, snapshots_root = _setup_cleanup_env(tmp_path, monkeypatch)
    try:
        _seed_snapshots(snapshots_root, [datetime(2026, 4, 5, 12, 0, 0, tzinfo=UTC)])
        stranger = snapshots_root / "not-a-timestamp"
        stranger.mkdir()

        report = cleanup_old_backups(confirm=True)

        assert report.get("unparseable") == ["not-a-timestamp"]
        # Even a confirmed run must not remove it.
        assert stranger.exists()
        assert report["kept"] == 1
    finally:
        config.settings = saved_settings

View File

@@ -6,6 +6,8 @@ from atocore.interactions.service import record_interaction
from atocore.main import app
from atocore.memory.reinforcement import (
DEFAULT_CONFIDENCE_DELTA,
_stem,
_tokenize,
reinforce_from_interaction,
)
from atocore.memory.service import (
@@ -373,3 +375,118 @@ def test_get_memories_filter_by_alias(project_registry):
assert len(via_alias) == 2
assert len(via_canonical) == 2
assert {m.content for m in via_alias} == {"m1", "m2"}
# --- token-overlap matcher: unit tests -------------------------------------
def test_stem_folds_s_ed_ing():
    """``_stem`` folds -s/-ed/-ing on long words and leaves short ones alone."""
    expected_stems = {
        # Inflected forms collapse onto their root.
        "prefers": "prefer",
        "preferred": "prefer",
        "services": "service",
        "processing": "process",
        # Short words must not be over-stripped.
        "red": "red",      # 3 chars: "ed" stays
        "bus": "bus",      # 3 chars: "s" stays
        "sing": "sing",    # 4 chars: "ing" stays
        "being": "being",  # 5 chars: stripping "ing" would leave just "be"
    }
    for word, stem in expected_stems.items():
        assert _stem(word) == stem
def test_tokenize_removes_stop_words():
    """Stop words are dropped while ordinary content words are kept."""
    found = _tokenize("the quick brown fox jumps over the lazy dog")
    assert "the" not in found
    # "over" is four letters and not a stop word, so it stays (stem: "over").
    for content_word in ("quick", "brown", "fox", "dog", "over"):
        assert content_word in found
# --- token-overlap matcher: paraphrase matching ----------------------------
def test_reinforce_matches_paraphrase_prefers_vs_prefer(tmp_data_dir):
    """The canonical rebase case from phase9-first-real-use.md."""
    init_db()
    memory = create_memory(
        memory_type="preference",
        content="prefers rebase-based workflows because history stays linear",
        confidence=0.5,
    )
    # Same statement, but with "prefer" for "prefers" and an added "the".
    paraphrase = (
        "I prefer rebase-based workflows because the history stays "
        "linear and reviewers have an easier time."
    )
    outcomes = reinforce_from_interaction(_make_interaction(response=paraphrase))
    assert memory.id in [outcome.memory_id for outcome in outcomes]
def test_reinforce_matches_paraphrase_with_articles_and_ed(tmp_data_dir):
    """Extra articles and an -ed / -s inflection change do not block a match."""
    init_db()
    memory = create_memory(
        memory_type="preference",
        content="preferred structured logging across all backend services",
        confidence=0.5,
    )
    paraphrase = (
        "I set up structured logging across all the backend services, "
        "which the team prefers for consistency."
    )
    outcomes = reinforce_from_interaction(_make_interaction(response=paraphrase))
    assert memory.id in [outcome.memory_id for outcome in outcomes]
def test_reinforce_rejects_low_overlap(tmp_data_dir):
    """A response on an unrelated topic must not reinforce the memory."""
    init_db()
    memory = create_memory(
        memory_type="preference",
        content="always uses Python for data processing scripts",
        confidence=0.5,
    )
    unrelated = (
        "The CI pipeline runs on Node.js and deploys to Kubernetes "
        "using Helm charts."
    )
    outcomes = reinforce_from_interaction(_make_interaction(response=unrelated))
    assert memory.id not in [outcome.memory_id for outcome in outcomes]
def test_reinforce_matches_at_70_percent_threshold(tmp_data_dir):
    """Exactly 7 of 10 content tokens present → should match."""
    init_db()
    # Ten words, none a stop word and none changed by stemming:
    # alpha, bravo, charlie, delta, echo, foxtrot, golf, hotel, india, juliet
    memory = create_memory(
        memory_type="preference",
        content="alpha bravo charlie delta echo foxtrot golf hotel india juliet",
        confidence=0.5,
    )
    # The first seven of those words (70 %) followed by unrelated filler.
    seven_of_ten = "alpha bravo charlie delta echo foxtrot golf noise words here"
    outcomes = reinforce_from_interaction(_make_interaction(response=seven_of_ten))
    assert memory.id in [outcome.memory_id for outcome in outcomes]
def test_reinforce_rejects_below_70_percent(tmp_data_dir):
    """Only 6 of 10 content tokens present (60%) → should NOT match."""
    init_db()
    memory = create_memory(
        memory_type="preference",
        content="alpha bravo charlie delta echo foxtrot golf hotel india juliet",
        confidence=0.5,
    )
    # The first six of the ten words (60 %) followed by unrelated filler.
    six_of_ten = "alpha bravo charlie delta echo foxtrot noise words here only"
    outcomes = reinforce_from_interaction(_make_interaction(response=six_of_ten))
    assert memory.id not in [outcome.memory_id for outcome in outcomes]