#!/usr/bin/env python3 """Phase 7A — semantic memory dedup detector (stdlib-only host script). Finds clusters of near-duplicate active memories and writes merge- candidate proposals for human (or autonomous) approval. Algorithm: 1. POST /admin/memory/dedup-cluster on the AtoCore server — it computes embeddings + transitive clusters under the (project, memory_type) bucket rule (sentence-transformers lives in the container, not on the host) 2. For each returned cluster of size >= 2, ask claude-p (host-side CLI) to draft unified content preserving all specifics 3. Server-side tiering: - TIER-1 auto-approve: sonnet confidence >= 0.8 AND min_sim >= 0.92 AND all sources share project+type → immediately submit and approve (actor="auto-dedup-tier1") - TIER-2 escalation: opus confirms with conf >= 0.8 → auto-approve (actor="auto-dedup-tier2"); opus rejects → skip silently - HUMAN: pending proposal lands in /admin/triage Host-only dep: the `claude` CLI. No python packages beyond stdlib. Reuses atocore.memory._dedup_prompt (stdlib-only shared prompt). Usage: python3 scripts/memory_dedup.py --base-url http://127.0.0.1:8100 \\ --similarity-threshold 0.88 --max-batch 50 """ from __future__ import annotations import argparse import json import os import shutil import subprocess import sys import tempfile import time import urllib.error import urllib.request from typing import Any # Make src/ importable for the stdlib-only prompt module. # We DO NOT import anything that pulls in pydantic_settings or # sentence-transformers; those live on the server side. 
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
_SRC_DIR = os.path.abspath(os.path.join(_SCRIPT_DIR, "..", "src"))
if _SRC_DIR not in sys.path:
    sys.path.insert(0, _SRC_DIR)

from atocore.memory._dedup_prompt import (  # noqa: E402
    DEDUP_PROMPT_VERSION,
    SYSTEM_PROMPT,
    TIER2_SYSTEM_PROMPT,
    build_tier2_user_message,
    build_user_message,
    normalize_merge_verdict,
    parse_merge_verdict,
)

# All tunables are overridable via environment so the cron wrapper can
# adjust behavior without editing this script.
DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100")
DEFAULT_MODEL = os.environ.get("ATOCORE_DEDUP_MODEL", "sonnet")
DEFAULT_TIER2_MODEL = os.environ.get("ATOCORE_DEDUP_TIER2_MODEL", "opus")
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_DEDUP_TIMEOUT_S", "60"))
AUTO_APPROVE_CONF = float(os.environ.get("ATOCORE_DEDUP_AUTO_APPROVE_CONF", "0.8"))
AUTO_APPROVE_SIM = float(os.environ.get("ATOCORE_DEDUP_AUTO_APPROVE_SIM", "0.92"))
TIER2_MIN_CONF = float(os.environ.get("ATOCORE_DEDUP_TIER2_MIN_CONF", "0.5"))
TIER2_MIN_SIM = float(os.environ.get("ATOCORE_DEDUP_TIER2_MIN_SIM", "0.85"))

# Lazily-created scratch directory used as cwd for `claude` subprocesses,
# so the CLI never runs inside the repo (no accidental file access).
_sandbox_cwd = None


def get_sandbox_cwd() -> str:
    """Return (creating on first use) a temp dir to run the claude CLI in."""
    global _sandbox_cwd
    if _sandbox_cwd is None:
        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-dedup-")
    return _sandbox_cwd


def api_get(base_url: str, path: str) -> dict:
    """GET {base_url}{path} and decode the JSON response body."""
    req = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode("utf-8"))


