"""Phase 7A (Memory Consolidation): semantic similarity helpers. Thin wrapper over ``atocore.retrieval.embeddings`` that exposes pairwise + batch cosine similarity on normalized embeddings. Used by the dedup detector to cluster near-duplicate active memories. Embeddings from ``embed_texts()`` are already L2-normalized, so cosine similarity reduces to a dot product — no extra normalization needed. """ from __future__ import annotations from atocore.retrieval.embeddings import embed_texts def _dot(a: list[float], b: list[float]) -> float: return sum(x * y for x, y in zip(a, b)) def cosine(a: list[float], b: list[float]) -> float: """Cosine similarity on already-normalized vectors. Clamped to [0,1] (embeddings use paraphrase-multilingual-MiniLM which is unit-norm, and we never want negative values leaking into thresholds).""" return max(0.0, min(1.0, _dot(a, b))) def compute_memory_similarity(text_a: str, text_b: str) -> float: """Return cosine similarity of two memory contents in [0,1]. Convenience helper for one-off checks + tests. For batch work (the dedup detector), use ``embed_texts()`` directly and compute the similarity matrix yourself to avoid re-embedding shared texts. """ if not text_a or not text_b: return 0.0 vecs = embed_texts([text_a, text_b]) return cosine(vecs[0], vecs[1]) def similarity_matrix(texts: list[str]) -> list[list[float]]: """N×N cosine similarity matrix. Diagonal is 1.0, symmetric.""" if not texts: return [] vecs = embed_texts(texts) n = len(vecs) matrix = [[0.0] * n for _ in range(n)] for i in range(n): matrix[i][i] = 1.0 for j in range(i + 1, n): s = cosine(vecs[i], vecs[j]) matrix[i][j] = s matrix[j][i] = s return matrix def cluster_by_threshold(texts: list[str], threshold: float) -> list[list[int]]: """Greedy transitive clustering: if sim(i,j) >= threshold, merge. Returns a list of clusters, each a list of indices into ``texts``. Singletons are included. Used by the dedup detector to collapse A~B~C into one merge proposal rather than three pair proposals. """ if not texts: return [] matrix = similarity_matrix(texts) n = len(texts) parent = list(range(n)) def find(x: int) -> int: while parent[x] != x: parent[x] = parent[parent[x]] x = parent[x] return x def union(x: int, y: int) -> None: rx, ry = find(x), find(y) if rx != ry: parent[rx] = ry for i in range(n): for j in range(i + 1, n): if matrix[i][j] >= threshold: union(i, j) groups: dict[int, list[int]] = {} for i in range(n): groups.setdefault(find(i), []).append(i) return list(groups.values())