"""Phase 7A — memory consolidation tests. Covers: - similarity helpers (cosine bounds, matrix symmetry, clustering) - _dedup_prompt parser / normalizer robustness - create_merge_candidate idempotency - get_merge_candidates inlines source memories - merge_memories end-to-end happy path (sources → superseded, new merged memory active, audit rows, result_memory_id) - reject_merge_candidate leaves sources untouched """ from __future__ import annotations import pytest from atocore.memory._dedup_prompt import ( TIER2_SYSTEM_PROMPT, build_tier2_user_message, normalize_merge_verdict, parse_merge_verdict, ) from atocore.memory.service import ( create_memory, create_merge_candidate, get_memory_audit, get_merge_candidates, merge_memories, reject_merge_candidate, ) from atocore.memory.similarity import ( cluster_by_threshold, cosine, compute_memory_similarity, similarity_matrix, ) from atocore.models.database import get_connection, init_db # --- Similarity helpers --- def test_cosine_bounds(): assert cosine([1.0, 0.0], [1.0, 0.0]) == pytest.approx(1.0) assert cosine([1.0, 0.0], [0.0, 1.0]) == pytest.approx(0.0) # Negative dot product clamped to 0 assert cosine([1.0, 0.0], [-1.0, 0.0]) == 0.0 def test_compute_memory_similarity_identical_high(): s = compute_memory_similarity("the sky is blue", "the sky is blue") assert 0.99 <= s <= 1.0 def test_compute_memory_similarity_unrelated_low(): s = compute_memory_similarity( "APM integrates with NX via a Python bridge", "the polisher firmware must use USB SSD not SD card", ) assert 0.0 <= s < 0.7 def test_similarity_matrix_symmetric(): texts = ["alpha beta gamma", "alpha beta gamma", "completely unrelated text"] m = similarity_matrix(texts) assert len(m) == 3 and all(len(r) == 3 for r in m) for i in range(3): assert m[i][i] == pytest.approx(1.0) for i in range(3): for j in range(3): assert m[i][j] == pytest.approx(m[j][i]) def test_cluster_by_threshold_transitive(): # Three near-paraphrases should land in one cluster texts = [ "Antoine prefers OAuth over API keys", "Antoine's preference is OAuth, not API keys", "the polisher firmware uses USB SSD storage", ] clusters = cluster_by_threshold(texts, threshold=0.7) # At least one cluster of size 2+ containing the paraphrases big = [c for c in clusters if len(c) >= 2] assert big, f"expected at least one multi-member cluster, got {clusters}" assert 0 in big[0] and 1 in big[0] # --- Prompt parser robustness --- def test_parse_merge_verdict_strips_fences(): raw = "```json\n{\"action\":\"merge\",\"content\":\"x\"}\n```" parsed = parse_merge_verdict(raw) assert parsed == {"action": "merge", "content": "x"} def test_parse_merge_verdict_handles_prose_prefix(): raw = "Sure! Here's the result:\n{\"action\":\"reject\",\"content\":\"no\"}" parsed = parse_merge_verdict(raw) assert parsed is not None assert parsed["action"] == "reject" def test_normalize_merge_verdict_fills_defaults(): v = normalize_merge_verdict({ "action": "merge", "content": "unified text", }) assert v is not None assert v["memory_type"] == "knowledge" assert v["project"] == "" assert v["domain_tags"] == [] assert v["confidence"] == 0.5 def test_normalize_merge_verdict_rejects_empty_content(): assert normalize_merge_verdict({"action": "merge", "content": ""}) is None def test_normalize_merge_verdict_rejects_unknown_action(): assert normalize_merge_verdict({"action": "?", "content": "x"}) is None # --- Tier-2 (Phase 7A.1) --- def test_tier2_prompt_is_stricter(): # The tier-2 system prompt must explicitly instruct the model to be # stricter than tier-1 — that's the whole point of escalation. assert "STRICTER" in TIER2_SYSTEM_PROMPT assert "REJECT" in TIER2_SYSTEM_PROMPT def test_build_tier2_user_message_includes_tier1_draft(): sources = [{ "id": "abc12345", "content": "source text A", "memory_type": "knowledge", "project": "p04", "domain_tags": ["optics"], "confidence": 0.6, "valid_until": "", "reference_count": 2, }, { "id": "def67890", "content": "source text B", "memory_type": "knowledge", "project": "p04", "domain_tags": ["optics"], "confidence": 0.7, "valid_until": "", "reference_count": 1, }] tier1 = { "action": "merge", "content": "unified draft by tier1", "memory_type": "knowledge", "project": "p04", "domain_tags": ["optics"], "confidence": 0.65, "reason": "near-paraphrase", } msg = build_tier2_user_message(sources, tier1) assert "source text A" in msg assert "source text B" in msg assert "TIER-1 DRAFT" in msg assert "unified draft by tier1" in msg assert "near-paraphrase" in msg # Should end asking for a verdict assert "verdict" in msg.lower() # --- Tiering helpers (min_pairwise_similarity, same_bucket) --- def test_same_bucket_true_for_matching(): import importlib.util spec = importlib.util.spec_from_file_location( "memory_dedup_for_test", "scripts/memory_dedup.py", ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) sources = [ {"memory_type": "knowledge", "project": "p04"}, {"memory_type": "knowledge", "project": "p04"}, ] assert mod.same_bucket(sources) is True def test_same_bucket_false_for_mixed(): import importlib.util spec = importlib.util.spec_from_file_location( "memory_dedup_for_test", "scripts/memory_dedup.py", ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) # Different project assert mod.same_bucket([ {"memory_type": "knowledge", "project": "p04"}, {"memory_type": "knowledge", "project": "p05"}, ]) is False # Different memory_type assert mod.same_bucket([ {"memory_type": "knowledge", "project": "p04"}, {"memory_type": "project", "project": "p04"}, ]) is False def test_min_pairwise_similarity_identical_texts(): import importlib.util spec = importlib.util.spec_from_file_location( "memory_dedup_for_test", "scripts/memory_dedup.py", ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) # Three identical texts — min should be ~1.0 ms = mod.min_pairwise_similarity(["hello world"] * 3) assert 0.99 <= ms <= 1.0 def test_min_pairwise_similarity_mixed_cluster(): """Transitive cluster A~B~C with A and C actually quite different should expose a low min even though A~B and B~C are high.""" import importlib.util spec = importlib.util.spec_from_file_location( "memory_dedup_for_test", "scripts/memory_dedup.py", ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) ms = mod.min_pairwise_similarity([ "Antoine prefers OAuth over API keys", "Antoine's OAuth preference", "USB SSD mandatory for polisher firmware", ]) assert ms < 0.6 # Third is unrelated; min is far below threshold # --- create_merge_candidate idempotency --- def test_create_merge_candidate_inserts_row(tmp_data_dir): init_db() m1 = create_memory("knowledge", "APM uses NX for DXF conversion") m2 = create_memory("knowledge", "APM uses NX for DXF-to-STL") cid = create_merge_candidate( memory_ids=[m1.id, m2.id], similarity=0.92, proposed_content="APM uses NX for DXF→STL conversion", proposed_memory_type="knowledge", proposed_project="", proposed_tags=["apm", "nx"], proposed_confidence=0.6, reason="near-paraphrase", ) assert cid is not None pending = get_merge_candidates(status="pending") assert len(pending) == 1 assert pending[0]["id"] == cid assert pending[0]["similarity"] == pytest.approx(0.92) assert len(pending[0]["sources"]) == 2 def test_create_merge_candidate_idempotent(tmp_data_dir): init_db() m1 = create_memory("knowledge", "Fact A") m2 = create_memory("knowledge", "Fact A slightly reworded") first = create_merge_candidate( memory_ids=[m1.id, m2.id], similarity=0.9, proposed_content="merged", proposed_memory_type="knowledge", proposed_project="", ) # Same id set, different order → dedupe skips second = create_merge_candidate( memory_ids=[m2.id, m1.id], similarity=0.9, proposed_content="merged (again)", proposed_memory_type="knowledge", proposed_project="", ) assert first is not None assert second is None def test_create_merge_candidate_requires_two_ids(tmp_data_dir): init_db() m1 = create_memory("knowledge", "lonely") with pytest.raises(ValueError): create_merge_candidate( memory_ids=[m1.id], similarity=1.0, proposed_content="x", proposed_memory_type="knowledge", proposed_project="", ) # --- merge_memories end-to-end --- def test_merge_memories_happy_path(tmp_data_dir): init_db() m1 = create_memory( "knowledge", "APM uses NX for DXF conversion", project="apm", confidence=0.6, domain_tags=["apm", "nx"], ) m2 = create_memory( "knowledge", "APM does DXF to STL via NX bridge", project="apm", confidence=0.8, domain_tags=["apm", "bridge"], ) # Bump reference counts so sum is meaningful with get_connection() as conn: conn.execute("UPDATE memories SET reference_count = 3 WHERE id = ?", (m1.id,)) conn.execute("UPDATE memories SET reference_count = 5 WHERE id = ?", (m2.id,)) cid = create_merge_candidate( memory_ids=[m1.id, m2.id], similarity=0.92, proposed_content="APM uses NX bridge for DXF→STL conversion", proposed_memory_type="knowledge", proposed_project="apm", proposed_tags=["apm", "nx", "bridge"], proposed_confidence=0.7, reason="duplicates", ) new_id = merge_memories(candidate_id=cid, actor="human-triage") assert new_id is not None # Sources superseded with get_connection() as conn: s1 = conn.execute("SELECT status FROM memories WHERE id = ?", (m1.id,)).fetchone() s2 = conn.execute("SELECT status FROM memories WHERE id = ?", (m2.id,)).fetchone() merged = conn.execute( "SELECT content, status, confidence, reference_count, project " "FROM memories WHERE id = ?", (new_id,) ).fetchone() cand = conn.execute( "SELECT status, result_memory_id FROM memory_merge_candidates WHERE id = ?", (cid,), ).fetchone() assert s1["status"] == "superseded" assert s2["status"] == "superseded" assert merged["status"] == "active" assert merged["project"] == "apm" # confidence = max of sources (0.8), not the proposed 0.7 (proposed is hint; # merge_memories picks max of actual source confidences — verify). assert merged["confidence"] == pytest.approx(0.8) # reference_count = sum (3 + 5 = 8) assert int(merged["reference_count"]) == 8 assert cand["status"] == "approved" assert cand["result_memory_id"] == new_id def test_merge_memories_content_override(tmp_data_dir): init_db() m1 = create_memory("knowledge", "draft A", project="p05-interferometer") m2 = create_memory("knowledge", "draft B", project="p05-interferometer") cid = create_merge_candidate( memory_ids=[m1.id, m2.id], similarity=0.9, proposed_content="AI draft", proposed_memory_type="knowledge", proposed_project="p05-interferometer", ) new_id = merge_memories( candidate_id=cid, actor="human-triage", override_content="human-edited final text", override_tags=["optics", "custom"], ) assert new_id is not None with get_connection() as conn: row = conn.execute( "SELECT content, domain_tags FROM memories WHERE id = ?", (new_id,) ).fetchone() assert row["content"] == "human-edited final text" # domain_tags JSON should contain the override assert "optics" in row["domain_tags"] assert "custom" in row["domain_tags"] def test_merge_memories_writes_audit(tmp_data_dir): init_db() m1 = create_memory("knowledge", "alpha") m2 = create_memory("knowledge", "alpha variant") cid = create_merge_candidate( memory_ids=[m1.id, m2.id], similarity=0.9, proposed_content="alpha merged", proposed_memory_type="knowledge", proposed_project="", ) new_id = merge_memories(candidate_id=cid) assert new_id audit_new = get_memory_audit(new_id) actions_new = {a["action"] for a in audit_new} assert "created_via_merge" in actions_new audit_m1 = get_memory_audit(m1.id) actions_m1 = {a["action"] for a in audit_m1} assert "superseded" in actions_m1 def test_merge_memories_aborts_if_source_not_active(tmp_data_dir): init_db() m1 = create_memory("knowledge", "one") m2 = create_memory("knowledge", "two") cid = create_merge_candidate( memory_ids=[m1.id, m2.id], similarity=0.9, proposed_content="merged", proposed_memory_type="knowledge", proposed_project="", ) # Tamper: supersede one source before the merge runs with get_connection() as conn: conn.execute("UPDATE memories SET status = 'superseded' WHERE id = ?", (m1.id,)) result = merge_memories(candidate_id=cid) assert result is None # Candidate still pending pending = get_merge_candidates(status="pending") assert any(c["id"] == cid for c in pending) def test_merge_memories_rejects_already_resolved(tmp_data_dir): init_db() m1 = create_memory("knowledge", "x") m2 = create_memory("knowledge", "y") cid = create_merge_candidate( memory_ids=[m1.id, m2.id], similarity=0.9, proposed_content="xy", proposed_memory_type="knowledge", proposed_project="", ) first = merge_memories(candidate_id=cid) assert first is not None # second call — already approved, should return None second = merge_memories(candidate_id=cid) assert second is None # --- reject_merge_candidate --- def test_reject_merge_candidate_leaves_sources_untouched(tmp_data_dir): init_db() m1 = create_memory("knowledge", "a") m2 = create_memory("knowledge", "b") cid = create_merge_candidate( memory_ids=[m1.id, m2.id], similarity=0.9, proposed_content="a+b", proposed_memory_type="knowledge", proposed_project="", ) ok = reject_merge_candidate(cid, actor="human-triage", note="false positive") assert ok # Sources still active with get_connection() as conn: s1 = conn.execute("SELECT status FROM memories WHERE id = ?", (m1.id,)).fetchone() s2 = conn.execute("SELECT status FROM memories WHERE id = ?", (m2.id,)).fetchone() cand = conn.execute( "SELECT status FROM memory_merge_candidates WHERE id = ?", (cid,) ).fetchone() assert s1["status"] == "active" assert s2["status"] == "active" assert cand["status"] == "rejected" def test_reject_merge_candidate_idempotent(tmp_data_dir): init_db() m1 = create_memory("knowledge", "p") m2 = create_memory("knowledge", "q") cid = create_merge_candidate( memory_ids=[m1.id, m2.id], similarity=0.9, proposed_content="pq", proposed_memory_type="knowledge", proposed_project="", ) assert reject_merge_candidate(cid) is True # second reject — already rejected, returns False assert reject_merge_candidate(cid) is False # --- Schema sanity --- def test_merge_candidates_table_exists(tmp_data_dir): init_db() with get_connection() as conn: cols = [r["name"] for r in conn.execute("PRAGMA table_info(memory_merge_candidates)").fetchall()] expected = {"id", "status", "memory_ids", "similarity", "proposed_content", "proposed_memory_type", "proposed_project", "proposed_tags", "proposed_confidence", "reason", "created_at", "resolved_at", "resolved_by", "result_memory_id"} assert expected.issubset(set(cols))