Files
ATOCore/tests/test_memory_dedup.py
Anto01 90001c1956 fix(7A): host-side memory_dedup.py must stay stdlib-only
Broke the dedup-watcher cron when I wrote memory_dedup.py in session
7A: imported atocore.memory.similarity, which transitively pulls
sentence-transformers + pydantic_settings onto host Python that
intentionally doesn't have them. Every UI-triggered + cron dedup scan
since 7A deployed was silently crashing with ModuleNotFoundError
(visible only in /home/papa/atocore-logs/dedup-ondemand-*.log).

I even documented this architecture rule in atocore.memory._llm_prompt
('This module MUST stay stdlib-only') then violated it one session
later. Shame.

Real fix — matches the extractor pattern:
- New endpoint POST /admin/memory/dedup-cluster on the server: takes
  {project, similarity_threshold, max_clusters}, runs the embedding +
  transitive-clustering inside the container where
  sentence-transformers lives, returns cluster shape.
- scripts/memory_dedup.py now pure stdlib: pulls clusters via HTTP,
  LLM-drafts merges via claude CLI, POSTs proposals back. No atocore
  imports beyond the stdlib-only _dedup_prompt shared module.
- Regression test pins the rule: test_memory_dedup_script_is_stdlib_only
  snapshots sys.modules before/after importing the script and asserts
  no non-allowed atocore modules were pulled.

Also: similarity.py + cluster_by_threshold stay server-side, still
covered by the same tests that used to live in the host tier-helper
section.

