feat: Phase 7A.1 — autonomous merge tiering (sonnet → opus → human)

Dedup detector now merges high-confidence duplicates silently instead
of piling every proposal into a human triage queue. Matches the 3-tier
escalation pattern that auto_triage already uses.

Tiering decision per cluster:
  TIER-1 auto-approve: sonnet confidence >= 0.8 AND min_pairwise_sim >= 0.92
                       AND all sources share project+type → auto-merge silently
                       (actor="auto-dedup-tier1" in audit log)
  TIER-2 escalation:   sonnet 0.5-0.8 conf OR sim 0.85-0.92 → opus second opinion.
                       Opus confirms with conf >= 0.8 → auto-merge (actor="auto-dedup-tier2").
                       Opus overrides (reject) → skip silently.
                       Opus low conf → human triage with opus's refined draft.
  HUMAN triage:        Only the genuinely ambiguous land in /admin/triage.

Env-tunable thresholds:
  ATOCORE_DEDUP_AUTO_APPROVE_CONF (0.8)
  ATOCORE_DEDUP_AUTO_APPROVE_SIM (0.92)
  ATOCORE_DEDUP_TIER2_MIN_CONF (0.5)
  ATOCORE_DEDUP_TIER2_MIN_SIM (0.85)
  ATOCORE_DEDUP_TIER2_MODEL (opus)

New flag --no-auto-approve for kill-switch testing (everything → human queue).

Tests: +6 (tier-2 prompt content, same_bucket edges, min_pairwise_similarity
on identical + transitive clusters). 395 → 401.

Rationale: user asked for autonomous behavior — "this needs to be intelligent,
I don't want to manually triage stuff". Matches the consolidation principle:
never discard details, but let the brain tidy up on its own for the easy cases.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-18 15:46:26 -04:00
parent 028d4c3594
commit 56d5df0ab4
3 changed files with 374 additions and 29 deletions

View File

