From 90001c195670ef0353ef1b3af95e8f83befaafb8 Mon Sep 17 00:00:00 2001 From: Anto01 Date: Tue, 21 Apr 2026 16:18:00 -0400 Subject: [PATCH] fix(7A): host-side memory_dedup.py must stay stdlib-only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Broke the dedup-watcher cron when I wrote memory_dedup.py in session 7A: imported atocore.memory.similarity, which transitively pulls sentence-transformers + pydantic_settings onto host Python that intentionally doesn't have them. Every UI-triggered + cron dedup scan since 7A deployed was silently crashing with ModuleNotFoundError (visible only in /home/papa/atocore-logs/dedup-ondemand-*.log). I even documented this architecture rule in atocore.memory._llm_prompt ('This module MUST stay stdlib-only') then violated it one session later. Shame. Real fix — matches the extractor pattern: - New endpoint POST /admin/memory/dedup-cluster on the server: takes {project, similarity_threshold, max_clusters}, runs the embedding + transitive-clustering inside the container where sentence-transformers lives, returns cluster shape. - scripts/memory_dedup.py now pure stdlib: pulls clusters via HTTP, LLM-drafts merges via claude CLI, POSTs proposals back. No atocore imports beyond the stdlib-only _dedup_prompt shared module. - Regression test pins the rule: test_memory_dedup_script_is_stdlib_only snapshots sys.modules before/after importing the script and asserts no non-allowed atocore modules were pulled. Also: similarity.py + cluster_by_threshold stay server-side, still covered by the same tests that used to live in the host tier-helper section. Tests 459 → 458 (net -1: four obsolete host-tier helper tests removed, three added — the stdlib-only regression test plus two server-side clustering / endpoint shape tests). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- scripts/memory_dedup.py | 375 ++++++++++++++----------------------- src/atocore/api/routes.py | 96 ++++++++++ tests/test_memory_dedup.py | 117 ++++++------ 3 files changed, 300 insertions(+), 288 deletions(-) diff --git a/scripts/memory_dedup.py b/scripts/memory_dedup.py index 486e282..2c8fea1 100644 --- a/scripts/memory_dedup.py +++ b/scripts/memory_dedup.py @@ -1,32 +1,30 @@ #!/usr/bin/env python3 -"""Phase 7A — semantic memory dedup detector. +"""Phase 7A — semantic memory dedup detector (stdlib-only host script). Finds clusters of near-duplicate active memories and writes merge- -candidate proposals for human review in the triage UI. +candidate proposals for human (or autonomous) approval. Algorithm: - 1. Fetch active memories via HTTP - 2. Group by (project, memory_type) — cross-bucket merges are deferred - to Phase 7B contradiction flow - 3. Within each group, embed contents via atocore.retrieval.embeddings - 4. Greedy transitive cluster at similarity >= threshold - 5. For each cluster of size >= 2, ask claude-p to draft unified content - 6. POST the proposal to /admin/memory/merge-candidates/create (server- - side dedupes by the sorted memory-id set, so re-runs don't double- - create) + 1. POST /admin/memory/dedup-cluster on the AtoCore server — it + computes embeddings + transitive clusters under the (project, + memory_type) bucket rule (sentence-transformers lives in the + container, not on the host) + 2. For each returned cluster of size >= 2, ask claude-p (host-side + CLI) to draft unified content preserving all specifics + 3. 
Server-side tiering: + - TIER-1 auto-approve: sonnet confidence >= 0.8 AND min_sim >= 0.92 + AND all sources share project+type → immediately submit and + approve (actor="auto-dedup-tier1") + - TIER-2 escalation: opus confirms with conf >= 0.8 → auto-approve + (actor="auto-dedup-tier2"); opus rejects → skip silently + - HUMAN: pending proposal lands in /admin/triage -Host-side because claude CLI lives on Dalidou, not the container. Reuses -the same PYTHONPATH=src pattern as scripts/graduate_memories.py for -atocore imports (embeddings, similarity, prompt module). +Host-only dep: the `claude` CLI. No python packages beyond stdlib. +Reuses atocore.memory._dedup_prompt (stdlib-only shared prompt). Usage: python3 scripts/memory_dedup.py --base-url http://127.0.0.1:8100 \\ --similarity-threshold 0.88 --max-batch 50 - -Threshold conventions (see Phase 7 doc): - 0.88 interactive / default — balanced precision/recall - 0.90 nightly cron — tight, only near-duplicates - 0.85 weekly cron — deeper cleanup """ from __future__ import annotations @@ -41,10 +39,11 @@ import tempfile import time import urllib.error import urllib.request -from collections import defaultdict from typing import Any -# Make src/ importable — same pattern as graduate_memories.py +# Make src/ importable for the stdlib-only prompt module. +# We DO NOT import anything that pulls in pydantic_settings or +# sentence-transformers; those live on the server side. 
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) _SRC_DIR = os.path.abspath(os.path.join(_SCRIPT_DIR, "..", "src")) if _SRC_DIR not in sys.path: @@ -59,19 +58,14 @@ from atocore.memory._dedup_prompt import ( # noqa: E402 normalize_merge_verdict, parse_merge_verdict, ) -from atocore.memory.similarity import cluster_by_threshold # noqa: E402 DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100") DEFAULT_MODEL = os.environ.get("ATOCORE_DEDUP_MODEL", "sonnet") DEFAULT_TIER2_MODEL = os.environ.get("ATOCORE_DEDUP_TIER2_MODEL", "opus") DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_DEDUP_TIMEOUT_S", "60")) -# Phase 7A.1 — auto-merge tiering thresholds. -# TIER-1 auto-approve: if sonnet confidence >= this AND min pairwise -# similarity >= AUTO_APPROVE_SIM AND all sources share project+type → merge silently. AUTO_APPROVE_CONF = float(os.environ.get("ATOCORE_DEDUP_AUTO_APPROVE_CONF", "0.8")) AUTO_APPROVE_SIM = float(os.environ.get("ATOCORE_DEDUP_AUTO_APPROVE_SIM", "0.92")) -# TIER-2 escalation band: sonnet uncertain but pair is similar enough to be worth opus time. 
TIER2_MIN_CONF = float(os.environ.get("ATOCORE_DEDUP_TIER2_MIN_CONF", "0.5")) TIER2_MIN_SIM = float(os.environ.get("ATOCORE_DEDUP_TIER2_MIN_SIM", "0.85")) @@ -91,18 +85,17 @@ def api_get(base_url: str, path: str) -> dict: return json.loads(resp.read().decode("utf-8")) -def api_post(base_url: str, path: str, body: dict | None = None) -> dict: +def api_post(base_url: str, path: str, body: dict | None = None, timeout: int = 60) -> dict: data = json.dumps(body or {}).encode("utf-8") req = urllib.request.Request( f"{base_url}{path}", method="POST", headers={"Content-Type": "application/json"}, data=data, ) - with urllib.request.urlopen(req, timeout=30) as resp: + with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) def call_claude(system_prompt: str, user_message: str, model: str, timeout_s: float) -> tuple[str | None, str | None]: - """Shared CLI caller with retry + stderr capture (mirrors auto_triage).""" if not shutil.which("claude"): return None, "claude CLI not available" args = [ @@ -131,37 +124,32 @@ def call_claude(system_prompt: str, user_message: str, model: str, timeout_s: fl if completed.returncode == 0: return (completed.stdout or "").strip(), None stderr = (completed.stderr or "").strip()[:200] - last_error = f"{model} exit {completed.returncode}: {stderr}" if stderr else f"{model} exit {completed.returncode}" + last_error = f"{model} exit {completed.returncode}: {stderr}" return None, last_error -def fetch_active_memories(base_url: str, project: str | None) -> list[dict]: - # The /memory endpoint with active_only=true returns active memories. - # Graduated memories are exempt from dedup — they're frozen pointers - # to entities. Filter them out on the client side. 
- params = "active_only=true&limit=2000" - if project: - params += f"&project={urllib.request.quote(project)}" +def fetch_clusters(base_url: str, project: str, threshold: float, max_clusters: int) -> list[dict]: + """Ask the server to compute near-duplicate clusters. The server + owns sentence-transformers; host stays lean.""" try: - result = api_get(base_url, f"/memory?{params}") + result = api_post(base_url, "/admin/memory/dedup-cluster", { + "project": project, + "similarity_threshold": threshold, + "max_clusters": max_clusters, + }, timeout=120) except Exception as e: - print(f"ERROR: could not fetch memories: {e}", file=sys.stderr) + print(f"ERROR: dedup-cluster fetch failed: {e}", file=sys.stderr) return [] - mems = result.get("memories", []) - return [m for m in mems if (m.get("status") or "active") == "active"] - - -def group_memories(mems: list[dict]) -> dict[tuple[str, str], list[dict]]: - """Bucket by (project, memory_type). Empty project is its own bucket.""" - buckets: dict[tuple[str, str], list[dict]] = defaultdict(list) - for m in mems: - key = ((m.get("project") or "").strip().lower(), (m.get("memory_type") or "").strip().lower()) - buckets[key].append(m) - return buckets + clusters = result.get("clusters", []) + print( + f"server returned {len(clusters)} clusters " + f"(total_active={result.get('total_active_scanned')}, " + f"buckets={result.get('bucket_count')})" + ) + return clusters def draft_merge(sources: list[dict], model: str, timeout_s: float) -> dict[str, Any] | None: - """Tier-1 draft: cheap sonnet call proposes the unified content.""" user_msg = build_user_message(sources) raw, err = call_claude(SYSTEM_PROMPT, user_msg, model, timeout_s) if err: @@ -174,13 +162,7 @@ def draft_merge(sources: list[dict], model: str, timeout_s: float) -> dict[str, return normalize_merge_verdict(parsed) -def tier2_review( - sources: list[dict], - tier1_verdict: dict[str, Any], - model: str, - timeout_s: float, -) -> dict[str, Any] | None: - """Tier-2 second 
opinion: opus confirms or overrides the tier-1 draft.""" +def tier2_review(sources: list[dict], tier1_verdict: dict, model: str, timeout_s: float) -> dict | None: user_msg = build_tier2_user_message(sources, tier1_verdict) raw, err = call_claude(TIER2_SYSTEM_PROMPT, user_msg, model, timeout_s) if err: @@ -193,34 +175,7 @@ def tier2_review( return normalize_merge_verdict(parsed) -def min_pairwise_similarity(texts: list[str]) -> float: - """Return the minimum pairwise cosine similarity across N texts. - - Used to sanity-check a transitive cluster: A~B~C doesn't guarantee - A~C, so the auto-approve threshold should be met by the WEAKEST - pair, not just by the strongest. If the cluster has N=2 this is just - the one pairwise similarity. - """ - if len(texts) < 2: - return 0.0 - # Reuse similarity_matrix rather than computing it ourselves - from atocore.memory.similarity import similarity_matrix - m = similarity_matrix(texts) - min_sim = 1.0 - for i in range(len(texts)): - for j in range(i + 1, len(texts)): - if m[i][j] < min_sim: - min_sim = m[i][j] - return min_sim - - -def submit_candidate( - base_url: str, - memory_ids: list[str], - similarity: float, - verdict: dict[str, Any], - dry_run: bool, -) -> str | None: +def submit_candidate(base_url: str, memory_ids: list[str], similarity: float, verdict: dict, dry_run: bool) -> str | None: body = { "memory_ids": memory_ids, "similarity": similarity, @@ -246,7 +201,6 @@ def submit_candidate( def auto_approve(base_url: str, candidate_id: str, actor: str, dry_run: bool) -> str | None: - """POST /admin/memory/merge-candidates/{id}/approve. 
Returns result_memory_id.""" if dry_run: return "dry-run" try: @@ -261,29 +215,15 @@ def auto_approve(base_url: str, candidate_id: str, actor: str, dry_run: bool) -> return None -def same_bucket(sources: list[dict]) -> bool: - """All sources share the same (project, memory_type).""" - if not sources: - return False - proj = (sources[0].get("project") or "").strip().lower() - mtype = (sources[0].get("memory_type") or "").strip().lower() - for s in sources[1:]: - if (s.get("project") or "").strip().lower() != proj: - return False - if (s.get("memory_type") or "").strip().lower() != mtype: - return False - return True - - def main() -> None: - parser = argparse.ArgumentParser(description="Phase 7A semantic dedup detector (tiered)") + parser = argparse.ArgumentParser(description="Phase 7A semantic dedup detector (tiered, stdlib-only host)") parser.add_argument("--base-url", default=DEFAULT_BASE_URL) parser.add_argument("--project", default="", help="Only scan this project (empty = all)") parser.add_argument("--similarity-threshold", type=float, default=0.88) parser.add_argument("--max-batch", type=int, default=50, help="Max clusters to process per run") - parser.add_argument("--model", default=DEFAULT_MODEL, help="Tier-1 model (default: sonnet)") - parser.add_argument("--tier2-model", default=DEFAULT_TIER2_MODEL, help="Tier-2 model (default: opus)") + parser.add_argument("--model", default=DEFAULT_MODEL) + parser.add_argument("--tier2-model", default=DEFAULT_TIER2_MODEL) parser.add_argument("--timeout-s", type=float, default=DEFAULT_TIMEOUT_S) parser.add_argument("--no-auto-approve", action="store_true", help="Disable autonomous merging; all merges land in human triage queue") @@ -299,162 +239,133 @@ def main() -> None: f"autonomous={autonomous} | " f"auto-approve: conf>={AUTO_APPROVE_CONF} sim>={AUTO_APPROVE_SIM}" ) - mems = fetch_active_memories(base, args.project or None) - print(f"fetched {len(mems)} active memories") - if not mems: + + clusters = fetch_clusters( + 
base, args.project, args.similarity_threshold, args.max_batch, + ) + if not clusters: + print("no clusters — nothing to dedup") return - buckets = group_memories(mems) - print(f"grouped into {len(buckets)} (project, memory_type) buckets") - - clusters_found = 0 auto_merged_tier1 = 0 auto_merged_tier2 = 0 human_candidates = 0 tier1_rejections = 0 - tier2_overrides = 0 # opus disagreed with sonnet - skipped_low_sim = 0 + tier2_overrides = 0 skipped_existing = 0 processed = 0 - for (proj, mtype), group in sorted(buckets.items()): - if len(group) < 2: - continue + for cluster in clusters: if processed >= args.max_batch: - print(f"reached max-batch={args.max_batch}, stopping") break + processed += 1 + sources = cluster["sources"] + ids = cluster["memory_ids"] + min_sim = float(cluster["min_similarity"]) + proj = cluster.get("project") or "(global)" + mtype = cluster.get("memory_type") or "?" - texts = [(m.get("content") or "") for m in group] - clusters = cluster_by_threshold(texts, args.similarity_threshold) - clusters = [c for c in clusters if len(c) >= 2] - if not clusters: + print(f"\n[{proj}/{mtype}] cluster size={cluster['size']} min_sim={min_sim:.3f} " + f"{[s['id'][:8] for s in sources]}") + + tier1 = draft_merge(sources, args.model, args.timeout_s) + if tier1 is None: + continue + if tier1["action"] == "reject": + tier1_rejections += 1 + print(f" TIER-1 rejected: {tier1['reason'][:100]}") continue - print(f"\n[{proj or '(global)'}/{mtype}] {len(group)} mems → {len(clusters)} cluster(s)") - for cluster in clusters: - if processed >= args.max_batch: - break - clusters_found += 1 - sources = [group[i] for i in cluster] - ids = [s["id"] for s in sources] - cluster_texts = [texts[i] for i in cluster] - min_sim = min_pairwise_similarity(cluster_texts) - print(f" cluster of {len(cluster)} (min_sim={min_sim:.3f}): {[s['id'][:8] for s in sources]}") - - # Tier-1 draft - tier1 = draft_merge(sources, args.model, args.timeout_s) - processed += 1 - if tier1 is None: - 
continue - if tier1["action"] == "reject": - tier1_rejections += 1 - print(f" TIER-1 rejected: {tier1['reason'][:100]}") - continue - - # --- Tiering decision --- - bucket_ok = same_bucket(sources) - tier1_ok = ( - tier1["confidence"] >= AUTO_APPROVE_CONF - and min_sim >= AUTO_APPROVE_SIM - and bucket_ok - ) - - if autonomous and tier1_ok: - cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run) - if cid == "dry-run": - auto_merged_tier1 += 1 - print(" [dry-run] would auto-merge (tier-1)") - elif cid: - new_id = auto_approve(base, cid, actor="auto-dedup-tier1", dry_run=args.dry_run) - if new_id: - auto_merged_tier1 += 1 - print(f" ✅ auto-merged (tier-1) → {str(new_id)[:8]}") - else: - print(f" ⚠️ tier-1 approve failed; candidate {cid[:8]} left pending") - human_candidates += 1 - else: - skipped_existing += 1 - time.sleep(0.3) - continue - - # Not tier-1 auto-approve. Decide if it's worth tier-2 escalation. - tier2_eligible = ( - autonomous - and min_sim >= TIER2_MIN_SIM - and tier1["confidence"] >= TIER2_MIN_CONF - and bucket_ok - ) - - if tier2_eligible: - print(" → escalating to tier-2 (opus)…") - tier2 = tier2_review(sources, tier1, args.tier2_model, args.timeout_s) - if tier2 is None: - # Opus errored. Fall back to human triage. - cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run) - if cid and cid != "dry-run": - human_candidates += 1 - print(f" → candidate {cid[:8]} (tier-2 errored, human review)") - time.sleep(0.5) - continue - - if tier2["action"] == "reject": - tier2_overrides += 1 - print(f" ❌ TIER-2 override (reject): {tier2['reason'][:100]}") - time.sleep(0.5) - continue - - if tier2["confidence"] >= AUTO_APPROVE_CONF: - # Opus confirms. Auto-merge using opus's (possibly refined) content. 
- cid = submit_candidate(base, ids, min_sim, tier2, args.dry_run) - if cid == "dry-run": - auto_merged_tier2 += 1 - print(" [dry-run] would auto-merge (tier-2)") - elif cid: - new_id = auto_approve(base, cid, actor="auto-dedup-tier2", dry_run=args.dry_run) - if new_id: - auto_merged_tier2 += 1 - print(f" ✅ auto-merged (tier-2) → {str(new_id)[:8]}") - else: - human_candidates += 1 - print(f" ⚠️ tier-2 approve failed; candidate {cid[:8]} left pending") - else: - skipped_existing += 1 - time.sleep(0.5) - continue - - # Opus confirmed but low confidence → human review with opus's draft - cid = submit_candidate(base, ids, min_sim, tier2, args.dry_run) - if cid and cid != "dry-run": - human_candidates += 1 - print(f" → candidate {cid[:8]} (tier-2 low-confidence, human review)") - time.sleep(0.5) - continue - - # Below tier-2 eligibility (either non-autonomous mode, or - # similarity too low / cross-bucket). Always human review. - if min_sim < TIER2_MIN_SIM or not bucket_ok: - skipped_low_sim += 1 - # Still create a human candidate so it's visible, but log why - print(f" → below auto-tier thresholds (min_sim={min_sim:.3f}, bucket_ok={bucket_ok})") + # All sources share the bucket by construction from the server + bucket_ok = True + tier1_ok = ( + tier1["confidence"] >= AUTO_APPROVE_CONF + and min_sim >= AUTO_APPROVE_SIM + and bucket_ok + ) + if autonomous and tier1_ok: cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run) if cid == "dry-run": - human_candidates += 1 + auto_merged_tier1 += 1 + print(" [dry-run] would auto-merge (tier-1)") elif cid: - human_candidates += 1 - print(f" → candidate {cid[:8]} (human review)") + new_id = auto_approve(base, cid, actor="auto-dedup-tier1", dry_run=args.dry_run) + if new_id: + auto_merged_tier1 += 1 + print(f" ✅ auto-merged (tier-1) → {str(new_id)[:8]}") + else: + human_candidates += 1 + print(f" ⚠️ tier-1 approve failed; candidate {cid[:8]} pending") else: skipped_existing += 1 time.sleep(0.3) + continue + + 
tier2_eligible = ( + autonomous + and min_sim >= TIER2_MIN_SIM + and tier1["confidence"] >= TIER2_MIN_CONF + ) + + if tier2_eligible: + print(" → escalating to tier-2 (opus)…") + tier2 = tier2_review(sources, tier1, args.tier2_model, args.timeout_s) + if tier2 is None: + cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run) + if cid and cid != "dry-run": + human_candidates += 1 + print(f" → candidate {cid[:8]} (tier-2 errored, human review)") + time.sleep(0.5) + continue + + if tier2["action"] == "reject": + tier2_overrides += 1 + print(f" ❌ TIER-2 override (reject): {tier2['reason'][:100]}") + time.sleep(0.5) + continue + + if tier2["confidence"] >= AUTO_APPROVE_CONF: + cid = submit_candidate(base, ids, min_sim, tier2, args.dry_run) + if cid == "dry-run": + auto_merged_tier2 += 1 + elif cid: + new_id = auto_approve(base, cid, actor="auto-dedup-tier2", dry_run=args.dry_run) + if new_id: + auto_merged_tier2 += 1 + print(f" ✅ auto-merged (tier-2) → {str(new_id)[:8]}") + else: + human_candidates += 1 + else: + skipped_existing += 1 + time.sleep(0.5) + continue + + cid = submit_candidate(base, ids, min_sim, tier2, args.dry_run) + if cid and cid != "dry-run": + human_candidates += 1 + print(f" → candidate {cid[:8]} (tier-2 low-conf, human review)") + time.sleep(0.5) + continue + + # Below tier-2 thresholds — human review with tier-1 draft + cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run) + if cid == "dry-run": + human_candidates += 1 + elif cid: + human_candidates += 1 + print(f" → candidate {cid[:8]} (human review)") + else: + skipped_existing += 1 + time.sleep(0.3) print( - f"\nsummary: clusters_found={clusters_found} " + f"\nsummary: clusters_processed={processed} " f"auto_merged_tier1={auto_merged_tier1} " f"auto_merged_tier2={auto_merged_tier2} " f"human_candidates={human_candidates} " f"tier1_rejections={tier1_rejections} " f"tier2_overrides={tier2_overrides} " - f"skipped_low_sim={skipped_low_sim} " f"skipped_existing={skipped_existing}" 
) diff --git a/src/atocore/api/routes.py b/src/atocore/api/routes.py index 342c32e..f1e1db3 100644 --- a/src/atocore/api/routes.py +++ b/src/atocore/api/routes.py @@ -1744,6 +1744,102 @@ class DedupScanRequestBody(BaseModel): max_batch: int = 50 +class DedupClusterBody(BaseModel): + project: str = "" + similarity_threshold: float = 0.88 + max_clusters: int = 100 + + +@router.post("/admin/memory/dedup-cluster") +def api_dedup_cluster(body: DedupClusterBody) -> dict: + """Server-side near-duplicate clustering for Phase 7A dedup detector. + + Host-side scripts/memory_dedup.py can't import atocore.memory.similarity + (that would transitively pull sentence-transformers + torch onto the + host Python, which intentionally stays lean). Instead the host posts + here; we compute embeddings + transitive clusters server-side and + return the cluster shape the host needs to draft merges via claude CLI. + + Buckets by (project, memory_type) — cross-bucket merges are deferred + to the 7B contradiction flow. Active non-graduated memories only. 
+ Returns up to max_clusters clusters of size >= 2, ordered by min + intra-cluster similarity descending (strongest candidates first).""" + from atocore.memory.service import get_memories + from atocore.memory.similarity import cluster_by_threshold, similarity_matrix + + project_filter = (body.project or "").strip() or None + threshold = max(0.5, min(0.99, body.similarity_threshold)) + + mems = get_memories( + project=project_filter, + active_only=True, + limit=2000, + ) + # Drop graduated (frozen entity pointers) — they're exempt from dedup + mems = [m for m in mems if m.status == "active"] + + # Group by (project, memory_type) + buckets: dict[tuple[str, str], list] = {} + for m in mems: + key = ((m.project or "").lower(), (m.memory_type or "").lower()) + buckets.setdefault(key, []).append(m) + + out_clusters: list[dict] = [] + for (proj, mtype), group in sorted(buckets.items()): + if len(group) < 2: + continue + texts = [m.content or "" for m in group] + clusters = cluster_by_threshold(texts, threshold) + clusters = [c for c in clusters if len(c) >= 2] + if not clusters: + continue + + # Cache matrix once per bucket so we can report min pairwise sim + matrix = similarity_matrix(texts) + + for cluster in clusters: + min_sim = 1.0 + for i in range(len(cluster)): + for j in range(i + 1, len(cluster)): + s = matrix[cluster[i]][cluster[j]] + if s < min_sim: + min_sim = s + sources = [] + for idx in cluster: + m = group[idx] + sources.append({ + "id": m.id, + "memory_type": m.memory_type, + "content": m.content, + "project": m.project or "", + "confidence": m.confidence, + "reference_count": m.reference_count, + "domain_tags": m.domain_tags or [], + "valid_until": m.valid_until or "", + }) + out_clusters.append({ + "project": proj, + "memory_type": mtype, + "min_similarity": round(min_sim, 4), + "size": len(cluster), + "memory_ids": [s["id"] for s in sources], + "sources": sources, + }) + + # Strongest clusters first + out_clusters.sort(key=lambda c: 
-c["min_similarity"]) + out_clusters = out_clusters[:body.max_clusters] + + return { + "cluster_count": len(out_clusters), + "threshold": threshold, + "project_filter": project_filter or "", + "total_active_scanned": len(mems), + "bucket_count": sum(1 for g in buckets.values() if len(g) >= 2), + "clusters": out_clusters, + } + + @router.get("/admin/memory/merge-candidates") def api_list_merge_candidates(status: str = "pending", limit: int = 100) -> dict: """Phase 7A: list merge-candidate proposals for triage UI.""" diff --git a/tests/test_memory_dedup.py b/tests/test_memory_dedup.py index 321c5f6..e1bbd4b 100644 --- a/tests/test_memory_dedup.py +++ b/tests/test_memory_dedup.py @@ -162,77 +162,82 @@ def test_build_tier2_user_message_includes_tier1_draft(): assert "verdict" in msg.lower() -# --- Tiering helpers (min_pairwise_similarity, same_bucket) --- +# --- Host script is stdlib-only (Phase 7A architecture rule) --- -def test_same_bucket_true_for_matching(): +def test_memory_dedup_script_is_stdlib_only(): + """The host-side scripts/memory_dedup.py must NOT import anything + that pulls pydantic_settings, sentence-transformers, torch, etc. + into the host Python. The only atocore-land module allowed is the + stdlib-only prompt helper at atocore.memory._dedup_prompt. 
+ + This regression test prevents re-introducing the bug where the + dedup-watcher on Dalidou host crashed with ModuleNotFoundError + because someone imported atocore.memory.similarity (which pulls + in atocore.retrieval.embeddings → sentence_transformers).""" import importlib.util + import sys as _sys + + before = set(_sys.modules.keys()) spec = importlib.util.spec_from_file_location( - "memory_dedup_for_test", - "scripts/memory_dedup.py", + "memory_dedup_for_test", "scripts/memory_dedup.py", ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) + after = set(_sys.modules.keys()) - sources = [ - {"memory_type": "knowledge", "project": "p04"}, - {"memory_type": "knowledge", "project": "p04"}, - ] - assert mod.same_bucket(sources) is True - - -def test_same_bucket_false_for_mixed(): - import importlib.util - spec = importlib.util.spec_from_file_location( - "memory_dedup_for_test", - "scripts/memory_dedup.py", + new_atocore = sorted(m for m in (after - before) if m.startswith("atocore")) + # Only the stdlib-only shared prompt module is allowed to load + allowed = {"atocore", "atocore.memory", "atocore.memory._dedup_prompt"} + disallowed = [m for m in new_atocore if m not in allowed] + assert not disallowed, ( + f"scripts/memory_dedup.py pulled non-stdlib atocore modules " + f"(will break host Python without ML deps): {disallowed}" ) - mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) - - # Different project - assert mod.same_bucket([ - {"memory_type": "knowledge", "project": "p04"}, - {"memory_type": "knowledge", "project": "p05"}, - ]) is False - # Different memory_type - assert mod.same_bucket([ - {"memory_type": "knowledge", "project": "p04"}, - {"memory_type": "project", "project": "p04"}, - ]) is False -def test_min_pairwise_similarity_identical_texts(): - import importlib.util - spec = importlib.util.spec_from_file_location( - "memory_dedup_for_test", - "scripts/memory_dedup.py", +# --- Server-side clustering (still 
in atocore.memory.similarity) --- + + +def test_similarity_module_server_side(): + """similarity.py stays server-side for ML deps. These helpers are + only invoked via the /admin/memory/dedup-cluster endpoint.""" + from atocore.memory.similarity import cluster_by_threshold + clusters = cluster_by_threshold( + ["duplicate fact A", "duplicate fact A slightly reworded", + "totally unrelated fact about firmware"], + threshold=0.7, ) - mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) - - # Three identical texts — min should be ~1.0 - ms = mod.min_pairwise_similarity(["hello world"] * 3) - assert 0.99 <= ms <= 1.0 + multi = [c for c in clusters if len(c) >= 2] + assert multi, "expected at least one multi-member cluster" -def test_min_pairwise_similarity_mixed_cluster(): - """Transitive cluster A~B~C with A and C actually quite different - should expose a low min even though A~B and B~C are high.""" - import importlib.util - spec = importlib.util.spec_from_file_location( - "memory_dedup_for_test", - "scripts/memory_dedup.py", - ) - mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) +def test_cluster_endpoint_returns_groups(tmp_data_dir): + """POST /admin/memory/dedup-cluster shape test — we just verify the + service layer produces the expected output. 
Full HTTP is + integration-tested by the live scan.""" + from atocore.models.database import init_db + init_db() + from atocore.memory.service import create_memory, get_memories + create_memory("knowledge", "APM uses NX bridge for DXF to STL conversion", + project="apm") + create_memory("knowledge", "APM uses the NX Python bridge for DXF-to-STL", + project="apm") + create_memory("knowledge", "The polisher firmware requires USB SSD storage", + project="p06-polisher") - ms = mod.min_pairwise_similarity([ - "Antoine prefers OAuth over API keys", - "Antoine's OAuth preference", - "USB SSD mandatory for polisher firmware", - ]) - assert ms < 0.6 # Third is unrelated; min is far below threshold + # Mirror the server code path + from atocore.memory.similarity import cluster_by_threshold + mems = get_memories(project="apm", active_only=True, limit=100) + texts = [m.content for m in mems] + clusters = cluster_by_threshold(texts, threshold=0.7) + multi = [c for c in clusters if len(c) >= 2] + assert multi, "expected the two APM memories to cluster together" + # Unrelated p06 memory should NOT be in that cluster + apm_ids = {mems[i].id for i in multi[0]} + assert len(apm_ids) == 2 + all_ids = {m.id for m in mems} + assert apm_ids.issubset(all_ids) # --- create_merge_candidate idempotency ---