def api_post(base_url: str, path: str, body: dict | None = None, timeout: int = 60) -> dict:
    """POST a JSON body to {base_url}{path} and decode the JSON response.

    Raises urllib.error.HTTPError / URLError on failure; callers decide
    whether that is fatal.
    """
    data = json.dumps(body or {}).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}{path}",
        method="POST",
        headers={"Content-Type": "application/json"},
        data=data,
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def call_claude(system_prompt: str, user_message: str, model: str, timeout_s: float) -> tuple[str | None, str | None]:
    """Run the host-side `claude` CLI once per attempt (up to 3 tries).

    Returns (stdout, None) on success or (None, error_description) after
    all retries fail. Retries use exponential backoff (2s, 4s).
    """
    if not shutil.which("claude"):
        return None, "claude CLI not available"
    args = [
        "claude",
        "-p",
        "--model",
        model,
        "--append-system-prompt",
        system_prompt,
        "--disable-slash-commands",
        user_message,
    ]
    last_error = ""
    for attempt in range(3):
        if attempt > 0:
            time.sleep(2 ** attempt)  # backoff before each retry
        try:
            completed = subprocess.run(
                args,
                capture_output=True,
                text=True,
                timeout=timeout_s,
                cwd=get_sandbox_cwd(),  # keep the CLI out of the repo tree
                encoding="utf-8",
                errors="replace",
            )
        except subprocess.TimeoutExpired:
            last_error = f"{model} timed out"
            continue
        except Exception as exc:
            last_error = f"subprocess error: {exc}"
            continue
        if completed.returncode == 0:
            return (completed.stdout or "").strip(), None
        stderr = (completed.stderr or "").strip()[:200]
        last_error = f"{model} exit {completed.returncode}: {stderr}"
    return None, last_error


def fetch_clusters(base_url: str, project: str, threshold: float, max_clusters: int) -> list[dict]:
    """Ask the server to compute near-duplicate clusters.

    The server owns sentence-transformers; host stays lean. Returns [] on
    any fetch error (the run simply does nothing).
    """
    try:
        result = api_post(base_url, "/admin/memory/dedup-cluster", {
            "project": project,
            "similarity_threshold": threshold,
            "max_clusters": max_clusters,
        }, timeout=120)
    except Exception as e:
        print(f"ERROR: dedup-cluster fetch failed: {e}", file=sys.stderr)
        return []
    clusters = result.get("clusters", [])
    print(
        f"server returned {len(clusters)} clusters "
        f"(total_active={result.get('total_active_scanned')}, "
        f"buckets={result.get('bucket_count')})"
    )
    return clusters


def draft_merge(sources: list[dict], model: str, timeout_s: float) -> dict[str, Any] | None:
    """Tier-1: ask the fast model to draft a merge verdict for one cluster.

    Returns the normalized verdict dict, or None when the CLI failed or
    produced unparseable output (cluster is skipped for this run).
    """
    user_msg = build_user_message(sources)
    raw, err = call_claude(SYSTEM_PROMPT, user_msg, model, timeout_s)
    if err:
        print(f" WARN: claude tier-1 failed: {err}", file=sys.stderr)
        return None
    parsed = parse_merge_verdict(raw or "")
    if parsed is None:
        print(f" WARN: could not parse tier-1 verdict: {(raw or '')[:200]}", file=sys.stderr)
        return None
    return normalize_merge_verdict(parsed)


def tier2_review(sources: list[dict], tier1_verdict: dict, model: str, timeout_s: float) -> dict | None:
    """Tier-2: ask the stronger model to confirm/override a tier-1 verdict.

    Returns the normalized tier-2 verdict dict, or None on CLI/parse
    failure (caller falls back to human review with the tier-1 draft).
    """
    user_msg = build_tier2_user_message(sources, tier1_verdict)
    raw, err = call_claude(TIER2_SYSTEM_PROMPT, user_msg, model, timeout_s)
    if err:
        print(f" WARN: claude tier-2 failed: {err}", file=sys.stderr)
        return None
    parsed = parse_merge_verdict(raw or "")
    if parsed is None:
        print(f" WARN: could not parse tier-2 verdict: {(raw or '')[:200]}", file=sys.stderr)
        return None
    return normalize_merge_verdict(parsed)