@@ -53,6 +53,8 @@ if _SRC_DIR not in sys.path:
from atocore.memory._dedup_prompt import ( # noqa: E402
DEDUP_PROMPT_VERSION,
SYSTEM_PROMPT,
TIER2_SYSTEM_PROMPT,
build_tier2_user_message,
build_user_message,
normalize_merge_verdict,
parse_merge_verdict,
@@ -61,8 +63,18 @@ from atocore.memory.similarity import cluster_by_threshold # noqa: E402
# Service endpoint and model selection — every value overridable via env var.
DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100")
DEFAULT_MODEL = os.environ.get("ATOCORE_DEDUP_MODEL", "sonnet")  # tier-1 drafting model
DEFAULT_TIER2_MODEL = os.environ.get("ATOCORE_DEDUP_TIER2_MODEL", "opus")  # tier-2 reviewer model
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_DEDUP_TIMEOUT_S", "60"))  # per Claude CLI call
# Phase 7A.1 — auto-merge tiering thresholds.
# TIER-1 auto-approve: if sonnet confidence >= this AND min pairwise
# similarity >= AUTO_APPROVE_SIM AND all sources share project+type → merge silently.
AUTO_APPROVE_CONF = float(os.environ.get("ATOCORE_DEDUP_AUTO_APPROVE_CONF", "0.8"))
AUTO_APPROVE_SIM = float(os.environ.get("ATOCORE_DEDUP_AUTO_APPROVE_SIM", "0.92"))
# TIER-2 escalation band: sonnet uncertain but pair is similar enough to be worth opus time.
TIER2_MIN_CONF = float(os.environ.get("ATOCORE_DEDUP_TIER2_MIN_CONF", "0.5"))
TIER2_MIN_SIM = float(os.environ.get("ATOCORE_DEDUP_TIER2_MIN_SIM", "0.85"))
# NOTE(review): presumably a cached working dir for sandboxed CLI calls, set on
# first use elsewhere in this file — confirm against the code below this chunk.
_sandbox_cwd = None
@@ -149,18 +161,59 @@ def group_memories(mems: list[dict]) -> dict[tuple[str, str], list[dict]]:
def draft_merge(sources: list[dict], model: str, timeout_s: float) -> dict[str, Any] | None:
    """Tier-1 draft: cheap sonnet call proposes the unified content.

    Args:
        sources: Memory dicts belonging to one duplicate cluster.
        model: Claude model alias for the tier-1 call (e.g. "sonnet").
        timeout_s: Per-call timeout in seconds.

    Returns:
        The normalized merge-verdict dict, or None when the model call
        fails or its output cannot be parsed (callers skip the cluster).
    """
    # Fix: the diff left BOTH the pre- and post-change WARN prints in place
    # (duplicate lines); keep only the new tier-1-labelled messages.
    user_msg = build_user_message(sources)
    raw, err = call_claude(SYSTEM_PROMPT, user_msg, model, timeout_s)
    if err:
        print(f" WARN: claude tier-1 failed: {err}", file=sys.stderr)
        return None
    parsed = parse_merge_verdict(raw or "")
    if parsed is None:
        print(f" WARN: could not parse tier-1 verdict: {(raw or '')[:200]}", file=sys.stderr)
        return None
    return normalize_merge_verdict(parsed)
def tier2_review(
    sources: list[dict],
    tier1_verdict: dict[str, Any],
    model: str,
    timeout_s: float,
) -> dict[str, Any] | None:
    """Tier-2 second opinion: opus confirms or overrides the tier-1 draft.

    Returns the normalized verdict dict on success, or None when the
    model call errors out or returns unparseable output.
    """
    raw, err = call_claude(
        TIER2_SYSTEM_PROMPT,
        build_tier2_user_message(sources, tier1_verdict),
        model,
        timeout_s,
    )
    if err:
        print(f" WARN: claude tier-2 failed: {err}", file=sys.stderr)
        return None
    verdict = parse_merge_verdict(raw or "")
    if verdict is not None:
        return normalize_merge_verdict(verdict)
    print(f" WARN: could not parse tier-2 verdict: {(raw or '')[:200]}", file=sys.stderr)
    return None
def min_pairwise_similarity(texts: list[str]) -> float:
    """Return the minimum pairwise cosine similarity across N texts.

    A transitive cluster (A~B~C) does not guarantee A~C, so the
    auto-approve threshold must hold for the WEAKEST pair, not merely
    the strongest. For N=2 this is the single pairwise similarity;
    with fewer than two texts there is no pair and 0.0 is returned.
    """
    count = len(texts)
    if count < 2:
        return 0.0
    # Reuse the shared similarity_matrix helper rather than recomputing.
    from atocore.memory.similarity import similarity_matrix

    matrix = similarity_matrix(texts)
    return min(
        matrix[a][b]
        for a in range(count)
        for b in range(a + 1, count)
    )
def submit_candidate(
base_url: str,
memory_ids: list[str],
@@ -192,21 +245,60 @@ def submit_candidate(
return None
def auto_approve(base_url: str, candidate_id: str, actor: str, dry_run: bool) -> str | None:
"""POST /admin/memory/merge-candidates/{id}/approve. Returns result_memory_id."""
if dry_run:
return "dry-run"
try:
result = api_post(
base_url,
f"/admin/memory/merge-candidates/{candidate_id}/approve",
{"actor": actor},
)
return result.get("result_memory_id")
except Exception as e:
print(f" ERROR: auto-approve failed: {e}", file=sys.stderr)
return None
def same_bucket(sources: list[dict]) -> bool:
    """All sources share the same (project, memory_type).

    Comparison is case-insensitive and whitespace-insensitive; an empty
    source list is NOT considered a bucket and yields False.
    """
    if not sources:
        return False

    def _bucket_key(mem: dict) -> tuple[str, str]:
        # Normalize so "Proj " and "proj" land in the same bucket.
        return (
            (mem.get("project") or "").strip().lower(),
            (mem.get("memory_type") or "").strip().lower(),
        )

    reference = _bucket_key(sources[0])
    return all(_bucket_key(mem) == reference for mem in sources[1:])
def main() -> None:
parser = argparse.ArgumentParser(description="Phase 7A semantic dedup detector")
parser = argparse.ArgumentParser(description="Phase 7A semantic dedup detector (tiered)")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--project", default="", help="Only scan this project (empty = all)")
parser.add_argument("--similarity-threshold", type=float, default=0.88)
parser.add_argument("--max-batch", type=int, default=50,
help="Max clusters to propose per run")
parser.add_argument("--model", default=DEFAULT_MODEL)
help="Max clusters to process per run")
parser.add_argument("--model", default=DEFAULT_MODEL, help="Tier-1 model (default: sonnet)")
parser.add_argument("--tier2-model", default=DEFAULT_TIER2_MODEL, help="Tier-2 model (default: opus)")
parser.add_argument("--timeout-s", type=float, default=DEFAULT_TIMEOUT_S)
parser.add_argument("--no-auto-approve", action="store_true",
help="Disable autonomous merging; all merges land in human triage queue")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
base = args.base_url.rstrip("/")
autonomous = not args.no_auto_approve
print(f"memory_dedup {DEDUP_PROMPT_VERSION} | threshold={args.similarity_threshold} | model={args.model}")
print(
f"memory_dedup {DEDUP_PROMPT_VERSION} | threshold={args.similarity_threshold} | "
f"tier1={args.model} tier2={args.tier2_model} | "
f"autonomous={autonomous} | "
f"auto-approve: conf>={AUTO_APPROVE_CONF} sim>={AUTO_APPROVE_SIM}"
)
mems = fetch_active_memories(base, args.project or None)
print(f"fetched {len(mems)} active memories")
if not mems:
@@ -216,60 +308,153 @@ def main() -> None:
print(f"grouped into {len(buckets)} (project, memory_type) buckets")
clusters_found = 0
candidates_created = 0
auto_merged_tier1 = 0
auto_merged_tier2 = 0
human_candidates = 0
tier1_rejections = 0
tier2_overrides = 0 # opus disagreed with sonnet
skipped_low_sim = 0
skipped_existing = 0
llm_rejections = 0
processed = 0
for (proj, mtype), group in sorted(buckets.items()):
if len(group) < 2:
continue
if candidates_created >= args.max_batch:
if processed >= args.max_batch:
print(f"reached max-batch={args.max_batch}, stopping")
break
texts = [(m.get("content") or "") for m in group]
clusters = cluster_by_threshold(texts, args.similarity_threshold)
# Keep only non-trivial clusters
clusters = [c for c in clusters if len(c) >= 2]
if not clusters:
continue
print(f"\n[{proj or '(global)'}/{mtype}] {len(group)} mems → {len(clusters)} cluster(s)")
for cluster in clusters:
if candidates_created >= args.max_batch:
if processed >= args.max_batch:
break
clusters_found += 1
sources = [group[i] for i in cluster]
ids = [s["id"] for s in sources]
# Approximate cluster similarity = min pairwise within cluster.
# For reporting, just use threshold (we know all pairs >= threshold
# transitively; min may be lower). Keep it simple.
sim = args.similarity_threshold
print(f" cluster of {len(cluster)}: {[s['id'][:8] for s in sources]}")
cluster_texts = [texts[i] for i in cluster]
min_sim = min_pairwise_similarity(cluster_texts)
print(f" cluster of {len(cluster)} (min_sim={min_sim:.3f}): {[s['id'][:8] for s in sources]}")
verdict = draft_merge(sources, args.model, args.timeout_s)
if verdict is None:
# Tier-1 draft
tier1 = draft_merge(sources, args.model, args.timeout_s)
processed += 1
if tier1 is None:
continue
if verdict["action"] == "reject":
llm_rejections += 1
print(f" LLM rejected: {verdict['reason'][:100]}")
if tier1["action"] == "reject":
tier1_rejections += 1
print(f" TIER-1 rejected: {tier1['reason'][:100]}")
continue
cid = submit_candidate(base, ids, sim, verdict, args.dry_run)
# --- Tiering decision ---
bucket_ok = same_bucket(sources)
tier1_ok = (
tier1["confidence"] >= AUTO_APPROVE_CONF
and min_sim >= AUTO_APPROVE_SIM
and bucket_ok
)
if autonomous and tier1_ok:
cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run)
if cid == "dry-run":
auto_merged_tier1 += 1
print(" [dry-run] would auto-merge (tier-1)")
elif cid:
new_id = auto_approve(base, cid, actor="auto-dedup-tier1", dry_run=args.dry_run)
if new_id:
auto_merged_tier1 += 1
print(f" ✅ auto-merged (tier-1) → {str(new_id)[:8]}")
else:
print(f" ⚠️ tier-1 approve failed; candidate {cid[:8]} left pending")
human_candidates += 1
else:
skipped_existing += 1
time.sleep(0.3)
continue
# Not tier-1 auto-approve. Decide if it's worth tier-2 escalation.
tier2_eligible = (
autonomous
and min_sim >= TIER2_MIN_SIM
and tier1["confidence"] >= TIER2_MIN_CONF
and bucket_ok
)
if tier2_eligible:
print(" → escalating to tier-2 (opus)…")
tier2 = tier2_review(sources, tier1, args.tier2_model, args.timeout_s)
if tier2 is None:
# Opus errored. Fall back to human triage.
cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run)
if cid and cid != "dry-run":
human_candidates += 1
print(f" → candidate {cid[:8]} (tier-2 errored, human review)")
time.sleep(0.5)
continue
if tier2["action"] == "reject":
tier2_overrides += 1
print(f" ❌ TIER-2 override (reject): {tier2['reason'][:100]}")
time.sleep(0.5)
continue
if tier2["confidence"] >= AUTO_APPROVE_CONF:
# Opus confirms. Auto-merge using opus's (possibly refined) content.
cid = submit_candidate(base, ids, min_sim, tier2, args.dry_run)
if cid == "dry-run":
auto_merged_tier2 += 1
print(" [dry-run] would auto-merge (tier-2)")
elif cid:
new_id = auto_approve(base, cid, actor="auto-dedup-tier2", dry_run=args.dry_run)
if new_id:
auto_merged_tier2 += 1
print(f" ✅ auto-merged (tier-2) → {str(new_id)[:8]}")
else:
human_candidates += 1
print(f" ⚠️ tier-2 approve failed; candidate {cid[:8]} left pending")
else:
skipped_existing += 1
time.sleep(0.5)
continue
# Opus confirmed but low confidence → human review with opus's draft
cid = submit_candidate(base, ids, min_sim, tier2, args.dry_run)
if cid and cid != "dry-run":
human_candidates += 1
print(f" → candidate {cid[:8]} (tier-2 low-confidence, human review)")
time.sleep(0.5)
continue
# Below tier-2 eligibility (either non-autonomous mode, or
# similarity too low / cross-bucket). Always human review.
if min_sim < TIER2_MIN_SIM or not bucket_ok:
skipped_low_sim += 1
# Still create a human candidate so it's visible, but log why
print(f" → below auto-tier thresholds (min_sim={min_sim:.3f}, bucket_ok={bucket_ok})")
cid = submit_candidate(base, ids, min_sim, tier1, args.dry_run)
if cid == "dry-run":
candidates_created += 1
human_candidates += 1
elif cid:
candidates_created += 1
print(f" → candidate {cid[:8]}")
human_candidates += 1
print(f" → candidate {cid[:8]} (human review)")
else:
skipped_existing += 1
time.sleep(0.3) # be kind to claude CLI
time.sleep(0.3)
print(
f"\nsummary: clusters_found={clusters_found} "
f"candidates_created={candidates_created} "
f"llm_rejections={llm_rejections} "
f"auto_merged_tier1={auto_merged_tier1} "
f"auto_merged_tier2={auto_merged_tier2} "
f"human_candidates={human_candidates} "
f"tier1_rejections={tier1_rejections} "
f"tier2_overrides={tier2_overrides} "
f"skipped_low_sim={skipped_low_sim} "
f"skipped_existing={skipped_existing}"
)