diff --git a/scripts/memory_dedup.py b/scripts/memory_dedup.py index 1ce8b79..486e282 100644 --- a/scripts/memory_dedup.py +++ b/scripts/memory_dedup.py @@ -53,6 +53,8 @@ if _SRC_DIR not in sys.path: from atocore.memory._dedup_prompt import ( # noqa: E402 DEDUP_PROMPT_VERSION, SYSTEM_PROMPT, + TIER2_SYSTEM_PROMPT, + build_tier2_user_message, build_user_message, normalize_merge_verdict, parse_merge_verdict, @@ -61,8 +63,18 @@ from atocore.memory.similarity import cluster_by_threshold # noqa: E402 DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100") DEFAULT_MODEL = os.environ.get("ATOCORE_DEDUP_MODEL", "sonnet") +DEFAULT_TIER2_MODEL = os.environ.get("ATOCORE_DEDUP_TIER2_MODEL", "opus") DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_DEDUP_TIMEOUT_S", "60")) +# Phase 7A.1 — auto-merge tiering thresholds. +# TIER-1 auto-approve: if sonnet confidence >= this AND min pairwise +# similarity >= AUTO_APPROVE_SIM AND all sources share project+type → merge silently. +AUTO_APPROVE_CONF = float(os.environ.get("ATOCORE_DEDUP_AUTO_APPROVE_CONF", "0.8")) +AUTO_APPROVE_SIM = float(os.environ.get("ATOCORE_DEDUP_AUTO_APPROVE_SIM", "0.92")) +# TIER-2 escalation band: sonnet uncertain but pair is similar enough to be worth opus time. +TIER2_MIN_CONF = float(os.environ.get("ATOCORE_DEDUP_TIER2_MIN_CONF", "0.5")) +TIER2_MIN_SIM = float(os.environ.get("ATOCORE_DEDUP_TIER2_MIN_SIM", "0.85")) + _sandbox_cwd = None @@ -149,18 +161,59 @@ def group_memories(mems: list[dict]) -> dict[tuple[str, str], list[dict]]: def draft_merge(sources: list[dict], model: str, timeout_s: float) -> dict[str, Any] | None: + """Tier-1 draft: cheap sonnet call proposes the unified content.""" user_msg = build_user_message(sources) raw, err = call_claude(SYSTEM_PROMPT, user_msg, model, timeout_s) if err: - print(f" WARN: claude call failed: {err}", file=sys.stderr) + print(f" WARN: claude tier-1 failed: {err}", file=sys.stderr) return None parsed = parse_merge_verdict(raw or "") if parsed is None: - print(f" WARN: could not parse verdict: {(raw or '')[:200]}", file=sys.stderr) + print(f" WARN: could not parse tier-1 verdict: {(raw or '')[:200]}", file=sys.stderr) return None return normalize_merge_verdict(parsed) +def tier2_review( + sources: list[dict], + tier1_verdict: dict[str, Any], + model: str, + timeout_s: float, +) -> dict[str, Any] | None: + """Tier-2 second opinion: opus confirms or overrides the tier-1 draft.""" + user_msg = build_tier2_user_message(sources, tier1_verdict) + raw, err = call_claude(TIER2_SYSTEM_PROMPT, user_msg, model, timeout_s) + if err: + print(f" WARN: claude tier-2 failed: {err}", file=sys.stderr) + return None + parsed = parse_merge_verdict(raw or "") + if parsed is None: + print(f" WARN: could not parse tier-2 verdict: {(raw or '')[:200]}", file=sys.stderr) + return None + return normalize_merge_verdict(parsed) + + +def min_pairwise_similarity(texts: list[str]) -> float: + """Return the minimum pairwise cosine similarity across N texts. + + Used to sanity-check a transitive cluster: A~B~C doesn't guarantee + A~C, so the auto-approve threshold should be met by the WEAKEST + pair, not just by the strongest. If the cluster has N=2 this is just + the one pairwise similarity. + """ + if len(texts) < 2: + return 0.0 + # Reuse similarity_matrix rather than computing it ourselves + from atocore.memory.similarity import similarity_matrix + m = similarity_matrix(texts) + min_sim = 1.0 + for i in range(len(texts)): + for j in range(i + 1, len(texts)): + if m[i][j] < min_sim: + min_sim = m[i][j] + return min_sim + + def submit_candidate( base_url: str, memory_ids: list[str], @@ -192,21 +245,60 @@ def submit_candidate( return None +def auto_approve(base_url: str, candidate_id: str, actor: str, dry_run: bool) -> str | None: + """POST /admin/memory/merge-candidates/{id}/approve. Returns result_memory_id.""" + if dry_run: + return "dry-run" + try: + result = api_post( + base_url, + f"/admin/memory/merge-candidates/{candidate_id}/approve", + {"actor": actor}, + ) + return result.get("result_memory_id") + except Exception as e: + print(f" ERROR: auto-approve failed: {e}", file=sys.stderr) + return None + + +def same_bucket(sources: list[dict]) -> bool: + """All sources share the same (project, memory_type).""" + if not sources: + return False + proj = (sources[0].get("project") or "").strip().lower() + mtype = (sources[0].get("memory_type") or "").strip().lower() + for s in sources[1:]: + if (s.get("project") or "").strip().lower() != proj: + return False + if (s.get("memory_type") or "").strip().lower() != mtype: + return False + return True + + def main() -> None: - parser = argparse.ArgumentParser(description="Phase 7A semantic dedup detector") + parser = argparse.ArgumentParser(description="Phase 7A semantic dedup detector (tiered)") parser.add_argument("--base-url", default=DEFAULT_BASE_URL) parser.add_argument("--project", default="", help="Only scan this project (empty = all)") parser.add_argument("--similarity-threshold", type=float, default=0.88) parser.add_argument("--max-batch", type=int, default=50, - help="Max clusters to propose per run") - parser.add_argument("--model", default=DEFAULT_MODEL) + help="Max clusters to process per run") + parser.add_argument("--model", default=DEFAULT_MODEL, help="Tier-1 model (default: sonnet)") + parser.add_argument("--tier2-model", default=DEFAULT_TIER2_MODEL, help="Tier-2 model (default: opus)") parser.add_argument("--timeout-s", type=float, default=DEFAULT_TIMEOUT_S) + parser.add_argument("--no-auto-approve", action="store_true", + help="Disable autonomous merging; all merges land in human triage queue") parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() base = args.base_url.rstrip("/") + autonomous = not args.no_auto_approve - print(f"memory_dedup {DEDUP_PROMPT_VERSION} | threshold={args.similarity_threshold} | model={args.model}") + print( + f"memory_dedup {DEDUP_PROMPT_VERSION} | threshold={args.similarity_threshold} | " + f"tier1={args.model} tier2={args.tier2_model} | " + f"autonomous={autonomous} | " + f"auto-approve: conf>={AUTO_APPROVE_CONF} sim>={AUTO_APPROVE_SIM}" + ) mems = fetch_active_memories(base, args.project or None) print(f"fetched {len(mems)} active memories") if not mems: @@ -216,60 +308,153 @@ def main() -> None: print(f"grouped into {len(buckets)} (project, memory_type) buckets") clusters_found = 0 - candidates_created = 0 + auto_merged_tier1 = 0 + auto_merged_tier2 = 0 + human_candidates = 0 + tier1_rejections = 0 + tier2_overrides = 0 # opus disagreed with sonnet + skipped_low_sim = 0 skipped_existing = 0 - llm_rejections = 0 + processed = 0 for (proj, mtype), group in sorted(buckets.items()): if len(group) < 2: continue - if candidates_created >= args.max_batch: + if processed >= args.max_batch: print(f"reached max-batch={args.max_batch}, stopping") break texts = [(m.get("content") or "") for m in group] clusters = cluster_by_threshold(texts, args.similarity_threshold) - # Keep only non-trivial clusters clusters = [c for c in clusters if len(c) >= 2] if not clusters: continue print(f"\n[{proj or '(global)'}/{mtype}] {len(group)} mems → {len(clusters)} cluster(s)") for cluster in clusters: - if candidates_created >= args.max_batch: + if processed >= args.max_batch: break clusters_found += 1 sources = [group[i] for i in cluster] ids = [s["id"] for s in sources] - # Approximate cluster similarity = min pairwise within cluster. - # For reporting, just use threshold (we know all pairs >= threshold - # transitively; min may be lower). Keep it simple. - sim = args.similarity_threshold - print(f" cluster of {len(cluster)}: {[s['id'][:8] for s in sources]}") + cluster_texts = [texts[i] for i in cluster] + min_sim = min_pairwise_similarity(cluster_texts) + print(f" cluster of {len(cluster)} (min_sim={min_sim:.3f}): {[s['id'][:8] for s in sources]}") - verdict = draft_merge(sources, args.model, args.timeout_s) - if verdict is None: + # Tier-1 draft + tier1 = draft_merge(sources, args.model, args.timeout_s) + processed += 1 + if tier1 is None: continue - if verdict["action"] == "reject": - llm_rejections += 1 - print(f" LLM rejected: {verdict['reason'][:100]}") + if tier1["action"] == "reject": + tier1_rejections += 1 + print(f" TIER-1 rejected: {tier1['reason'][:100]}") continue - cid = submit_candidate(base, ids, sim, verdict, args.dry_run) + # --- Tiering decision --- + bucket_ok = same_bucket(sources) + tier1_ok = ( + tier1["confidence"] >= AUTO_APPROVE_CONF + and min_sim >= AUTO_APPROVE_SIM + and bucket_ok + ) + + if autonomous and tier1_ok: + cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run) + if cid == "dry-run": + auto_merged_tier1 += 1 + print(" [dry-run] would auto-merge (tier-1)") + elif cid: + new_id = auto_approve(base, cid, actor="auto-dedup-tier1", dry_run=args.dry_run) + if new_id: + auto_merged_tier1 += 1 + print(f" ✅ auto-merged (tier-1) → {str(new_id)[:8]}") + else: + print(f" ⚠️ tier-1 approve failed; candidate {cid[:8]} left pending") + human_candidates += 1 + else: + skipped_existing += 1 + time.sleep(0.3) + continue + + # Not tier-1 auto-approve. Decide if it's worth tier-2 escalation. + tier2_eligible = ( + autonomous + and min_sim >= TIER2_MIN_SIM + and tier1["confidence"] >= TIER2_MIN_CONF + and bucket_ok + ) + + if tier2_eligible: + print(" → escalating to tier-2 (opus)…") + tier2 = tier2_review(sources, tier1, args.tier2_model, args.timeout_s) + if tier2 is None: + # Opus errored. Fall back to human triage. + cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run) + if cid and cid != "dry-run": + human_candidates += 1 + print(f" → candidate {cid[:8]} (tier-2 errored, human review)") + time.sleep(0.5) + continue + + if tier2["action"] == "reject": + tier2_overrides += 1 + print(f" ❌ TIER-2 override (reject): {tier2['reason'][:100]}") + time.sleep(0.5) + continue + + if tier2["confidence"] >= AUTO_APPROVE_CONF: + # Opus confirms. Auto-merge using opus's (possibly refined) content. + cid = submit_candidate(base, ids, min_sim, tier2, args.dry_run) + if cid == "dry-run": + auto_merged_tier2 += 1 + print(" [dry-run] would auto-merge (tier-2)") + elif cid: + new_id = auto_approve(base, cid, actor="auto-dedup-tier2", dry_run=args.dry_run) + if new_id: + auto_merged_tier2 += 1 + print(f" ✅ auto-merged (tier-2) → {str(new_id)[:8]}") + else: + human_candidates += 1 + print(f" ⚠️ tier-2 approve failed; candidate {cid[:8]} left pending") + else: + skipped_existing += 1 + time.sleep(0.5) + continue + + # Opus confirmed but low confidence → human review with opus's draft + cid = submit_candidate(base, ids, min_sim, tier2, args.dry_run) + if cid and cid != "dry-run": + human_candidates += 1 + print(f" → candidate {cid[:8]} (tier-2 low-confidence, human review)") + time.sleep(0.5) + continue + + # Below tier-2 eligibility (either non-autonomous mode, or + # similarity too low / cross-bucket). Always human review. + if min_sim < TIER2_MIN_SIM or not bucket_ok: + skipped_low_sim += 1 + # Still create a human candidate so it's visible, but log why + print(f" → below auto-tier thresholds (min_sim={min_sim:.3f}, bucket_ok={bucket_ok})") + + cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run) if cid == "dry-run": - candidates_created += 1 + human_candidates += 1 elif cid: - candidates_created += 1 - print(f" → candidate {cid[:8]}") + human_candidates += 1 + print(f" → candidate {cid[:8]} (human review)") else: skipped_existing += 1 - - time.sleep(0.3) # be kind to claude CLI + time.sleep(0.3) print( f"\nsummary: clusters_found={clusters_found} " - f"candidates_created={candidates_created} " - f"llm_rejections={llm_rejections} " + f"auto_merged_tier1={auto_merged_tier1} " + f"auto_merged_tier2={auto_merged_tier2} " + f"human_candidates={human_candidates} " + f"tier1_rejections={tier1_rejections} " + f"tier2_overrides={tier2_overrides} " + f"skipped_low_sim={skipped_low_sim} " f"skipped_existing={skipped_existing}" ) diff --git a/src/atocore/memory/_dedup_prompt.py b/src/atocore/memory/_dedup_prompt.py index 95c5777..1b3c64f 100644 --- a/src/atocore/memory/_dedup_prompt.py +++ b/src/atocore/memory/_dedup_prompt.py @@ -53,6 +53,50 @@ OUTPUT — raw JSON, no prose, no markdown fences: On action=reject, still fill content with a short explanation and set confidence=0.""" +TIER2_SYSTEM_PROMPT = """You are the second-opinion reviewer for AtoCore's memory-consolidation pipeline. + +A tier-1 model (cheaper, faster) already drafted a unified memory from N near-duplicate source memories. Your job is to either CONFIRM the merge (refining the content if you see a clearer phrasing) or OVERRIDE with action="reject" if the tier-1 missed something important. + +You must be STRICTER than tier-1. Specifically, REJECT if: +- The sources are about different subjects that share vocabulary (e.g., different components within the same project) +- The tier-1 draft dropped specifics that existed in the sources (numbers, dates, vendors, people, part IDs) +- One source contradicts another and the draft glossed over it +- The sources span a timeline of a changing state (should be preserved as a sequence, not collapsed) + +If you CONFIRM, you may polish the content — but preserve every specific from every source. + +Same output schema as tier-1: +{ + "action": "merge" | "reject", + "content": "the unified memory content", + "memory_type": "knowledge|project|preference|adaptation|episodic|identity", + "project": "project-slug or empty", + "domain_tags": ["tag1", "tag2"], + "confidence": 0.5, + "reason": "one sentence — what you confirmed or why you overrode" +} + +Raw JSON only, no prose, no markdown fences.""" + + +def build_tier2_user_message(sources: list[dict[str, Any]], tier1_verdict: dict[str, Any]) -> str: + """Format tier-2 review payload: same sources + tier-1's draft.""" + base = build_user_message(sources) + draft_summary = ( + f"\n\n--- TIER-1 DRAFT (for your review) ---\n" + f"action: {tier1_verdict.get('action')}\n" + f"confidence: {tier1_verdict.get('confidence', 0):.2f}\n" + f"proposed content: {(tier1_verdict.get('content') or '')[:600]}\n" + f"proposed memory_type: {tier1_verdict.get('memory_type', '')}\n" + f"proposed project: {tier1_verdict.get('project', '')}\n" + f"proposed tags: {tier1_verdict.get('domain_tags', [])}\n" + f"tier-1 reason: {tier1_verdict.get('reason', '')[:300]}\n" + f"---\n\n" + f"Return your JSON verdict now. Confirm or override." + ) + return base.replace("Return the JSON object now.", "").rstrip() + draft_summary + + def build_user_message(sources: list[dict[str, Any]]) -> str: """Format N source memories for the model to consolidate. diff --git a/tests/test_memory_dedup.py b/tests/test_memory_dedup.py index 345e543..321c5f6 100644 --- a/tests/test_memory_dedup.py +++ b/tests/test_memory_dedup.py @@ -15,6 +15,8 @@ from __future__ import annotations import pytest from atocore.memory._dedup_prompt import ( + TIER2_SYSTEM_PROMPT, + build_tier2_user_message, normalize_merge_verdict, parse_merge_verdict, ) @@ -119,6 +121,120 @@ def test_normalize_merge_verdict_rejects_unknown_action(): assert normalize_merge_verdict({"action": "?", "content": "x"}) is None +# --- Tier-2 (Phase 7A.1) --- + + +def test_tier2_prompt_is_stricter(): + # The tier-2 system prompt must explicitly instruct the model to be + # stricter than tier-1 — that's the whole point of escalation. + assert "STRICTER" in TIER2_SYSTEM_PROMPT + assert "REJECT" in TIER2_SYSTEM_PROMPT + + +def test_build_tier2_user_message_includes_tier1_draft(): + sources = [{ + "id": "abc12345", "content": "source text A", + "memory_type": "knowledge", "project": "p04", + "domain_tags": ["optics"], "confidence": 0.6, + "valid_until": "", "reference_count": 2, + }, { + "id": "def67890", "content": "source text B", + "memory_type": "knowledge", "project": "p04", + "domain_tags": ["optics"], "confidence": 0.7, + "valid_until": "", "reference_count": 1, + }] + tier1 = { + "action": "merge", + "content": "unified draft by tier1", + "memory_type": "knowledge", + "project": "p04", + "domain_tags": ["optics"], + "confidence": 0.65, + "reason": "near-paraphrase", + } + msg = build_tier2_user_message(sources, tier1) + assert "source text A" in msg + assert "source text B" in msg + assert "TIER-1 DRAFT" in msg + assert "unified draft by tier1" in msg + assert "near-paraphrase" in msg + # Should end asking for a verdict + assert "verdict" in msg.lower() + + +# --- Tiering helpers (min_pairwise_similarity, same_bucket) --- + + +def test_same_bucket_true_for_matching(): + import importlib.util + spec = importlib.util.spec_from_file_location( + "memory_dedup_for_test", + "scripts/memory_dedup.py", + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + + sources = [ + {"memory_type": "knowledge", "project": "p04"}, + {"memory_type": "knowledge", "project": "p04"}, + ] + assert mod.same_bucket(sources) is True + + +def test_same_bucket_false_for_mixed(): + import importlib.util + spec = importlib.util.spec_from_file_location( + "memory_dedup_for_test", + "scripts/memory_dedup.py", + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + + # Different project + assert mod.same_bucket([ + {"memory_type": "knowledge", "project": "p04"}, + {"memory_type": "knowledge", "project": "p05"}, + ]) is False + # Different memory_type + assert mod.same_bucket([ + {"memory_type": "knowledge", "project": "p04"}, + {"memory_type": "project", "project": "p04"}, + ]) is False + + +def test_min_pairwise_similarity_identical_texts(): + import importlib.util + spec = importlib.util.spec_from_file_location( + "memory_dedup_for_test", + "scripts/memory_dedup.py", + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + + # Three identical texts — min should be ~1.0 + ms = mod.min_pairwise_similarity(["hello world"] * 3) + assert 0.99 <= ms <= 1.0 + + +def test_min_pairwise_similarity_mixed_cluster(): + """Transitive cluster A~B~C with A and C actually quite different + should expose a low min even though A~B and B~C are high.""" + import importlib.util + spec = importlib.util.spec_from_file_location( + "memory_dedup_for_test", + "scripts/memory_dedup.py", + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + + ms = mod.min_pairwise_similarity([ + "Antoine prefers OAuth over API keys", + "Antoine's OAuth preference", + "USB SSD mandatory for polisher firmware", + ]) + assert ms < 0.6 # Third is unrelated; min is far below threshold + + # --- create_merge_candidate idempotency ---