def submit_candidate(base_url: str, memory_ids: list[str], similarity: float, verdict: dict, dry_run: bool) -> str | None:
    """Create a merge-candidate proposal on the server.

    Returns the new candidate_id, the sentinel "dry-run" when dry_run is
    set, or None on submit failure (e.g. a candidate already exists).
    """
    body = {
        "memory_ids": memory_ids,
        "similarity": similarity,
        "proposed_content": verdict["content"],
        "proposed_memory_type": verdict["memory_type"],
        "proposed_project": verdict["project"],
        "proposed_tags": verdict["domain_tags"],
        "proposed_confidence": verdict["confidence"],
        "reason": verdict["reason"],
    }
    if dry_run:
        print(f" [dry-run] would POST: {json.dumps(body)[:200]}...")
        return "dry-run"
    try:
        result = api_post(base_url, "/admin/memory/merge-candidates/create", body)
        return result.get("candidate_id")
    except urllib.error.HTTPError as e:
        print(f" ERROR: submit failed: {e.code} {e.read().decode()[:200]}", file=sys.stderr)
        return None
    except Exception as e:
        print(f" ERROR: submit failed: {e}", file=sys.stderr)
        return None


def auto_approve(base_url: str, candidate_id: str, actor: str, dry_run: bool) -> str | None:
    """Approve a pending candidate; returns the merged memory's id or None."""
    if dry_run:
        return "dry-run"
    try:
        result = api_post(
            base_url,
            f"/admin/memory/merge-candidates/{candidate_id}/approve",
            {"actor": actor},
        )
        return result.get("result_memory_id")
    except Exception as e:
        print(f" ERROR: auto-approve failed: {e}", file=sys.stderr)
        return None