Tests 459 → 458 (-1 via rewrite of obsolete host-tier helper tests,
+2 for the new stdlib-only regression + endpoint shape tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 16:18:00 -04:00

502 lines
17 KiB
Python

"""Phase 7A — memory consolidation tests.
Covers:
- similarity helpers (cosine bounds, matrix symmetry, clustering)
- _dedup_prompt parser / normalizer robustness
- create_merge_candidate idempotency
- get_merge_candidates inlines source memories
- merge_memories end-to-end happy path (sources → superseded,
new merged memory active, audit rows, result_memory_id)
- reject_merge_candidate leaves sources untouched
"""
from __future__ import annotations
import pytest
from atocore.memory._dedup_prompt import (
TIER2_SYSTEM_PROMPT,
build_tier2_user_message,
normalize_merge_verdict,
parse_merge_verdict,
)
from atocore.memory.service import (
create_memory,
create_merge_candidate,
get_memory_audit,
get_merge_candidates,
merge_memories,
reject_merge_candidate,
)
from atocore.memory.similarity import (
cluster_by_threshold,
cosine,
compute_memory_similarity,
similarity_matrix,
)
from atocore.models.database import get_connection, init_db
# --- Similarity helpers ---
def test_cosine_bounds():
assert cosine([1.0, 0.0], [1.0, 0.0]) == pytest.approx(1.0)
assert cosine([1.0, 0.0], [0.0, 1.0]) == pytest.approx(0.0)
# Negative dot product clamped to 0
assert cosine([1.0, 0.0], [-1.0, 0.0]) == 0.0
def test_compute_memory_similarity_identical_high():
s = compute_memory_similarity("the sky is blue", "the sky is blue")
assert 0.99 <= s <= 1.0
def test_compute_memory_similarity_unrelated_low():
s = compute_memory_similarity(
"APM integrates with NX via a Python bridge",
"the polisher firmware must use USB SSD not SD card",
)
assert 0.0 <= s < 0.7
def test_similarity_matrix_symmetric():
texts = ["alpha beta gamma", "alpha beta gamma", "completely unrelated text"]
m = similarity_matrix(texts)
assert len(m) == 3 and all(len(r) == 3 for r in m)
for i in range(3):
assert m[i][i] == pytest.approx(1.0)
for i in range(3):
for j in range(3):
assert m[i][j] == pytest.approx(m[j][i])
def test_cluster_by_threshold_transitive():
# Three near-paraphrases should land in one cluster
texts = [
"Antoine prefers OAuth over API keys",
"Antoine's preference is OAuth, not API keys",
"the polisher firmware uses USB SSD storage",
]
clusters = cluster_by_threshold(texts, threshold=0.7)
# At least one cluster of size 2+ containing the paraphrases
big = [c for c in clusters if len(c) >= 2]
assert big, f"expected at least one multi-member cluster, got {clusters}"
assert 0 in big[0] and 1 in big[0]
# --- Prompt parser robustness ---
def test_parse_merge_verdict_strips_fences():
raw = "```json\n{\"action\":\"merge\",\"content\":\"x\"}\n```"
parsed = parse_merge_verdict(raw)
assert parsed == {"action": "merge", "content": "x"}
def test_parse_merge_verdict_handles_prose_prefix():
raw = "Sure! Here's the result:\n{\"action\":\"reject\",\"content\":\"no\"}"
parsed = parse_merge_verdict(raw)
assert parsed is not None
assert parsed["action"] == "reject"
def test_normalize_merge_verdict_fills_defaults():
v = normalize_merge_verdict({
"action": "merge",
"content": "unified text",
})
assert v is not None
assert v["memory_type"] == "knowledge"
assert v["project"] == ""
assert v["domain_tags"] == []
assert v["confidence"] == 0.5
def test_normalize_merge_verdict_rejects_empty_content():
assert normalize_merge_verdict({"action": "merge", "content": ""}) is None
def test_normalize_merge_verdict_rejects_unknown_action():
assert normalize_merge_verdict({"action": "?", "content": "x"}) is None
# --- Tier-2 (Phase 7A.1) ---
def test_tier2_prompt_is_stricter():
# The tier-2 system prompt must explicitly instruct the model to be
# stricter than tier-1 — that's the whole point of escalation.
assert "STRICTER" in TIER2_SYSTEM_PROMPT
assert "REJECT" in TIER2_SYSTEM_PROMPT
def test_build_tier2_user_message_includes_tier1_draft():
sources = [{
"id": "abc12345", "content": "source text A",
"memory_type": "knowledge", "project": "p04",
"domain_tags": ["optics"], "confidence": 0.6,
"valid_until": "", "reference_count": 2,
}, {
"id": "def67890", "content": "source text B",
"memory_type": "knowledge", "project": "p04",
"domain_tags": ["optics"], "confidence": 0.7,
"valid_until": "", "reference_count": 1,
}]
tier1 = {
"action": "merge",
"content": "unified draft by tier1",
"memory_type": "knowledge",
"project": "p04",
"domain_tags": ["optics"],
"confidence": 0.65,
"reason": "near-paraphrase",
}
msg = build_tier2_user_message(sources, tier1)
assert "source text A" in msg
assert "source text B" in msg
assert "TIER-1 DRAFT" in msg
assert "unified draft by tier1" in msg
assert "near-paraphrase" in msg
# Should end asking for a verdict
assert "verdict" in msg.lower()
# --- Host script is stdlib-only (Phase 7A architecture rule) ---
def test_memory_dedup_script_is_stdlib_only():
"""The host-side scripts/memory_dedup.py must NOT import anything
that pulls pydantic_settings, sentence-transformers, torch, etc.
into the host Python. The only atocore-land module allowed is the
stdlib-only prompt helper at atocore.memory._dedup_prompt.
This regression test prevents re-introducing the bug where the
dedup-watcher on Dalidou host crashed with ModuleNotFoundError
because someone imported atocore.memory.similarity (which pulls
in atocore.retrieval.embeddings → sentence_transformers)."""
import importlib.util
import sys as _sys
before = set(_sys.modules.keys())
spec = importlib.util.spec_from_file_location(
"memory_dedup_for_test", "scripts/memory_dedup.py",
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
after = set(_sys.modules.keys())
new_atocore = sorted(m for m in (after - before) if m.startswith("atocore"))
# Only the stdlib-only shared prompt module is allowed to load
allowed = {"atocore", "atocore.memory", "atocore.memory._dedup_prompt"}
disallowed = [m for m in new_atocore if m not in allowed]
assert not disallowed, (
f"scripts/memory_dedup.py pulled non-stdlib atocore modules "
f"(will break host Python without ML deps): {disallowed}"
)
# --- Server-side clustering (still in atocore.memory.similarity) ---
def test_similarity_module_server_side():
"""similarity.py stays server-side for ML deps. These helpers are
only invoked via the /admin/memory/dedup-cluster endpoint."""
from atocore.memory.similarity import cluster_by_threshold
clusters = cluster_by_threshold(
["duplicate fact A", "duplicate fact A slightly reworded",
"totally unrelated fact about firmware"],
threshold=0.7,
)
multi = [c for c in clusters if len(c) >= 2]
assert multi, "expected at least one multi-member cluster"
def test_cluster_endpoint_returns_groups(tmp_data_dir):
"""POST /admin/memory/dedup-cluster shape test — we just verify the
service layer produces the expected output. Full HTTP is
integration-tested by the live scan."""
from atocore.models.database import init_db
init_db()
from atocore.memory.service import create_memory, get_memories
create_memory("knowledge", "APM uses NX bridge for DXF to STL conversion",
project="apm")
create_memory("knowledge", "APM uses the NX Python bridge for DXF-to-STL",
project="apm")
create_memory("knowledge", "The polisher firmware requires USB SSD storage",
project="p06-polisher")
# Mirror the server code path
from atocore.memory.similarity import cluster_by_threshold
mems = get_memories(project="apm", active_only=True, limit=100)
texts = [m.content for m in mems]
clusters = cluster_by_threshold(texts, threshold=0.7)
multi = [c for c in clusters if len(c) >= 2]
assert multi, "expected the two APM memories to cluster together"
# Unrelated p06 memory should NOT be in that cluster
apm_ids = {mems[i].id for i in multi[0]}
assert len(apm_ids) == 2
all_ids = {m.id for m in mems}
assert apm_ids.issubset(all_ids)
# --- create_merge_candidate idempotency ---
def test_create_merge_candidate_inserts_row(tmp_data_dir):
init_db()
m1 = create_memory("knowledge", "APM uses NX for DXF conversion")
m2 = create_memory("knowledge", "APM uses NX for DXF-to-STL")
cid = create_merge_candidate(
memory_ids=[m1.id, m2.id],
similarity=0.92,
proposed_content="APM uses NX for DXF→STL conversion",
proposed_memory_type="knowledge",
proposed_project="",
proposed_tags=["apm", "nx"],
proposed_confidence=0.6,
reason="near-paraphrase",
)
assert cid is not None
pending = get_merge_candidates(status="pending")
assert len(pending) == 1
assert pending[0]["id"] == cid
assert pending[0]["similarity"] == pytest.approx(0.92)
assert len(pending[0]["sources"]) == 2
def test_create_merge_candidate_idempotent(tmp_data_dir):
init_db()
m1 = create_memory("knowledge", "Fact A")
m2 = create_memory("knowledge", "Fact A slightly reworded")
first = create_merge_candidate(
memory_ids=[m1.id, m2.id],
similarity=0.9,
proposed_content="merged",
proposed_memory_type="knowledge",
proposed_project="",
)
# Same id set, different order → dedupe skips
second = create_merge_candidate(
memory_ids=[m2.id, m1.id],
similarity=0.9,
proposed_content="merged (again)",
proposed_memory_type="knowledge",
proposed_project="",
)
assert first is not None
assert second is None
def test_create_merge_candidate_requires_two_ids(tmp_data_dir):
init_db()
m1 = create_memory("knowledge", "lonely")
with pytest.raises(ValueError):
create_merge_candidate(
memory_ids=[m1.id],
similarity=1.0,
proposed_content="x",
proposed_memory_type="knowledge",
proposed_project="",
)
# --- merge_memories end-to-end ---
def test_merge_memories_happy_path(tmp_data_dir):
init_db()
m1 = create_memory(
"knowledge", "APM uses NX for DXF conversion",
project="apm", confidence=0.6, domain_tags=["apm", "nx"],
)
m2 = create_memory(
"knowledge", "APM does DXF to STL via NX bridge",
project="apm", confidence=0.8, domain_tags=["apm", "bridge"],
)
# Bump reference counts so sum is meaningful
with get_connection() as conn:
conn.execute("UPDATE memories SET reference_count = 3 WHERE id = ?", (m1.id,))
conn.execute("UPDATE memories SET reference_count = 5 WHERE id = ?", (m2.id,))
cid = create_merge_candidate(
memory_ids=[m1.id, m2.id],
similarity=0.92,
proposed_content="APM uses NX bridge for DXF→STL conversion",
proposed_memory_type="knowledge",
proposed_project="apm",
proposed_tags=["apm", "nx", "bridge"],
proposed_confidence=0.7,
reason="duplicates",
)
new_id = merge_memories(candidate_id=cid, actor="human-triage")
assert new_id is not None
# Sources superseded
with get_connection() as conn:
s1 = conn.execute("SELECT status FROM memories WHERE id = ?", (m1.id,)).fetchone()
s2 = conn.execute("SELECT status FROM memories WHERE id = ?", (m2.id,)).fetchone()
merged = conn.execute(
"SELECT content, status, confidence, reference_count, project "
"FROM memories WHERE id = ?", (new_id,)
).fetchone()
cand = conn.execute(
"SELECT status, result_memory_id FROM memory_merge_candidates WHERE id = ?",
(cid,),
).fetchone()
assert s1["status"] == "superseded"
assert s2["status"] == "superseded"
assert merged["status"] == "active"
assert merged["project"] == "apm"
# confidence = max of sources (0.8), not the proposed 0.7 (proposed is hint;
# merge_memories picks max of actual source confidences — verify).
assert merged["confidence"] == pytest.approx(0.8)
# reference_count = sum (3 + 5 = 8)
assert int(merged["reference_count"]) == 8
assert cand["status"] == "approved"
assert cand["result_memory_id"] == new_id
def test_merge_memories_content_override(tmp_data_dir):
init_db()
m1 = create_memory("knowledge", "draft A", project="p05-interferometer")
m2 = create_memory("knowledge", "draft B", project="p05-interferometer")
cid = create_merge_candidate(
memory_ids=[m1.id, m2.id],
similarity=0.9,
proposed_content="AI draft",
proposed_memory_type="knowledge",
proposed_project="p05-interferometer",
)
new_id = merge_memories(
candidate_id=cid,
actor="human-triage",
override_content="human-edited final text",
override_tags=["optics", "custom"],
)
assert new_id is not None
with get_connection() as conn:
row = conn.execute(
"SELECT content, domain_tags FROM memories WHERE id = ?", (new_id,)
).fetchone()
assert row["content"] == "human-edited final text"
# domain_tags JSON should contain the override
assert "optics" in row["domain_tags"]
assert "custom" in row["domain_tags"]
def test_merge_memories_writes_audit(tmp_data_dir):
init_db()
m1 = create_memory("knowledge", "alpha")
m2 = create_memory("knowledge", "alpha variant")
cid = create_merge_candidate(
memory_ids=[m1.id, m2.id], similarity=0.9,
proposed_content="alpha merged",
proposed_memory_type="knowledge", proposed_project="",
)
new_id = merge_memories(candidate_id=cid)
assert new_id
audit_new = get_memory_audit(new_id)
actions_new = {a["action"] for a in audit_new}
assert "created_via_merge" in actions_new
audit_m1 = get_memory_audit(m1.id)
actions_m1 = {a["action"] for a in audit_m1}
assert "superseded" in actions_m1
def test_merge_memories_aborts_if_source_not_active(tmp_data_dir):
init_db()
m1 = create_memory("knowledge", "one")
m2 = create_memory("knowledge", "two")
cid = create_merge_candidate(
memory_ids=[m1.id, m2.id], similarity=0.9,
proposed_content="merged",
proposed_memory_type="knowledge", proposed_project="",
)
# Tamper: supersede one source before the merge runs
with get_connection() as conn:
conn.execute("UPDATE memories SET status = 'superseded' WHERE id = ?", (m1.id,))
result = merge_memories(candidate_id=cid)
assert result is None
# Candidate still pending
pending = get_merge_candidates(status="pending")
assert any(c["id"] == cid for c in pending)
def test_merge_memories_rejects_already_resolved(tmp_data_dir):
init_db()
m1 = create_memory("knowledge", "x")
m2 = create_memory("knowledge", "y")
cid = create_merge_candidate(
memory_ids=[m1.id, m2.id], similarity=0.9,
proposed_content="xy",
proposed_memory_type="knowledge", proposed_project="",
)
first = merge_memories(candidate_id=cid)
assert first is not None
# second call — already approved, should return None
second = merge_memories(candidate_id=cid)
assert second is None
# --- reject_merge_candidate ---
def test_reject_merge_candidate_leaves_sources_untouched(tmp_data_dir):
init_db()
m1 = create_memory("knowledge", "a")
m2 = create_memory("knowledge", "b")
cid = create_merge_candidate(
memory_ids=[m1.id, m2.id], similarity=0.9,
proposed_content="a+b",
proposed_memory_type="knowledge", proposed_project="",
)
ok = reject_merge_candidate(cid, actor="human-triage", note="false positive")
assert ok
# Sources still active
with get_connection() as conn:
s1 = conn.execute("SELECT status FROM memories WHERE id = ?", (m1.id,)).fetchone()
s2 = conn.execute("SELECT status FROM memories WHERE id = ?", (m2.id,)).fetchone()
cand = conn.execute(
"SELECT status FROM memory_merge_candidates WHERE id = ?", (cid,)
).fetchone()
assert s1["status"] == "active"
assert s2["status"] == "active"
assert cand["status"] == "rejected"
def test_reject_merge_candidate_idempotent(tmp_data_dir):
init_db()
m1 = create_memory("knowledge", "p")
m2 = create_memory("knowledge", "q")
cid = create_merge_candidate(
memory_ids=[m1.id, m2.id], similarity=0.9,
proposed_content="pq",
proposed_memory_type="knowledge", proposed_project="",
)
assert reject_merge_candidate(cid) is True
# second reject — already rejected, returns False
assert reject_merge_candidate(cid) is False
# --- Schema sanity ---
def test_merge_candidates_table_exists(tmp_data_dir):
init_db()
with get_connection() as conn:
cols = [r["name"] for r in conn.execute("PRAGMA table_info(memory_merge_candidates)").fetchall()]
expected = {"id", "status", "memory_ids", "similarity", "proposed_content",
"proposed_memory_type", "proposed_project", "proposed_tags",
"proposed_confidence", "reason", "created_at", "resolved_at",
"resolved_by", "result_memory_id"}
assert expected.issubset(set(cols))