def main() -> None:
    """Fetch clusters, draft merges, and route each through the tier policy."""
    parser = argparse.ArgumentParser(description="Phase 7A semantic dedup detector (tiered, stdlib-only host)")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--project", default="", help="Only scan this project (empty = all)")
    parser.add_argument("--similarity-threshold", type=float, default=0.88)
    parser.add_argument("--max-batch", type=int, default=50, help="Max clusters to process per run")
    parser.add_argument("--model", default=DEFAULT_MODEL)
    parser.add_argument("--tier2-model", default=DEFAULT_TIER2_MODEL)
    parser.add_argument("--timeout-s", type=float, default=DEFAULT_TIMEOUT_S)
    parser.add_argument("--no-auto-approve", action="store_true", help="Disable autonomous merging; all merges land in human triage queue")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    base = args.base_url.rstrip("/")
    autonomous = not args.no_auto_approve
    print(
        f"memory_dedup {DEDUP_PROMPT_VERSION} | threshold={args.similarity_threshold} | "
        f"tier1={args.model} tier2={args.tier2_model} | "
        f"autonomous={autonomous} | "
        f"auto-approve: conf>={AUTO_APPROVE_CONF} sim>={AUTO_APPROVE_SIM}"
    )

    clusters = fetch_clusters(
        base,
        args.project,
        args.similarity_threshold,
        args.max_batch,
    )
    if not clusters:
        print("no clusters — nothing to dedup")
        return

    auto_merged_tier1 = 0
    auto_merged_tier2 = 0
    human_candidates = 0
    tier1_rejections = 0
    tier2_overrides = 0
    skipped_existing = 0
    processed = 0

    for cluster in clusters:
        # Defensive cap even though the server already honors max_clusters.
        if processed >= args.max_batch:
            break
        processed += 1
        sources = cluster["sources"]
        ids = cluster["memory_ids"]
        min_sim = float(cluster["min_similarity"])
        proj = cluster.get("project") or "(global)"
        mtype = cluster.get("memory_type") or "?"
        print(f"\n[{proj}/{mtype}] cluster size={cluster['size']} min_sim={min_sim:.3f} "
              f"{[s['id'][:8] for s in sources]}")

        tier1 = draft_merge(sources, args.model, args.timeout_s)
        if tier1 is None:
            continue
        if tier1["action"] == "reject":
            tier1_rejections += 1
            print(f" TIER-1 rejected: {tier1['reason'][:100]}")
            continue

        # All sources share the bucket by construction from the server
        bucket_ok = True
        tier1_ok = (
            tier1["confidence"] >= AUTO_APPROVE_CONF
            and min_sim >= AUTO_APPROVE_SIM
            and bucket_ok
        )

        if autonomous and tier1_ok:
            cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run)
            if cid == "dry-run":
                auto_merged_tier1 += 1
                print(" [dry-run] would auto-merge (tier-1)")
            elif cid:
                new_id = auto_approve(base, cid, actor="auto-dedup-tier1", dry_run=args.dry_run)
                if new_id:
                    auto_merged_tier1 += 1
                    print(f" ✅ auto-merged (tier-1) → {str(new_id)[:8]}")
                else:
                    human_candidates += 1
                    print(f" ⚠️ tier-1 approve failed; candidate {cid[:8]} pending")
            else:
                # Submit rejected — server likely already has a candidate.
                skipped_existing += 1
            time.sleep(0.3)
            continue

        tier2_eligible = (
            autonomous
            and min_sim >= TIER2_MIN_SIM
            and tier1["confidence"] >= TIER2_MIN_CONF
        )
        if tier2_eligible:
            print(" → escalating to tier-2 (opus)…")
            tier2 = tier2_review(sources, tier1, args.tier2_model, args.timeout_s)
            if tier2 is None:
                # Tier-2 errored: fall back to the tier-1 draft for humans.
                cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run)
                if cid == "dry-run":
                    # FIX: count dry-run submissions like the other human-review
                    # paths so dry-run summaries are consistent.
                    human_candidates += 1
                elif cid:
                    human_candidates += 1
                    print(f" → candidate {cid[:8]} (tier-2 errored, human review)")
                time.sleep(0.5)
                continue
            if tier2["action"] == "reject":
                tier2_overrides += 1
                print(f" ❌ TIER-2 override (reject): {tier2['reason'][:100]}")
                time.sleep(0.5)
                continue
            if tier2["confidence"] >= AUTO_APPROVE_CONF:
                cid = submit_candidate(base, ids, min_sim, tier2, args.dry_run)
                if cid == "dry-run":
                    auto_merged_tier2 += 1
                elif cid:
                    new_id = auto_approve(base, cid, actor="auto-dedup-tier2", dry_run=args.dry_run)
                    if new_id:
                        auto_merged_tier2 += 1
                        print(f" ✅ auto-merged (tier-2) → {str(new_id)[:8]}")
                    else:
                        human_candidates += 1
                        # FIX: surface the pending candidate like the tier-1
                        # approve-failure path does.
                        print(f" ⚠️ tier-2 approve failed; candidate {cid[:8]} pending")
                else:
                    skipped_existing += 1
                time.sleep(0.5)
                continue
            # Tier-2 confirmed but below auto-approve confidence.
            cid = submit_candidate(base, ids, min_sim, tier2, args.dry_run)
            if cid == "dry-run":
                # FIX: count dry-run submissions consistently (see above).
                human_candidates += 1
            elif cid:
                human_candidates += 1
                print(f" → candidate {cid[:8]} (tier-2 low-conf, human review)")
            time.sleep(0.5)
            continue

        # Below tier-2 thresholds — human review with tier-1 draft
        cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run)
        if cid == "dry-run":
            human_candidates += 1
        elif cid:
            human_candidates += 1
            print(f" → candidate {cid[:8]} (human review)")
        else:
            skipped_existing += 1
        time.sleep(0.3)

    print(
        f"\nsummary: clusters_processed={processed} "
        f"auto_merged_tier1={auto_merged_tier1} "
        f"auto_merged_tier2={auto_merged_tier2} "
        f"human_candidates={human_candidates} "
        f"tier1_rejections={tier1_rejections} "
        f"tier2_overrides={tier2_overrides} "
        f"skipped_existing={skipped_existing}"
    )


if __name__ == "__main__":
    main()