diff --git a/DEV-LEDGER.md b/DEV-LEDGER.md
index 5cfbea9..1cad66c 100644
--- a/DEV-LEDGER.md
+++ b/DEV-LEDGER.md
@@ -7,9 +7,9 @@
## Orientation
- **live_sha** (Dalidou `/health` build_sha): `775960c` (verified 2026-04-16 via /health, build_time 2026-04-16T17:59:30Z)
-- **last_updated**: 2026-04-16 by Claude ("Make It Actually Useful" sprint - observability + Phase 10)
+- **last_updated**: 2026-04-18 by Claude (Phase 7A - Memory Consolidation "sleep cycle" V1 on branch, not yet deployed)
- **main_tip**: `999788b`
-- **test_count**: 303 (4 new Phase 10 tests)
+- **test_count**: 395 (21 new Phase 7A dedup tests + accumulated Phase 5/6 tests since last ledger refresh)
 - **harness**: `17/18 PASS` on live Dalidou (p04-constraints expects "Zerodur" - retrieval content gap, not regression)
- **vectors**: 33,253
- **active_memories**: 84 (31 project, 23 knowledge, 10 episodic, 8 adaptation, 7 preference, 5 identity)
@@ -160,6 +160,8 @@ One branch `codex/extractor-eval-loop` for Day 1-5, a second `codex/retrieval-ha
## Session Log
+- **2026-04-18 Claude** **Phase 7A - Memory Consolidation V1 ("sleep cycle") landed on branch.** New `docs/PHASE-7-MEMORY-CONSOLIDATION.md` covers all 8 subphases (7A dedup, 7B contradictions, 7C tag canon, 7D confidence decay, 7E memory detail, 7F domain view, 7G re-extract, 7H vector hygiene). 7A implementation: schema migration `memory_merge_candidates`, `atocore.memory.similarity` (cosine + transitive cluster), stdlib-only `atocore.memory._dedup_prompt` (LLM drafts unified content preserving all specifics), `merge_memories()` + `create_merge_candidate()` + `get_merge_candidates()` + `reject_merge_candidate()` in service.py, host-side `scripts/memory_dedup.py` (HTTP + claude -p, idempotent via sorted-id set), 5 new endpoints under `/admin/memory/merge-candidates*` + `/admin/memory/dedup-scan` + `/admin/memory/dedup-status`, purple-themed "Merge Candidates" section in /admin/triage with editable draft + approve/reject buttons, "Scan for duplicates" control bar with threshold slider, nightly Step B3 in batch-extract.sh (0.90 daily, 0.85 Sundays deep), `deploy/dalidou/dedup-watcher.sh` host watcher for UI-triggered scans (mirrors graduation-watcher pattern). 21 new tests (similarity, prompt parse, idempotency, merge happy path, override content/tags, audit rows, abort-if-source-tampered, reject leaves sources alone, schema). Tests 374 → 395. Not yet deployed; harness not re-run. Next: push + deploy, install `dedup-watcher.sh` in host cron, trigger first scan, review proposals in UI.
+
- **2026-04-16 Claude** `b687e7f..999788b` **"Make It Actually Useful" sprint.** Two-part session: ops fixes then consolidation sprint.
  **Part 1 - Ops fixes:** Deployed `b687e7f` (project inference from cwd). Fixed cron logging (was `/dev/null`; now redirected to `~/atocore-logs/`). Fixed OpenClaw gateway crash-loop (`discord.replyToMode: "any"` invalid → `"all"`). Deployed `atocore-capture` plugin on T420 OpenClaw using `before_agent_start` + `llm_output` hooks; verified end-to-end: 38 `client=openclaw` interactions captured. Backfilled project tags on 179/181 unscoped interactions (165 atocore, 8 p06, 6 p04).
diff --git a/deploy/dalidou/batch-extract.sh b/deploy/dalidou/batch-extract.sh
index b4c7a0a..a69ec97 100644
--- a/deploy/dalidou/batch-extract.sh
+++ b/deploy/dalidou/batch-extract.sh
@@ -166,6 +166,26 @@ curl -sSf -X POST "$ATOCORE_URL/admin/memory/extend-reinforced" \
log "WARN: extend-reinforced failed (non-blocking)"
}
+# Step B3: Memory dedup scan (Phase 7A)
+# Nightly at 0.90 (tight: only near-duplicates). Sundays run a deeper
+# pass at 0.85 to catch semantically-similar-but-differently-worded memories.
+if [[ "$(date -u +%u)" == "7" ]]; then
+ DEDUP_THRESHOLD="0.85"
+ DEDUP_BATCH="80"
+ log "Step B3: memory dedup (Sunday deep pass, threshold $DEDUP_THRESHOLD)"
+else
+ DEDUP_THRESHOLD="0.90"
+ DEDUP_BATCH="50"
+ log "Step B3: memory dedup (daily, threshold $DEDUP_THRESHOLD)"
+fi
+python3 "$APP_DIR/scripts/memory_dedup.py" \
+ --base-url "$ATOCORE_URL" \
+ --similarity-threshold "$DEDUP_THRESHOLD" \
+ --max-batch "$DEDUP_BATCH" \
+ 2>&1 || {
+ log "WARN: memory dedup failed (non-blocking)"
+}
+
# Step G: Integrity check (Phase 4 V1)
log "Step G: integrity check"
python3 "$APP_DIR/scripts/integrity_check.py" \
diff --git a/deploy/dalidou/dedup-watcher.sh b/deploy/dalidou/dedup-watcher.sh
new file mode 100644
index 0000000..578213e
--- /dev/null
+++ b/deploy/dalidou/dedup-watcher.sh
@@ -0,0 +1,110 @@
+#!/usr/bin/env bash
+#
+# deploy/dalidou/dedup-watcher.sh
+# -------------------------------
+# Host-side watcher for on-demand memory dedup scans (Phase 7A).
+#
+# The /admin/triage page has a "Scan for duplicates" button that POSTs
+# to /admin/memory/dedup-scan with {project, similarity_threshold, max_batch}.
+# The container writes this to project_state (atocore/config/dedup_requested_at).
+#
+# This script runs on the Dalidou HOST (where claude CLI lives), polls
+# for the flag, and runs memory_dedup.py when seen.
+#
+# Installed via cron every 2 minutes:
+# */2 * * * * /srv/storage/atocore/app/deploy/dalidou/dedup-watcher.sh \
+# >> /home/papa/atocore-logs/dedup-watcher.log 2>&1
+#
+# Mirrors deploy/dalidou/graduation-watcher.sh exactly.
+
+set -euo pipefail
+
+ATOCORE_URL="${ATOCORE_URL:-http://127.0.0.1:8100}"
+APP_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
+LOCK_FILE="/tmp/atocore-dedup.lock"
+LOG_DIR="/home/papa/atocore-logs"
+mkdir -p "$LOG_DIR"
+
+TS="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
+log() { printf '[%s] %s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$*"; }
+
+# Fetch the flag via API
+STATE_JSON=$(curl -sSf --max-time 5 "$ATOCORE_URL/project/state/atocore" 2>/dev/null || echo "{}")
+REQUESTED=$(echo "$STATE_JSON" | python3 -c "
+import sys, json
+try:
+ d = json.load(sys.stdin)
+ for e in d.get('entries', d.get('state', [])):
+ if e.get('category') == 'config' and e.get('key') == 'dedup_requested_at':
+ print(e.get('value', ''))
+ break
+except Exception:
+ pass
+" 2>/dev/null || echo "")
+
+if [[ -z "$REQUESTED" ]]; then
+ exit 0
+fi
+
+PROJECT=$(echo "$REQUESTED" | python3 -c "import sys,json; print(json.loads(sys.stdin.read() or '{}').get('project',''))" 2>/dev/null || echo "")
+THRESHOLD=$(echo "$REQUESTED" | python3 -c "import sys,json; print(json.loads(sys.stdin.read() or '{}').get('similarity_threshold',0.88))" 2>/dev/null || echo "0.88")
+MAX_BATCH=$(echo "$REQUESTED" | python3 -c "import sys,json; print(json.loads(sys.stdin.read() or '{}').get('max_batch',50))" 2>/dev/null || echo "50")
+
+# Acquire lock
+exec 9>"$LOCK_FILE" || exit 0
+if ! flock -n 9; then
+ log "dedup already running, skipping"
+ exit 0
+fi
+
+# Mark running
+curl -sSf -X POST "$ATOCORE_URL/project/state" \
+ -H 'Content-Type: application/json' \
+ -d "{\"project\":\"atocore\",\"category\":\"status\",\"key\":\"dedup_running\",\"value\":\"1\",\"source\":\"dedup watcher\"}" \
+ >/dev/null 2>&1 || true
+curl -sSf -X POST "$ATOCORE_URL/project/state" \
+ -H 'Content-Type: application/json' \
+ -d "{\"project\":\"atocore\",\"category\":\"status\",\"key\":\"dedup_last_started_at\",\"value\":\"$TS\",\"source\":\"dedup watcher\"}" \
+ >/dev/null 2>&1 || true
+
+LOG_FILE="$LOG_DIR/dedup-ondemand-$(date -u +%Y%m%d-%H%M%S).log"
+log "Starting dedup (project='$PROJECT' threshold=$THRESHOLD max_batch=$MAX_BATCH, log: $LOG_FILE)"
+
+# Clear the flag BEFORE running so duplicate clicks queue at most one
+curl -sSf -X DELETE "$ATOCORE_URL/project/state" \
+ -H 'Content-Type: application/json' \
+ -d "{\"project\":\"atocore\",\"category\":\"config\",\"key\":\"dedup_requested_at\"}" \
+ >/dev/null 2>&1 || true
+
+cd "$APP_DIR"
+export PYTHONPATH="$APP_DIR/src:${PYTHONPATH:-}"
+ARGS=(--base-url "$ATOCORE_URL" --similarity-threshold "$THRESHOLD" --max-batch "$MAX_BATCH")
+if [[ -n "$PROJECT" ]]; then
+ ARGS+=(--project "$PROJECT")
+fi
+
+if python3 scripts/memory_dedup.py "${ARGS[@]}" >> "$LOG_FILE" 2>&1; then
+ RESULT=$(grep "^summary:" "$LOG_FILE" | tail -1 || tail -1 "$LOG_FILE")
+ RESULT="${RESULT:-completed}"
+ log "dedup finished: $RESULT"
+else
+ RESULT="ERROR - see $LOG_FILE"
+ log "dedup FAILED"
+fi
+
+FINISH_TS="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
+
+curl -sSf -X POST "$ATOCORE_URL/project/state" \
+ -H 'Content-Type: application/json' \
+ -d "{\"project\":\"atocore\",\"category\":\"status\",\"key\":\"dedup_running\",\"value\":\"0\",\"source\":\"dedup watcher\"}" \
+ >/dev/null 2>&1 || true
+curl -sSf -X POST "$ATOCORE_URL/project/state" \
+ -H 'Content-Type: application/json' \
+ -d "{\"project\":\"atocore\",\"category\":\"status\",\"key\":\"dedup_last_finished_at\",\"value\":\"$FINISH_TS\",\"source\":\"dedup watcher\"}" \
+ >/dev/null 2>&1 || true
+
+SAFE_RESULT=$(printf '%s' "$RESULT" | python3 -c "import sys,json; print(json.dumps(sys.stdin.read())[1:-1])")
+curl -sSf -X POST "$ATOCORE_URL/project/state" \
+ -H 'Content-Type: application/json' \
+ -d "{\"project\":\"atocore\",\"category\":\"status\",\"key\":\"dedup_last_result\",\"value\":\"$SAFE_RESULT\",\"source\":\"dedup watcher\"}" \
+ >/dev/null 2>&1 || true
diff --git a/docs/PHASE-7-MEMORY-CONSOLIDATION.md b/docs/PHASE-7-MEMORY-CONSOLIDATION.md
new file mode 100644
index 0000000..96b6064
--- /dev/null
+++ b/docs/PHASE-7-MEMORY-CONSOLIDATION.md
@@ -0,0 +1,96 @@
+# Phase 7 - Memory Consolidation (the "Sleep Cycle")
+
+**Status**: 7A in progress · 7B-H scoped, deferred
+**Design principle**: *"Like human memory while sleeping, but more robotic - never discard relevant details. Consolidate, update, supersede - don't delete."*
+
+## Why
+
+Phases 1-6 built capture + triage + graduation + emerging-project detection. What they don't solve:
+
+| # | Problem | Fix |
+|---|---|---|
+| 1 | Redundancy: "APM uses NX" said 5 different ways across 5 memories | **7A** Semantic dedup |
+| 2 | Latent contradictions: "chose Zygo" + "switched from Zygo" both active | **7B** Pair contradiction detection |
+| 3 | Tag drift: `firmware`, `fw`, `firmware-control` fragment retrieval | **7C** Tag canonicalization |
+| 4 | Confidence staleness: 6-month unreferenced memory ranks as fresh | **7D** Confidence decay |
+| 5 | No memory drill-down page | **7E** `/wiki/memories/{id}` |
+| 6 | Domain knowledge siloed per project | **7F** `/wiki/domains/{tag}` |
+| 7 | Prompt upgrades (llm-0.5 → 0.6) don't re-process old interactions | **7G** Re-extraction on version bump |
+| 8 | Superseded memory vectors still in Chroma polluting retrieval | **7H** Vector hygiene |
+
+Collectively: the brain needs a nightly pass that looks at what it already knows and tidies up (dedup, resolve contradictions, canonicalize tags, decay stale facts) **without losing information**.
+
+## Subphases
+
+### 7A - Semantic dedup + consolidation *(this sprint)*
+
+Compute embeddings for active memories, find pairs within each `(project, memory_type)` bucket above the similarity threshold (default 0.88), cluster, draft a unified memory via LLM, and let a human approve in the triage UI. On approve: sources become `superseded`, and a new merged memory is created with the union of `source_refs`, the sum of `reference_count`, and the max of `confidence`. **Ships first** because redundancy compounds: every new memory potentially duplicates an old one.
+
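The approve-time field combination described above can be sketched in a few lines. This is an illustrative stand-in, not the actual `merge_memories()` in service.py; `combine_merge_fields` and the dict keys are assumptions drawn from the spec text.

```python
# Sketch of the merge-field semantics: union of source_refs (order-
# preserving), summed reference_count, max confidence. Sources are then
# marked superseded elsewhere; nothing is deleted.

def combine_merge_fields(sources: list[dict], drafted_content: str) -> dict:
    source_refs: list[str] = []
    for s in sources:
        for ref in s.get("source_refs", []):
            if ref not in source_refs:  # dedupe while keeping order
                source_refs.append(ref)
    return {
        "content": drafted_content,  # the LLM-drafted unified text
        "source_refs": source_refs,
        "reference_count": sum(s.get("reference_count", 0) for s in sources),
        "confidence": max(s.get("confidence", 0.0) for s in sources),
        "status": "active",
    }
```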
+Detailed spec lives in the working plan (`dapper-cooking-tower.md`) and across the files listed under "Files touched" below. Key decisions:
+
+- LLM drafts, human approves - no silent auto-merge.
+- Same `(project, memory_type)` bucket only. Cross-project merges are rare + risky - separate flow in 7B.
+- Recompute embeddings each scan (~2s / 335 memories). Persist only if scan time becomes a problem.
+- Cluster-based proposals (A~B~C → one merge), not pair-based.
+- `status=superseded` never deleted - still queryable with filter.
+
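The cluster-based (rather than pair-based) decision above amounts to greedy transitive clustering: any pair at or above the threshold joins one cluster, so A~B plus B~C yields {A, B, C} even if sim(A, C) alone falls below the threshold. A minimal union-find sketch, operating on a precomputed similarity dict (the real implementation is `atocore.memory.similarity.cluster_by_threshold`; this is only an illustration):

```python
def cluster_pairs(n: int, sims: dict[tuple[int, int], float],
                  threshold: float) -> list[list[int]]:
    parent = list(range(n))

    def find(i: int) -> int:
        while parent[i] != i:
            parent[i] = parent[parent[i]]  # path halving
            i = parent[i]
        return i

    # union every pair at or above the threshold
    for (a, b), s in sims.items():
        if s >= threshold:
            parent[find(a)] = find(b)

    clusters: dict[int, list[int]] = {}
    for i in range(n):
        clusters.setdefault(find(i), []).append(i)
    # only non-trivial clusters are worth a merge proposal
    return [c for c in clusters.values() if len(c) >= 2]
```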
+**Schema**: new table `memory_merge_candidates` (pending | approved | rejected).
+**Cron**: nightly at threshold 0.90 (tight); weekly (Sundays) at 0.85 (deeper cleanup).
+**UI**: new "Merge Candidates" section in `/admin/triage`.
+
+**Files touched in 7A**:
+- `src/atocore/models/database.py` - migration
+- `src/atocore/memory/similarity.py` - new, `compute_memory_similarity()`
+- `src/atocore/memory/_dedup_prompt.py` - new, shared LLM prompt
+- `src/atocore/memory/service.py` - `merge_memories()`
+- `scripts/memory_dedup.py` - new, host-side detector (HTTP-only)
+- `src/atocore/api/routes.py` - 5 new endpoints under `/admin/memory/`
+- `src/atocore/engineering/triage_ui.py` - merge cards section
+- `deploy/dalidou/batch-extract.sh` - Step B3
+- `deploy/dalidou/dedup-watcher.sh` - new, UI-triggered scans
+- `tests/test_memory_dedup.py` - ~10-15 new tests
+
+### 7B - Memory-to-memory contradiction detection
+
+Same embedding-pair machinery as 7A but within a *different* band (similarity 0.70-0.88: semantically related but worded differently). The LLM classifies each pair: `duplicate | complementary | contradicts | supersedes-older`. Contradictions write a `memory_conflicts` row + surface a triage badge. Clear supersessions (both tier 1 sonnet and tier 2 opus agree) auto-mark the older memory as `superseded`.
+
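The 7A/7B split of the pair space can be sketched as a routing function. The band edges (0.70 / 0.88) and verdict labels come from the text above; the function name and action strings are illustrative, not real code paths:

```python
def route_pair(similarity: float, verdict: str = "") -> str:
    """Map (similarity band, LLM verdict) to the flow that handles it."""
    if similarity >= 0.88:
        return "7A-merge-candidate"          # near-duplicate: dedup flow
    if similarity >= 0.70:
        # related-but-different band: the LLM verdict decides
        if verdict == "contradicts":
            return "7B-memory-conflict"      # conflict row + triage badge
        if verdict == "supersedes-older":
            return "7B-supersede-if-tiers-agree"
        return "no-action"                   # duplicate / complementary
    return "no-action"                       # unrelated pair
```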
+### 7C - Tag canonicalization
+
+Weekly LLM pass over the `domain_tags` distribution, proposes an `alias → canonical` map (e.g. `fw → firmware`). Human approves via UI (one-click pattern, same as emerging-project registration). Bulk-rewrites `domain_tags` atomically across all memories.
+
+### 7D - Confidence decay
+
+Daily lightweight job. For memories with `reference_count=0` AND `last_referenced_at` older than 30 days: multiply confidence by 0.97/day (a ~23-day half-life once decay starts; with the 30-day grace period, a 0.6-confidence memory reaches the floor roughly two months after its last reference). Reinforcement already bumps confidence. Below 0.3 → auto-supersede with reason `decayed, no references`. Reversible (tune the half-life), non-destructive (still searchable with a status filter).
+
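The decay arithmetic works out as follows. The 0.97/day multiplier, 30-day grace period, and 0.3 floor are from the spec above; the function name is an illustrative assumption:

```python
import math

DAILY_MULTIPLIER = 0.97

# half-life of pure 0.97/day decay: 0.97**d == 0.5  =>  d ~= 22.8 days
half_life_days = math.log(0.5) / math.log(DAILY_MULTIPLIER)

def confidence_after(start: float, days_since_last_reference: int) -> float:
    """Confidence after decay, honoring the 30-day grace period."""
    decaying_days = max(0, days_since_last_reference - 30)
    return start * DAILY_MULTIPLIER ** decaying_days

# a 0.6-confidence memory drops below the 0.3 auto-supersede floor at
# roughly 30 + 23 = 53 days since its last reference, i.e. ~2 months
```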
+### 7E - Memory detail page `/wiki/memories/{id}`
+
+Provenance chain: source_chunk → interaction → graduated_to_entity. Audit trail (Phase 4 has the data). Related memories (same project + tag + semantic neighbors). Decay trajectory plot (if 7D ships). Link target from every memory surfaced anywhere in the wiki.
+
+### 7F - Cross-project domain view `/wiki/domains/{tag}`
+
+One page per `domain_tag` showing all memories + graduated entities with that tag, grouped by project. "Optics across p04+p05+p06" becomes a real navigable page. Answers the long-standing question the tag system was meant to enable.
+
+### 7G - Re-extraction on prompt upgrade
+
+`batch_llm_extract_live.py --force-reextract --since DATE`. Dedupe key: `(interaction_id, extractor_version)`, so the same run on the same interaction doesn't double-create. Triggered manually when `LLM_EXTRACTOR_VERSION` bumps. Not automatic (destructive).
+
+### 7H - Vector store hygiene
+
+Nightly: scan `source_chunks` and `memory_embeddings` (added in 7A V2) for `status=superseded|invalid`. Delete matching vectors from Chroma. Fail-open - the retrieval harness catches any real regression.
+
+## Verification & ship order
+
+1. **7A** - ship + observe 1 week; validate merge proposals are high-signal and the rejection rate is acceptable
+2. **7D** - decay is low-risk + high-compounding value; ship second
+3. **7C** - clean up tag fragmentation before 7F, which depends on canonical tags
+4. **7E** + **7F** - UX surfaces; ship together once data is clean
+5. **7B** - contradictions flow (pairs are harder to classify than duplicates; wait for 7A data to tune the threshold)
+6. **7G** - on-demand; not shipped until we actually bump the extractor prompt
+7. **7H** - housekeeping; after 7A + 7B + 7D have generated enough `superseded` rows to matter
+
+## Scope NOT in Phase 7
+
+- Graduated memories (entity-descended) are **frozen** - exempt from dedup/decay. Entity consolidation is a separate Phase (8+).
+- Auto-merging without human approval (always human-in-the-loop in V1).
+- Summarization / compression - a different problem (reducing the number of chunks per memory, not the number of memories).
+- Forgetting policies - there's no user-facing "delete this" flow in Phase 7. Supersede + filter covers the need.
diff --git a/scripts/memory_dedup.py b/scripts/memory_dedup.py
new file mode 100644
index 0000000..1ce8b79
--- /dev/null
+++ b/scripts/memory_dedup.py
@@ -0,0 +1,278 @@
+#!/usr/bin/env python3
+"""Phase 7A β semantic memory dedup detector.
+
+Finds clusters of near-duplicate active memories and writes merge-
+candidate proposals for human review in the triage UI.
+
+Algorithm:
+ 1. Fetch active memories via HTTP
+ 2. Group by (project, memory_type); cross-bucket merges are deferred
+ to Phase 7B contradiction flow
+ 3. Within each group, embed contents via atocore.retrieval.embeddings
+ 4. Greedy transitive cluster at similarity >= threshold
+ 5. For each cluster of size >= 2, ask claude-p to draft unified content
+ 6. POST the proposal to /admin/memory/merge-candidates/create (server-
+ side dedupes by the sorted memory-id set, so re-runs don't double-
+ create)
+
+Host-side because claude CLI lives on Dalidou, not the container. Reuses
+the same PYTHONPATH=src pattern as scripts/graduate_memories.py for
+atocore imports (embeddings, similarity, prompt module).
+
+Usage:
+ python3 scripts/memory_dedup.py --base-url http://127.0.0.1:8100 \\
+ --similarity-threshold 0.88 --max-batch 50
+
+Threshold conventions (see Phase 7 doc):
+ 0.88 interactive / default - balanced precision/recall
+ 0.90 nightly cron - tight, only near-duplicates
+ 0.85 weekly cron - deeper cleanup
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+import urllib.error
+import urllib.parse
+import urllib.request
+from collections import defaultdict
+from typing import Any
+
+# Make src/ importable - same pattern as graduate_memories.py
+_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+_SRC_DIR = os.path.abspath(os.path.join(_SCRIPT_DIR, "..", "src"))
+if _SRC_DIR not in sys.path:
+ sys.path.insert(0, _SRC_DIR)
+
+from atocore.memory._dedup_prompt import ( # noqa: E402
+ DEDUP_PROMPT_VERSION,
+ SYSTEM_PROMPT,
+ build_user_message,
+ normalize_merge_verdict,
+ parse_merge_verdict,
+)
+from atocore.memory.similarity import cluster_by_threshold # noqa: E402
+
+DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100")
+DEFAULT_MODEL = os.environ.get("ATOCORE_DEDUP_MODEL", "sonnet")
+DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_DEDUP_TIMEOUT_S", "60"))
+
+_sandbox_cwd = None
+
+
+def get_sandbox_cwd() -> str:
+ global _sandbox_cwd
+ if _sandbox_cwd is None:
+ _sandbox_cwd = tempfile.mkdtemp(prefix="ato-dedup-")
+ return _sandbox_cwd
+
+
+def api_get(base_url: str, path: str) -> dict:
+ req = urllib.request.Request(f"{base_url}{path}")
+ with urllib.request.urlopen(req, timeout=30) as resp:
+ return json.loads(resp.read().decode("utf-8"))
+
+
+def api_post(base_url: str, path: str, body: dict | None = None) -> dict:
+ data = json.dumps(body or {}).encode("utf-8")
+ req = urllib.request.Request(
+ f"{base_url}{path}", method="POST",
+ headers={"Content-Type": "application/json"}, data=data,
+ )
+ with urllib.request.urlopen(req, timeout=30) as resp:
+ return json.loads(resp.read().decode("utf-8"))
+
+
+def call_claude(system_prompt: str, user_message: str, model: str, timeout_s: float) -> tuple[str | None, str | None]:
+ """Shared CLI caller with retry + stderr capture (mirrors auto_triage)."""
+ if not shutil.which("claude"):
+ return None, "claude CLI not available"
+ args = [
+ "claude", "-p",
+ "--model", model,
+ "--append-system-prompt", system_prompt,
+ "--disable-slash-commands",
+ user_message,
+ ]
+ last_error = ""
+ for attempt in range(3):
+ if attempt > 0:
+ time.sleep(2 ** attempt)
+ try:
+ completed = subprocess.run(
+ args, capture_output=True, text=True,
+ timeout=timeout_s, cwd=get_sandbox_cwd(),
+ encoding="utf-8", errors="replace",
+ )
+ except subprocess.TimeoutExpired:
+ last_error = f"{model} timed out"
+ continue
+ except Exception as exc:
+ last_error = f"subprocess error: {exc}"
+ continue
+ if completed.returncode == 0:
+ return (completed.stdout or "").strip(), None
+ stderr = (completed.stderr or "").strip()[:200]
+ last_error = f"{model} exit {completed.returncode}: {stderr}" if stderr else f"{model} exit {completed.returncode}"
+ return None, last_error
+
+
+def fetch_active_memories(base_url: str, project: str | None) -> list[dict]:
+ # The /memory endpoint with active_only=true returns active memories.
+ # Graduated memories are exempt from dedup; they're frozen pointers
+ # to entities. Filter them out on the client side.
+ params = "active_only=true&limit=2000"
+ if project:
+ params += f"&project={urllib.parse.quote(project)}"
+ try:
+ result = api_get(base_url, f"/memory?{params}")
+ except Exception as e:
+ print(f"ERROR: could not fetch memories: {e}", file=sys.stderr)
+ return []
+ mems = result.get("memories", [])
+ return [m for m in mems if (m.get("status") or "active") == "active"]
+
+
+def group_memories(mems: list[dict]) -> dict[tuple[str, str], list[dict]]:
+ """Bucket by (project, memory_type). Empty project is its own bucket."""
+ buckets: dict[tuple[str, str], list[dict]] = defaultdict(list)
+ for m in mems:
+ key = ((m.get("project") or "").strip().lower(), (m.get("memory_type") or "").strip().lower())
+ buckets[key].append(m)
+ return buckets
+
+
+def draft_merge(sources: list[dict], model: str, timeout_s: float) -> dict[str, Any] | None:
+ user_msg = build_user_message(sources)
+ raw, err = call_claude(SYSTEM_PROMPT, user_msg, model, timeout_s)
+ if err:
+ print(f" WARN: claude call failed: {err}", file=sys.stderr)
+ return None
+ parsed = parse_merge_verdict(raw or "")
+ if parsed is None:
+ print(f" WARN: could not parse verdict: {(raw or '')[:200]}", file=sys.stderr)
+ return None
+ return normalize_merge_verdict(parsed)
+
+
+def submit_candidate(
+ base_url: str,
+ memory_ids: list[str],
+ similarity: float,
+ verdict: dict[str, Any],
+ dry_run: bool,
+) -> str | None:
+ body = {
+ "memory_ids": memory_ids,
+ "similarity": similarity,
+ "proposed_content": verdict["content"],
+ "proposed_memory_type": verdict["memory_type"],
+ "proposed_project": verdict["project"],
+ "proposed_tags": verdict["domain_tags"],
+ "proposed_confidence": verdict["confidence"],
+ "reason": verdict["reason"],
+ }
+ if dry_run:
+ print(f" [dry-run] would POST: {json.dumps(body)[:200]}...")
+ return "dry-run"
+ try:
+ result = api_post(base_url, "/admin/memory/merge-candidates/create", body)
+ return result.get("candidate_id")
+ except urllib.error.HTTPError as e:
+ print(f" ERROR: submit failed: {e.code} {e.read().decode()[:200]}", file=sys.stderr)
+ return None
+ except Exception as e:
+ print(f" ERROR: submit failed: {e}", file=sys.stderr)
+ return None
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(description="Phase 7A semantic dedup detector")
+ parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
+ parser.add_argument("--project", default="", help="Only scan this project (empty = all)")
+ parser.add_argument("--similarity-threshold", type=float, default=0.88)
+ parser.add_argument("--max-batch", type=int, default=50,
+ help="Max clusters to propose per run")
+ parser.add_argument("--model", default=DEFAULT_MODEL)
+ parser.add_argument("--timeout-s", type=float, default=DEFAULT_TIMEOUT_S)
+ parser.add_argument("--dry-run", action="store_true")
+ args = parser.parse_args()
+
+ base = args.base_url.rstrip("/")
+
+ print(f"memory_dedup {DEDUP_PROMPT_VERSION} | threshold={args.similarity_threshold} | model={args.model}")
+ mems = fetch_active_memories(base, args.project or None)
+ print(f"fetched {len(mems)} active memories")
+ if not mems:
+ return
+
+ buckets = group_memories(mems)
+ print(f"grouped into {len(buckets)} (project, memory_type) buckets")
+
+ clusters_found = 0
+ candidates_created = 0
+ skipped_existing = 0
+ llm_rejections = 0
+
+ for (proj, mtype), group in sorted(buckets.items()):
+ if len(group) < 2:
+ continue
+ if candidates_created >= args.max_batch:
+ print(f"reached max-batch={args.max_batch}, stopping")
+ break
+
+ texts = [(m.get("content") or "") for m in group]
+ clusters = cluster_by_threshold(texts, args.similarity_threshold)
+ # Keep only non-trivial clusters
+ clusters = [c for c in clusters if len(c) >= 2]
+ if not clusters:
+ continue
+
+ print(f"\n[{proj or '(global)'}/{mtype}] {len(group)} mems β {len(clusters)} cluster(s)")
+ for cluster in clusters:
+ if candidates_created >= args.max_batch:
+ break
+ clusters_found += 1
+ sources = [group[i] for i in cluster]
+ ids = [s["id"] for s in sources]
+ # Approximate cluster similarity = min pairwise within cluster.
+ # For reporting, just use the threshold: each member joined the
+ # cluster via some pair >= threshold, but because clustering is
+ # transitive the true min pairwise sim may be lower. Keep it simple.
+ sim = args.similarity_threshold
+ print(f" cluster of {len(cluster)}: {[s['id'][:8] for s in sources]}")
+
+ verdict = draft_merge(sources, args.model, args.timeout_s)
+ if verdict is None:
+ continue
+ if verdict["action"] == "reject":
+ llm_rejections += 1
+ print(f" LLM rejected: {verdict['reason'][:100]}")
+ continue
+
+ cid = submit_candidate(base, ids, sim, verdict, args.dry_run)
+ if cid == "dry-run":
+ candidates_created += 1
+ elif cid:
+ candidates_created += 1
+ print(f" β candidate {cid[:8]}")
+ else:
+ skipped_existing += 1
+
+ time.sleep(0.3) # be kind to claude CLI
+
+ print(
+ f"\nsummary: clusters_found={clusters_found} "
+ f"candidates_created={candidates_created} "
+ f"llm_rejections={llm_rejections} "
+ f"skipped_existing={skipped_existing}"
+ )
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/atocore/api/routes.py b/src/atocore/api/routes.py
index f3a91d2..83769dd 100644
--- a/src/atocore/api/routes.py
+++ b/src/atocore/api/routes.py
@@ -1530,6 +1530,169 @@ def api_extend_reinforced() -> dict:
return {"extended_count": len(extended), "extensions": extended}
+# --- Phase 7A: memory dedup / merge-candidate lifecycle ---
+
+
+class MergeCandidateCreateBody(BaseModel):
+ memory_ids: list[str]
+ similarity: float = 0.0
+ proposed_content: str
+ proposed_memory_type: str = "knowledge"
+ proposed_project: str = ""
+ proposed_tags: list[str] = []
+ proposed_confidence: float = 0.6
+ reason: str = ""
+
+
+class MergeCandidateApproveBody(BaseModel):
+ actor: str = "human-triage"
+ content: str | None = None
+ domain_tags: list[str] | None = None
+
+
+class MergeCandidateRejectBody(BaseModel):
+ actor: str = "human-triage"
+ note: str = ""
+
+
+class DedupScanRequestBody(BaseModel):
+ project: str = ""
+ similarity_threshold: float = 0.88
+ max_batch: int = 50
+
+
+@router.get("/admin/memory/merge-candidates")
+def api_list_merge_candidates(status: str = "pending", limit: int = 100) -> dict:
+ """Phase 7A: list merge-candidate proposals for triage UI."""
+ from atocore.memory.service import get_merge_candidates
+ cands = get_merge_candidates(status=status, limit=limit)
+ return {"candidates": cands, "count": len(cands)}
+
+
+@router.post("/admin/memory/merge-candidates/create")
+def api_create_merge_candidate(body: MergeCandidateCreateBody) -> dict:
+ """Phase 7A: host-side dedup detector submits a proposal here.
+
+ Server-side idempotency: if a pending candidate already exists for
+ the same sorted memory_id set, returns the existing id.
+ """
+ from atocore.memory.service import create_merge_candidate
+ try:
+ cid = create_merge_candidate(
+ memory_ids=body.memory_ids,
+ similarity=body.similarity,
+ proposed_content=body.proposed_content,
+ proposed_memory_type=body.proposed_memory_type,
+ proposed_project=body.proposed_project,
+ proposed_tags=body.proposed_tags,
+ proposed_confidence=body.proposed_confidence,
+ reason=body.reason,
+ )
+ except ValueError as e:
+ raise HTTPException(status_code=400, detail=str(e))
+ if cid is None:
+ return {"candidate_id": None, "duplicate": True}
+ return {"candidate_id": cid, "duplicate": False}
+
+
+@router.post("/admin/memory/merge-candidates/{candidate_id}/approve")
+def api_approve_merge_candidate(candidate_id: str, body: MergeCandidateApproveBody) -> dict:
+ """Phase 7A: execute an approved merge. Sources β superseded; new
+ merged memory created. UI can pass content/tag edits via body."""
+ from atocore.memory.service import merge_memories
+ new_id = merge_memories(
+ candidate_id=candidate_id,
+ actor=body.actor,
+ override_content=body.content,
+ override_tags=body.domain_tags,
+ )
+ if new_id is None:
+ raise HTTPException(
+ status_code=409,
+ detail="Merge could not execute (candidate not pending, or source memory tampered)",
+ )
+ return {"status": "approved", "candidate_id": candidate_id, "result_memory_id": new_id}
+
+
+@router.post("/admin/memory/merge-candidates/{candidate_id}/reject")
+def api_reject_merge_candidate(candidate_id: str, body: MergeCandidateRejectBody) -> dict:
+ """Phase 7A: dismiss a merge candidate. Sources stay untouched."""
+ from atocore.memory.service import reject_merge_candidate
+ ok = reject_merge_candidate(candidate_id, actor=body.actor, note=body.note)
+ if not ok:
+ raise HTTPException(status_code=404, detail="Candidate not found or already resolved")
+ return {"status": "rejected", "candidate_id": candidate_id}
+
+
+@router.post("/admin/memory/dedup-scan")
+def api_request_dedup_scan(body: DedupScanRequestBody) -> dict:
+ """Phase 7A: request a host-side dedup scan.
+
+ Writes a flag in project_state with project + threshold + max_batch.
+ A host cron watcher picks it up within ~2 min and runs
+ scripts/memory_dedup.py. Mirrors /admin/graduation/request.
+ """
+ import json as _json
+ from datetime import datetime as _dt, timezone as _tz
+ from atocore.context.project_state import set_state
+
+ now = _dt.now(_tz.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
+ payload = _json.dumps({
+ "project": (body.project or "").strip(),
+ "similarity_threshold": max(0.5, min(0.99, body.similarity_threshold)),
+ "max_batch": max(1, min(body.max_batch, 200)),
+ "requested_at": now,
+ })
+ set_state(
+ project_name="atocore",
+ category="config",
+ key="dedup_requested_at",
+ value=payload,
+ source="admin ui",
+ )
+ return {
+ "requested_at": now,
+ "project": body.project,
+ "similarity_threshold": body.similarity_threshold,
+ "max_batch": body.max_batch,
+ "note": "Host watcher picks up within ~2 min. Poll /admin/memory/dedup-status for progress.",
+ }
+
+
+@router.get("/admin/memory/dedup-status")
+def api_dedup_status() -> dict:
+ """Phase 7A: state of the dedup scan pipeline (UI polling)."""
+ import json as _json
+ from atocore.context.project_state import get_state
+ out = {
+ "requested": None,
+ "last_started_at": None,
+ "last_finished_at": None,
+ "last_result": None,
+ "is_running": False,
+ }
+ try:
+ for e in get_state("atocore"):
+ if e.category not in ("config", "status"):
+ continue
+ if e.key == "dedup_requested_at":
+ try:
+ out["requested"] = _json.loads(e.value)
+ except Exception:
+ out["requested"] = {"raw": e.value}
+ elif e.key == "dedup_last_started_at":
+ out["last_started_at"] = e.value
+ elif e.key == "dedup_last_finished_at":
+ out["last_finished_at"] = e.value
+ elif e.key == "dedup_last_result":
+ out["last_result"] = e.value
+ elif e.key == "dedup_running":
+ out["is_running"] = (e.value == "1")
+ except Exception:
+ pass
+ return out
+
+
@router.get("/admin/graduation/stats")
def api_graduation_stats() -> dict:
"""Phase 5F graduation stats for dashboard."""
diff --git a/src/atocore/engineering/triage_ui.py b/src/atocore/engineering/triage_ui.py
index 696e329..68cadad 100644
--- a/src/atocore/engineering/triage_ui.py
+++ b/src/atocore/engineering/triage_ui.py
@@ -377,6 +377,177 @@ _ENTITY_TRIAGE_CSS = """
"""
+# ---------------------------------------------------------------------
+# Phase 7A - Merge candidates (semantic dedup)
+# ---------------------------------------------------------------------
+
+_MERGE_TRIAGE_CSS = """
+
+"""
+
+
+def _render_merge_card(cand: dict) -> str:
+ import json as _json
+ cid = _escape(cand.get("id", ""))
+ sim = cand.get("similarity") or 0.0
+ sources = cand.get("sources") or []
+ proposed_content = cand.get("proposed_content") or ""
+ proposed_tags = cand.get("proposed_tags") or []
+ proposed_project = cand.get("proposed_project") or ""
+ reason = cand.get("reason") or ""
+
+ src_html = "".join(
+ f"""
+
+
+ {_escape(s.get('id','')[:8])} · [{_escape(s.get('memory_type',''))}]
+ · {_escape(s.get('project','') or '(global)')}
+ · conf {float(s.get('confidence',0)):.2f}
+ · refs {int(s.get('reference_count',0))}
+
+
{_escape((s.get('content') or '')[:300])}
+
+ """
+ for s in sources
+ )
+ tags_str = ", ".join(proposed_tags)
+ return f"""
+
+
+ [merge · {len(sources)} sources]
+ {_escape(proposed_project or '(global)')}
+ sim ≥ {sim:.2f}
+
+
{src_html}
+
↓ merged into ↓
+
+
+
+ Tags:
+
+
+
+ {f'
💡 {_escape(reason)}
' if reason else ''}
+
+
+ ✓ Approve Merge
+ ✗ Keep Separate
+
+
+
+"""
+
+
+_MERGE_TRIAGE_SCRIPT = """
+
+"""
+
+
+def _render_dedup_bar() -> str:
+ return """
+
+
+ π Scan for duplicates
+
+
+ Threshold:
+
+
+
+ Finds semantically near-duplicate active memories and proposes LLM-drafted merges for review. Source memories become superseded on approve; nothing is deleted.
+
+
+"""
+
+
def _render_graduation_bar() -> str:
"""The 'Graduate memories β entity candidates' control bar."""
from atocore.projects.registry import load_project_registry
@@ -478,26 +649,51 @@ def render_triage_page(limit: int = 100) -> str:
except Exception as e:
entity_candidates = []
- total = len(mem_candidates) + len(entity_candidates)
+ try:
+ from atocore.memory.service import get_merge_candidates
+ merge_candidates = get_merge_candidates(status="pending", limit=limit)
+ except Exception:
+ merge_candidates = []
+
+ total = len(mem_candidates) + len(entity_candidates) + len(merge_candidates)
graduation_bar = _render_graduation_bar()
+ dedup_bar = _render_dedup_bar()
if total == 0:
- body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + f"""
+ body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + _MERGE_TRIAGE_CSS + f"""
{graduation_bar}
+ {dedup_bar}
π No candidates to review.
The auto-triage pipeline keeps this queue empty unless something needs your judgment.
-
Use the π Graduate memories button above to propose new entity candidates from existing memories.
+
Use π Graduate memories to propose entity candidates, or π Scan for duplicates to find near-duplicate memories to merge.
- """ + _GRADUATION_SCRIPT
+ """ + _GRADUATION_SCRIPT + _MERGE_TRIAGE_SCRIPT
return render_html("Triage β AtoCore", body, breadcrumbs=[("Wiki", "/wiki"), ("Triage", "")])
# Memory cards
mem_cards = "".join(_render_candidate_card(c) for c in mem_candidates)
+ # Merge cards (Phase 7A)
+ merge_cards_html = ""
+ if merge_candidates:
+ merge_cards = "".join(_render_merge_card(c) for c in merge_candidates)
+ merge_cards_html = f"""
+
+
π Merge Candidates ({len(merge_candidates)})
+
+ Semantically near-duplicate active memories. Approving merges the sources
+ into the proposed unified memory; sources become superseded
+ (not deleted — still queryable). You can edit the draft content and tags
+ before approving.
+
+
+ {merge_cards}
+ """
+
# Entity cards
ent_cards_html = ""
if entity_candidates:
@@ -513,11 +709,12 @@ def render_triage_page(limit: int = 100) -> str:
{ent_cards}
"""
- body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + f"""
+ body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + _MERGE_TRIAGE_CSS + f"""
@@ -536,10 +733,12 @@ def render_triage_page(limit: int = 100) -> str:
{graduation_bar}
+ {dedup_bar}
π Memory Candidates ({len(mem_candidates)})
{mem_cards}
+ {merge_cards_html}
{ent_cards_html}
- """ + _TRIAGE_SCRIPT + _ENTITY_TRIAGE_SCRIPT + _GRADUATION_SCRIPT
+ """ + _TRIAGE_SCRIPT + _ENTITY_TRIAGE_SCRIPT + _GRADUATION_SCRIPT + _MERGE_TRIAGE_SCRIPT
return render_html(
"Triage β AtoCore",
diff --git a/src/atocore/memory/_dedup_prompt.py b/src/atocore/memory/_dedup_prompt.py
new file mode 100644
index 0000000..95c5777
--- /dev/null
+++ b/src/atocore/memory/_dedup_prompt.py
@@ -0,0 +1,156 @@
+"""Shared LLM prompt + parser for memory dedup (Phase 7A).
+
+Stdlib-only — must be importable from both the in-container service
+layer (when a user clicks "scan for duplicates" in the UI) and the
+host-side batch script (``scripts/memory_dedup.py``), which runs on
+Dalidou where the container's Python deps are not available.
+
+The prompt instructs the model to draft a UNIFIED memory that
+preserves every specific detail from the sources. We never want a
+merge to lose information β if two memories disagree on a number, the
+merged content should surface both with context.
+"""
+
+from __future__ import annotations
+
+import json
+from typing import Any
+
+DEDUP_PROMPT_VERSION = "dedup-0.1.0"
+MAX_CONTENT_CHARS = 1000
+MAX_SOURCES = 8  # cluster size cap — bigger clusters are suspicious
+
+SYSTEM_PROMPT = """You consolidate near-duplicate memories for AtoCore, a personal context engine.
+
+Given 2-8 memories that a semantic-similarity scan flagged as likely duplicates, draft a UNIFIED replacement that preserves every specific detail from every source.
+
+CORE PRINCIPLE: information never gets lost. If the sources disagree on a number, date, vendor, or spec, surface BOTH with attribution (e.g., "quoted at $3.2k on 2026-03-01, revised to $3.8k on 2026-04-10"). If one source is more specific than another, keep the specificity. If they say the same thing differently, pick the clearer wording.
+
+YOU MUST:
+- Produce content under 500 characters that reads as a single coherent statement
+- Keep all project/vendor/person/part names that appear in any source
+- Keep all numbers, dates, and identifiers
+- Keep the strongest claim wording ("ratified", "decided", "committed") if any source has it
+- Propose domain_tags as a UNION of the sources' tags (lowercase, deduped, cap 6)
+- Return valid_until = latest non-null valid_until across sources, or null if any source has null (permanent beats transient)
+
+REFUSE TO MERGE (return action="reject") if:
+- The memories are actually about DIFFERENT subjects that merely share vocabulary (e.g., "p04 mirror" and "p05 mirror" — similar wording, but different projects and components)
+- One memory CONTRADICTS another and you cannot reconcile them — flag for contradiction review instead
+- The sources span different time snapshots of a changing state that should stay as a timeline, not be collapsed
+
+OUTPUT — raw JSON, no prose, no markdown fences:
+{
+ "action": "merge" | "reject",
+ "content": "the unified memory content",
+ "memory_type": "knowledge|project|preference|adaptation|episodic|identity",
+ "project": "project-slug or empty",
+ "domain_tags": ["tag1", "tag2"],
+ "confidence": 0.5,
+ "reason": "one sentence explaining the merge (or the rejection)"
+}
+
+On action=reject, still fill content with a short explanation and set confidence=0."""
+
+
+def build_user_message(sources: list[dict[str, Any]]) -> str:
+ """Format N source memories for the model to consolidate.
+
+ Each source dict should carry id, content, project, memory_type,
+ domain_tags, confidence, valid_until, reference_count.
+ """
+ lines = [f"You have {len(sources)} source memories in the same (project, memory_type) bucket:\n"]
+ for i, src in enumerate(sources[:MAX_SOURCES], start=1):
+ tags = src.get("domain_tags") or []
+ if isinstance(tags, str):
+ try:
+ tags = json.loads(tags)
+ except Exception:
+ tags = []
+ lines.append(
+ f"--- Source {i} (id={src.get('id','?')[:8]}, "
+ f"refs={src.get('reference_count',0)}, "
+ f"conf={src.get('confidence',0):.2f}, "
+ f"valid_until={src.get('valid_until') or 'permanent'}) ---"
+ )
+ lines.append(f"project: {src.get('project','')}")
+ lines.append(f"type: {src.get('memory_type','')}")
+ lines.append(f"tags: {tags}")
+ lines.append(f"content: {(src.get('content') or '')[:MAX_CONTENT_CHARS]}")
+ lines.append("")
+ lines.append("Return the JSON object now.")
+ return "\n".join(lines)
+
+
+def parse_merge_verdict(raw_output: str) -> dict[str, Any] | None:
+ """Strip markdown fences / leading prose and return the parsed JSON
+ object. Returns None on parse failure."""
+ text = (raw_output or "").strip()
+ if text.startswith("```"):
+ text = text.strip("`")
+ nl = text.find("\n")
+ if nl >= 0:
+ text = text[nl + 1:]
+ if text.endswith("```"):
+ text = text[:-3]
+ text = text.strip()
+
+ if not text.lstrip().startswith("{"):
+ start = text.find("{")
+ end = text.rfind("}")
+ if start >= 0 and end > start:
+ text = text[start:end + 1]
+
+ try:
+ parsed = json.loads(text)
+ except json.JSONDecodeError:
+ return None
+ if not isinstance(parsed, dict):
+ return None
+ return parsed
+
+
+def normalize_merge_verdict(verdict: dict[str, Any]) -> dict[str, Any] | None:
+ """Validate + normalize a raw merge verdict. Returns None if the
+ verdict is unusable (no content, unknown action)."""
+ action = str(verdict.get("action") or "").strip().lower()
+ if action not in ("merge", "reject"):
+ return None
+
+ content = str(verdict.get("content") or "").strip()
+ if not content:
+ return None
+
+ memory_type = str(verdict.get("memory_type") or "knowledge").strip().lower()
+ project = str(verdict.get("project") or "").strip()
+
+ raw_tags = verdict.get("domain_tags") or []
+ if isinstance(raw_tags, str):
+ raw_tags = [t.strip() for t in raw_tags.split(",") if t.strip()]
+ if not isinstance(raw_tags, list):
+ raw_tags = []
+ tags: list[str] = []
+ for t in raw_tags[:6]:
+ if not isinstance(t, str):
+ continue
+ tt = t.strip().lower()
+ if tt and tt not in tags:
+ tags.append(tt)
+
+ try:
+ confidence = float(verdict.get("confidence", 0.5))
+ except (TypeError, ValueError):
+ confidence = 0.5
+ confidence = max(0.0, min(1.0, confidence))
+
+ reason = str(verdict.get("reason") or "").strip()[:500]
+
+ return {
+ "action": action,
+ "content": content[:1000],
+ "memory_type": memory_type,
+ "project": project,
+ "domain_tags": tags,
+ "confidence": confidence,
+ "reason": reason,
+ }
diff --git a/src/atocore/memory/service.py b/src/atocore/memory/service.py
index d2bec89..ff841c7 100644
--- a/src/atocore/memory/service.py
+++ b/src/atocore/memory/service.py
@@ -925,3 +925,327 @@ def _row_to_memory(row) -> Memory:
def _validate_confidence(confidence: float) -> None:
if not 0.0 <= confidence <= 1.0:
raise ValueError("Confidence must be between 0.0 and 1.0")
+
+
+# ---------------------------------------------------------------------
+# Phase 7A — Memory Consolidation: merge-candidate lifecycle
+# ---------------------------------------------------------------------
+#
+# The detector (scripts/memory_dedup.py) writes proposals into
+# memory_merge_candidates. The triage UI lists pending rows, a human
+# reviews, and on approve we execute the merge here — never at detect
+# time. This keeps the audit trail clean: every mutation is a human
+# decision.
+
+
+def create_merge_candidate(
+ memory_ids: list[str],
+ similarity: float,
+ proposed_content: str,
+ proposed_memory_type: str,
+ proposed_project: str,
+ proposed_tags: list[str] | None = None,
+ proposed_confidence: float = 0.6,
+ reason: str = "",
+) -> str | None:
+ """Insert a merge-candidate row. Returns the new row id, or None if
+ a pending candidate already covers this exact set of memory ids
+ (idempotent scan — re-running the detector doesn't double-create)."""
+ import json as _json
+
+ if not memory_ids or len(memory_ids) < 2:
+ raise ValueError("merge candidate requires at least 2 memory_ids")
+
+ memory_ids_sorted = sorted(set(memory_ids))
+ memory_ids_json = _json.dumps(memory_ids_sorted)
+ tags_json = _json.dumps(_normalize_tags(proposed_tags))
+ candidate_id = str(uuid.uuid4())
+
+ with get_connection() as conn:
+ # Idempotency: same sorted-id set already pending? skip.
+ existing = conn.execute(
+ "SELECT id FROM memory_merge_candidates "
+ "WHERE status = 'pending' AND memory_ids = ?",
+ (memory_ids_json,),
+ ).fetchone()
+ if existing:
+ return None
+
+ conn.execute(
+ "INSERT INTO memory_merge_candidates "
+ "(id, status, memory_ids, similarity, proposed_content, "
+ "proposed_memory_type, proposed_project, proposed_tags, "
+ "proposed_confidence, reason) "
+ "VALUES (?, 'pending', ?, ?, ?, ?, ?, ?, ?, ?)",
+ (
+ candidate_id, memory_ids_json, float(similarity or 0.0),
+ (proposed_content or "")[:2000],
+ (proposed_memory_type or "knowledge")[:50],
+ (proposed_project or "")[:100],
+ tags_json,
+ max(0.0, min(1.0, float(proposed_confidence))),
+ (reason or "")[:500],
+ ),
+ )
+ log.info(
+ "merge_candidate_created",
+ candidate_id=candidate_id,
+ memory_count=len(memory_ids_sorted),
+ similarity=round(float(similarity or 0.0), 4),
+ )
+ return candidate_id
+
+
+def get_merge_candidates(status: str = "pending", limit: int = 100) -> list[dict]:
+ """List merge candidates with their source memories inlined."""
+ import json as _json
+
+ with get_connection() as conn:
+ rows = conn.execute(
+ "SELECT * FROM memory_merge_candidates "
+ "WHERE status = ? ORDER BY created_at DESC LIMIT ?",
+ (status, limit),
+ ).fetchall()
+
+ out = []
+ for r in rows:
+ try:
+ mem_ids = _json.loads(r["memory_ids"] or "[]")
+ except Exception:
+ mem_ids = []
+ try:
+ tags = _json.loads(r["proposed_tags"] or "[]")
+ except Exception:
+ tags = []
+
+ sources = []
+ for mid in mem_ids:
+ srow = conn.execute(
+ "SELECT id, memory_type, content, project, confidence, "
+ "status, reference_count, domain_tags, valid_until "
+ "FROM memories WHERE id = ?",
+ (mid,),
+ ).fetchone()
+ if srow:
+ try:
+ stags = _json.loads(srow["domain_tags"] or "[]")
+ except Exception:
+ stags = []
+ sources.append({
+ "id": srow["id"],
+ "memory_type": srow["memory_type"],
+ "content": srow["content"],
+ "project": srow["project"] or "",
+ "confidence": srow["confidence"],
+ "status": srow["status"],
+ "reference_count": int(srow["reference_count"] or 0),
+ "domain_tags": stags,
+ "valid_until": srow["valid_until"] or "",
+ })
+
+ out.append({
+ "id": r["id"],
+ "status": r["status"],
+ "memory_ids": mem_ids,
+ "similarity": r["similarity"],
+ "proposed_content": r["proposed_content"] or "",
+ "proposed_memory_type": r["proposed_memory_type"] or "knowledge",
+ "proposed_project": r["proposed_project"] or "",
+ "proposed_tags": tags,
+ "proposed_confidence": r["proposed_confidence"],
+ "reason": r["reason"] or "",
+ "created_at": r["created_at"],
+ "resolved_at": r["resolved_at"],
+ "resolved_by": r["resolved_by"],
+ "result_memory_id": r["result_memory_id"],
+ "sources": sources,
+ })
+ return out
+
+
+def reject_merge_candidate(candidate_id: str, actor: str = "human-triage", note: str = "") -> bool:
+ """Mark a merge candidate as rejected. Source memories stay untouched."""
+ now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
+ with get_connection() as conn:
+ result = conn.execute(
+ "UPDATE memory_merge_candidates "
+ "SET status = 'rejected', resolved_at = ?, resolved_by = ? "
+ "WHERE id = ? AND status = 'pending'",
+ (now_str, actor, candidate_id),
+ )
+ if result.rowcount == 0:
+ return False
+ log.info("merge_candidate_rejected", candidate_id=candidate_id, actor=actor, note=note[:100])
+ return True
+
+
+def merge_memories(
+ candidate_id: str,
+ actor: str = "human-triage",
+ override_content: str | None = None,
+ override_tags: list[str] | None = None,
+) -> str | None:
+ """Execute an approved merge candidate.
+
+ 1. Validate all source memories still status=active
+ 2. Create the new merged memory (status=active)
+ 3. Mark each source status=superseded with an audit row pointing at
+ the new merged id
+ 4. Mark the candidate status=approved, record result_memory_id
+ 5. Write a consolidated audit row on the new memory
+
+ Returns the new merged memory's id, or None if the candidate cannot
+ be executed (already resolved, source tampered, etc.).
+
+ ``override_content`` and ``override_tags`` let the UI pass the human's
+ edits before clicking approve.
+ """
+ import json as _json
+
+ now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
+
+ with get_connection() as conn:
+ row = conn.execute(
+ "SELECT * FROM memory_merge_candidates WHERE id = ?",
+ (candidate_id,),
+ ).fetchone()
+ if row is None or row["status"] != "pending":
+ log.warning("merge_candidate_not_pending", candidate_id=candidate_id)
+ return None
+
+ try:
+ mem_ids = _json.loads(row["memory_ids"] or "[]")
+ except Exception:
+ mem_ids = []
+ if not mem_ids or len(mem_ids) < 2:
+ log.warning("merge_candidate_invalid_memory_ids", candidate_id=candidate_id)
+ return None
+
+ # Snapshot sources + validate all active
+ source_rows = []
+ for mid in mem_ids:
+ srow = conn.execute(
+ "SELECT * FROM memories WHERE id = ?", (mid,)
+ ).fetchone()
+ if srow is None or srow["status"] != "active":
+ log.warning(
+ "merge_source_not_active",
+ candidate_id=candidate_id,
+ memory_id=mid,
+ actual_status=(srow["status"] if srow else "missing"),
+ )
+ return None
+ source_rows.append(srow)
+
+ # Build merged memory fields — prefer human overrides, then proposed
+ content = (override_content or row["proposed_content"] or "").strip()
+ if not content:
+ log.warning("merge_candidate_empty_content", candidate_id=candidate_id)
+ return None
+
+ merged_type = (row["proposed_memory_type"] or source_rows[0]["memory_type"]).lower()
+ if merged_type not in MEMORY_TYPES:
+ merged_type = source_rows[0]["memory_type"]
+
+ merged_project = row["proposed_project"] or source_rows[0]["project"] or ""
+ merged_project = resolve_project_name(merged_project)
+
+ # Tags: override wins, else proposed, else union of sources
+ if override_tags is not None:
+ merged_tags = _normalize_tags(override_tags)
+ else:
+ try:
+ proposed_tags = _json.loads(row["proposed_tags"] or "[]")
+ except Exception:
+ proposed_tags = []
+ if proposed_tags:
+ merged_tags = _normalize_tags(proposed_tags)
+ else:
+ union: list[str] = []
+ for srow in source_rows:
+ try:
+ stags = _json.loads(srow["domain_tags"] or "[]")
+ except Exception:
+ stags = []
+ for t in stags:
+ if isinstance(t, str) and t and t not in union:
+ union.append(t)
+ merged_tags = union
+
+ # confidence = max; reference_count = sum
+ merged_confidence = max(float(s["confidence"]) for s in source_rows)
+ total_refs = sum(int(s["reference_count"] or 0) for s in source_rows)
+
+ # valid_until: if any source is permanent (None/empty), merged is permanent.
+ # Otherwise take the latest (lexical compare on ISO dates works).
+ merged_vu: str | None
+ has_permanent = any(not (s["valid_until"] or "").strip() for s in source_rows)
+ if has_permanent:
+ merged_vu = None
+ else:
+ merged_vu = max((s["valid_until"] or "").strip() for s in source_rows) or None
+
+ new_id = str(uuid.uuid4())
+ tags_json = _json.dumps(merged_tags)
+
+ conn.execute(
+ "INSERT INTO memories (id, memory_type, content, project, "
+ "source_chunk_id, confidence, status, domain_tags, valid_until, "
+ "reference_count, last_referenced_at) "
+ "VALUES (?, ?, ?, ?, NULL, ?, 'active', ?, ?, ?, ?)",
+ (
+ new_id, merged_type, content[:2000], merged_project,
+ merged_confidence, tags_json, merged_vu, total_refs, now_str,
+ ),
+ )
+
+ # Mark sources superseded
+ for srow in source_rows:
+ conn.execute(
+ "UPDATE memories SET status = 'superseded', updated_at = ? "
+ "WHERE id = ?",
+ (now_str, srow["id"]),
+ )
+
+ # Mark candidate approved
+ conn.execute(
+ "UPDATE memory_merge_candidates SET status = 'approved', "
+ "resolved_at = ?, resolved_by = ?, result_memory_id = ? WHERE id = ?",
+ (now_str, actor, new_id, candidate_id),
+ )
+
+ # Audit rows (out of the transaction; fail-open via _audit_memory)
+ _audit_memory(
+ memory_id=new_id,
+ action="created_via_merge",
+ actor=actor,
+ after={
+ "memory_type": merged_type,
+ "content": content,
+ "project": merged_project,
+ "confidence": merged_confidence,
+ "domain_tags": merged_tags,
+ "reference_count": total_refs,
+ "merged_from": list(mem_ids),
+ "merge_candidate_id": candidate_id,
+ },
+ note=f"merged {len(mem_ids)} sources via candidate {candidate_id[:8]}",
+ )
+ for srow in source_rows:
+ _audit_memory(
+ memory_id=srow["id"],
+ action="superseded",
+ actor=actor,
+ before={"status": "active", "content": srow["content"]},
+ after={"status": "superseded", "superseded_by": new_id},
+ note=f"merged into {new_id}",
+ )
+
+ log.info(
+ "merge_executed",
+ candidate_id=candidate_id,
+ result_memory_id=new_id,
+ source_count=len(source_rows),
+ actor=actor,
+ )
+ return new_id
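The aggregation rules in `merge_memories` (confidence = max, reference_count = sum, valid_until where any permanent source wins, else the latest ISO date) are a pure function of the source rows; a standalone sketch of that policy, with the function name chosen here for illustration:

```python
def aggregate_merge_fields(sources: list[dict]) -> dict:
    """Merge-time aggregation policy: max confidence, summed
    reference counts, and valid_until where any permanent source
    (None/empty) makes the merged memory permanent, otherwise the
    latest ISO date wins (lexical compare is safe on ISO-8601)."""
    confidence = max(float(s.get("confidence", 0.0)) for s in sources)
    total_refs = sum(int(s.get("reference_count") or 0) for s in sources)
    dates = [(s.get("valid_until") or "").strip() for s in sources]
    valid_until = None if any(not d for d in dates) else max(dates)
    return {
        "confidence": confidence,
        "reference_count": total_refs,
        "valid_until": valid_until,
    }
```

"Permanent beats transient" falls out of the `any(not d ...)` check: one source with no expiry makes the merged row permanent.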
diff --git a/src/atocore/memory/similarity.py b/src/atocore/memory/similarity.py
new file mode 100644
index 0000000..378d5e7
--- /dev/null
+++ b/src/atocore/memory/similarity.py
@@ -0,0 +1,88 @@
+"""Phase 7A (Memory Consolidation): semantic similarity helpers.
+
+Thin wrapper over ``atocore.retrieval.embeddings`` that exposes
+pairwise + batch cosine similarity on normalized embeddings. Used by
+the dedup detector to cluster near-duplicate active memories.
+
+Embeddings from ``embed_texts()`` are already L2-normalized, so cosine
+similarity reduces to a dot product — no extra normalization needed.
+"""
+
+from __future__ import annotations
+
+from atocore.retrieval.embeddings import embed_texts
+
+
+def _dot(a: list[float], b: list[float]) -> float:
+ return sum(x * y for x, y in zip(a, b))
+
+
+def cosine(a: list[float], b: list[float]) -> float:
+ """Cosine similarity on already-normalized vectors. Clamped to [0,1]
+ (embeddings use paraphrase-multilingual-MiniLM which is unit-norm,
+ and we never want negative values leaking into thresholds)."""
+ return max(0.0, min(1.0, _dot(a, b)))
+
+
+def compute_memory_similarity(text_a: str, text_b: str) -> float:
+ """Return cosine similarity of two memory contents in [0,1].
+
+ Convenience helper for one-off checks + tests. For batch work (the
+ dedup detector), use ``embed_texts()`` directly and compute the
+ similarity matrix yourself to avoid re-embedding shared texts.
+ """
+ if not text_a or not text_b:
+ return 0.0
+ vecs = embed_texts([text_a, text_b])
+ return cosine(vecs[0], vecs[1])
+
+
+def similarity_matrix(texts: list[str]) -> list[list[float]]:
+ """NΓN cosine similarity matrix. Diagonal is 1.0, symmetric."""
+ if not texts:
+ return []
+ vecs = embed_texts(texts)
+ n = len(vecs)
+ matrix = [[0.0] * n for _ in range(n)]
+ for i in range(n):
+ matrix[i][i] = 1.0
+ for j in range(i + 1, n):
+ s = cosine(vecs[i], vecs[j])
+ matrix[i][j] = s
+ matrix[j][i] = s
+ return matrix
+
+
+def cluster_by_threshold(texts: list[str], threshold: float) -> list[list[int]]:
+ """Greedy transitive clustering: if sim(i,j) >= threshold, merge.
+
+ Returns a list of clusters, each a list of indices into ``texts``.
+ Singletons are included. Used by the dedup detector to collapse
+ A~B~C into one merge proposal rather than three pair proposals.
+ """
+ if not texts:
+ return []
+ matrix = similarity_matrix(texts)
+ n = len(texts)
+ parent = list(range(n))
+
+ def find(x: int) -> int:
+ while parent[x] != x:
+ parent[x] = parent[parent[x]]
+ x = parent[x]
+ return x
+
+ def union(x: int, y: int) -> None:
+ rx, ry = find(x), find(y)
+ if rx != ry:
+ parent[rx] = ry
+
+ for i in range(n):
+ for j in range(i + 1, n):
+ if matrix[i][j] >= threshold:
+ union(i, j)
+
+ groups: dict[int, list[int]] = {}
+ for i in range(n):
+ groups.setdefault(find(i), []).append(i)
+ return list(groups.values())
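`cluster_by_threshold` pulls in the embedding model via `embed_texts`, but the transitive-merge step works on any precomputed matrix; a standalone sketch of the same union-find logic on a hand-written matrix:

```python
def cluster_matrix(matrix: list[list[float]], threshold: float) -> list[list[int]]:
    """Greedy transitive clustering over a precomputed N×N similarity
    matrix: any pair >= threshold is unioned, so A~B and B~C put
    A, B, C in one cluster even when sim(A, C) is below threshold."""
    n = len(matrix)
    parent = list(range(n))

    def find(x: int) -> int:
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x

    for i in range(n):
        for j in range(i + 1, n):
            if matrix[i][j] >= threshold:
                ri, rj = find(i), find(j)
                if ri != rj:
                    parent[ri] = rj

    groups: dict[int, list[int]] = {}
    for i in range(n):
        groups.setdefault(find(i), []).append(i)
    return list(groups.values())


# A~B (0.95) and B~C (0.92), but A~C only 0.60: still one cluster.
m = [
    [1.00, 0.95, 0.60],
    [0.95, 1.00, 0.92],
    [0.60, 0.92, 1.00],
]
```

This transitivity is exactly why A~B~C becomes one merge proposal instead of three pair proposals.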
diff --git a/src/atocore/models/database.py b/src/atocore/models/database.py
index e65010a..b9605ec 100644
--- a/src/atocore/models/database.py
+++ b/src/atocore/models/database.py
@@ -251,6 +251,42 @@ def _apply_migrations(conn: sqlite3.Connection) -> None:
"CREATE INDEX IF NOT EXISTS idx_interactions_created_at ON interactions(created_at)"
)
+ # Phase 7A (Memory Consolidation β "sleep cycle"): merge candidates.
+ # When the dedup detector finds a cluster of semantically similar active
+ # memories within the same (project, memory_type) bucket, it drafts a
+ # unified content via LLM and writes a proposal here. The triage UI
+ # surfaces these for human approval. On approve, source memories become
+ # status=superseded and a new merged memory is created.
+ # memory_ids is a JSON array (length >= 2) of the source memory ids.
+ # proposed_* hold the LLM's draft; a human can edit before approve.
+ # result_memory_id is filled on approve with the new merged memory's id.
+ conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS memory_merge_candidates (
+ id TEXT PRIMARY KEY,
+ status TEXT DEFAULT 'pending',
+ memory_ids TEXT NOT NULL,
+ similarity REAL,
+ proposed_content TEXT,
+ proposed_memory_type TEXT,
+ proposed_project TEXT,
+ proposed_tags TEXT DEFAULT '[]',
+ proposed_confidence REAL,
+ reason TEXT DEFAULT '',
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+ resolved_at DATETIME,
+ resolved_by TEXT,
+ result_memory_id TEXT
+ )
+ """
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_mmc_status ON memory_merge_candidates(status)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_mmc_created_at ON memory_merge_candidates(created_at)"
+ )
+
def _column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool:
rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
diff --git a/tests/test_memory_dedup.py b/tests/test_memory_dedup.py
new file mode 100644
index 0000000..345e543
--- /dev/null
+++ b/tests/test_memory_dedup.py
@@ -0,0 +1,380 @@
+"""Phase 7A β memory consolidation tests.
+
+Covers:
+ - similarity helpers (cosine bounds, matrix symmetry, clustering)
+ - _dedup_prompt parser / normalizer robustness
+ - create_merge_candidate idempotency
+ - get_merge_candidates inlines source memories
+ - merge_memories end-to-end happy path (sources → superseded,
+ new merged memory active, audit rows, result_memory_id)
+ - reject_merge_candidate leaves sources untouched
+"""
+
+from __future__ import annotations
+
+import pytest
+
+from atocore.memory._dedup_prompt import (
+ normalize_merge_verdict,
+ parse_merge_verdict,
+)
+from atocore.memory.service import (
+ create_memory,
+ create_merge_candidate,
+ get_memory_audit,
+ get_merge_candidates,
+ merge_memories,
+ reject_merge_candidate,
+)
+from atocore.memory.similarity import (
+ cluster_by_threshold,
+ cosine,
+ compute_memory_similarity,
+ similarity_matrix,
+)
+from atocore.models.database import get_connection, init_db
+
+
+# --- Similarity helpers ---
+
+
+def test_cosine_bounds():
+ assert cosine([1.0, 0.0], [1.0, 0.0]) == pytest.approx(1.0)
+ assert cosine([1.0, 0.0], [0.0, 1.0]) == pytest.approx(0.0)
+ # Negative dot product clamped to 0
+ assert cosine([1.0, 0.0], [-1.0, 0.0]) == 0.0
+
+
+def test_compute_memory_similarity_identical_high():
+ s = compute_memory_similarity("the sky is blue", "the sky is blue")
+ assert 0.99 <= s <= 1.0
+
+
+def test_compute_memory_similarity_unrelated_low():
+ s = compute_memory_similarity(
+ "APM integrates with NX via a Python bridge",
+ "the polisher firmware must use USB SSD not SD card",
+ )
+ assert 0.0 <= s < 0.7
+
+
+def test_similarity_matrix_symmetric():
+ texts = ["alpha beta gamma", "alpha beta gamma", "completely unrelated text"]
+ m = similarity_matrix(texts)
+ assert len(m) == 3 and all(len(r) == 3 for r in m)
+ for i in range(3):
+ assert m[i][i] == pytest.approx(1.0)
+ for i in range(3):
+ for j in range(3):
+ assert m[i][j] == pytest.approx(m[j][i])
+
+
+def test_cluster_by_threshold_transitive():
+ # Three near-paraphrases should land in one cluster
+ texts = [
+ "Antoine prefers OAuth over API keys",
+ "Antoine's preference is OAuth, not API keys",
+ "the polisher firmware uses USB SSD storage",
+ ]
+ clusters = cluster_by_threshold(texts, threshold=0.7)
+ # At least one cluster of size 2+ containing the paraphrases
+ big = [c for c in clusters if len(c) >= 2]
+ assert big, f"expected at least one multi-member cluster, got {clusters}"
+ assert 0 in big[0] and 1 in big[0]
+
+
+# --- Prompt parser robustness ---
+
+
+def test_parse_merge_verdict_strips_fences():
+ raw = "```json\n{\"action\":\"merge\",\"content\":\"x\"}\n```"
+ parsed = parse_merge_verdict(raw)
+ assert parsed == {"action": "merge", "content": "x"}
+
+
+def test_parse_merge_verdict_handles_prose_prefix():
+ raw = "Sure! Here's the result:\n{\"action\":\"reject\",\"content\":\"no\"}"
+ parsed = parse_merge_verdict(raw)
+ assert parsed is not None
+ assert parsed["action"] == "reject"
+
+
+def test_normalize_merge_verdict_fills_defaults():
+ v = normalize_merge_verdict({
+ "action": "merge",
+ "content": "unified text",
+ })
+ assert v is not None
+ assert v["memory_type"] == "knowledge"
+ assert v["project"] == ""
+ assert v["domain_tags"] == []
+ assert v["confidence"] == 0.5
+
+
+def test_normalize_merge_verdict_rejects_empty_content():
+ assert normalize_merge_verdict({"action": "merge", "content": ""}) is None
+
+
+def test_normalize_merge_verdict_rejects_unknown_action():
+ assert normalize_merge_verdict({"action": "?", "content": "x"}) is None
+
+
+# --- create_merge_candidate idempotency ---
+
+
+def test_create_merge_candidate_inserts_row(tmp_data_dir):
+ init_db()
+ m1 = create_memory("knowledge", "APM uses NX for DXF conversion")
+ m2 = create_memory("knowledge", "APM uses NX for DXF-to-STL")
+
+ cid = create_merge_candidate(
+ memory_ids=[m1.id, m2.id],
+ similarity=0.92,
+ proposed_content="APM uses NX for DXFβSTL conversion",
+ proposed_memory_type="knowledge",
+ proposed_project="",
+ proposed_tags=["apm", "nx"],
+ proposed_confidence=0.6,
+ reason="near-paraphrase",
+ )
+ assert cid is not None
+
+ pending = get_merge_candidates(status="pending")
+ assert len(pending) == 1
+ assert pending[0]["id"] == cid
+ assert pending[0]["similarity"] == pytest.approx(0.92)
+ assert len(pending[0]["sources"]) == 2
+
+
+def test_create_merge_candidate_idempotent(tmp_data_dir):
+ init_db()
+ m1 = create_memory("knowledge", "Fact A")
+ m2 = create_memory("knowledge", "Fact A slightly reworded")
+
+ first = create_merge_candidate(
+ memory_ids=[m1.id, m2.id],
+ similarity=0.9,
+ proposed_content="merged",
+ proposed_memory_type="knowledge",
+ proposed_project="",
+ )
+ # Same id set, different order — dedupe skips
+ second = create_merge_candidate(
+ memory_ids=[m2.id, m1.id],
+ similarity=0.9,
+ proposed_content="merged (again)",
+ proposed_memory_type="knowledge",
+ proposed_project="",
+ )
+ assert first is not None
+ assert second is None
+
+
+def test_create_merge_candidate_requires_two_ids(tmp_data_dir):
+ init_db()
+ m1 = create_memory("knowledge", "lonely")
+ with pytest.raises(ValueError):
+ create_merge_candidate(
+ memory_ids=[m1.id],
+ similarity=1.0,
+ proposed_content="x",
+ proposed_memory_type="knowledge",
+ proposed_project="",
+ )
+
+
+# --- merge_memories end-to-end ---
+
+
+def test_merge_memories_happy_path(tmp_data_dir):
+ init_db()
+ m1 = create_memory(
+ "knowledge", "APM uses NX for DXF conversion",
+ project="apm", confidence=0.6, domain_tags=["apm", "nx"],
+ )
+ m2 = create_memory(
+ "knowledge", "APM does DXF to STL via NX bridge",
+ project="apm", confidence=0.8, domain_tags=["apm", "bridge"],
+ )
+ # Bump reference counts so sum is meaningful
+ with get_connection() as conn:
+ conn.execute("UPDATE memories SET reference_count = 3 WHERE id = ?", (m1.id,))
+ conn.execute("UPDATE memories SET reference_count = 5 WHERE id = ?", (m2.id,))
+
+ cid = create_merge_candidate(
+ memory_ids=[m1.id, m2.id],
+ similarity=0.92,
+ proposed_content="APM uses NX bridge for DXFβSTL conversion",
+ proposed_memory_type="knowledge",
+ proposed_project="apm",
+ proposed_tags=["apm", "nx", "bridge"],
+ proposed_confidence=0.7,
+ reason="duplicates",
+ )
+ new_id = merge_memories(candidate_id=cid, actor="human-triage")
+ assert new_id is not None
+
+ # Sources superseded
+ with get_connection() as conn:
+ s1 = conn.execute("SELECT status FROM memories WHERE id = ?", (m1.id,)).fetchone()
+ s2 = conn.execute("SELECT status FROM memories WHERE id = ?", (m2.id,)).fetchone()
+ merged = conn.execute(
+ "SELECT content, status, confidence, reference_count, project "
+ "FROM memories WHERE id = ?", (new_id,)
+ ).fetchone()
+ cand = conn.execute(
+ "SELECT status, result_memory_id FROM memory_merge_candidates WHERE id = ?",
+ (cid,),
+ ).fetchone()
+ assert s1["status"] == "superseded"
+ assert s2["status"] == "superseded"
+ assert merged["status"] == "active"
+ assert merged["project"] == "apm"
+ # confidence = max of sources (0.8), not the proposed 0.7: the proposed value
+ # is only a hint; merge_memories takes the max of the actual source confidences.
+ assert merged["confidence"] == pytest.approx(0.8)
+ # reference_count = sum (3 + 5 = 8)
+ assert int(merged["reference_count"]) == 8
+ assert cand["status"] == "approved"
+ assert cand["result_memory_id"] == new_id
+
+
+def test_merge_memories_content_override(tmp_data_dir):
+ init_db()
+ m1 = create_memory("knowledge", "draft A", project="p05-interferometer")
+ m2 = create_memory("knowledge", "draft B", project="p05-interferometer")
+
+ cid = create_merge_candidate(
+ memory_ids=[m1.id, m2.id],
+ similarity=0.9,
+ proposed_content="AI draft",
+ proposed_memory_type="knowledge",
+ proposed_project="p05-interferometer",
+ )
+ new_id = merge_memories(
+ candidate_id=cid,
+ actor="human-triage",
+ override_content="human-edited final text",
+ override_tags=["optics", "custom"],
+ )
+ assert new_id is not None
+ with get_connection() as conn:
+ row = conn.execute(
+ "SELECT content, domain_tags FROM memories WHERE id = ?", (new_id,)
+ ).fetchone()
+ assert row["content"] == "human-edited final text"
+ # domain_tags JSON should contain the override
+ assert "optics" in row["domain_tags"]
+ assert "custom" in row["domain_tags"]
+
+
+def test_merge_memories_writes_audit(tmp_data_dir):
+ init_db()
+ m1 = create_memory("knowledge", "alpha")
+ m2 = create_memory("knowledge", "alpha variant")
+ cid = create_merge_candidate(
+ memory_ids=[m1.id, m2.id], similarity=0.9,
+ proposed_content="alpha merged",
+ proposed_memory_type="knowledge", proposed_project="",
+ )
+ new_id = merge_memories(candidate_id=cid)
+ assert new_id
+
+ audit_new = get_memory_audit(new_id)
+ actions_new = {a["action"] for a in audit_new}
+ assert "created_via_merge" in actions_new
+
+ audit_m1 = get_memory_audit(m1.id)
+ actions_m1 = {a["action"] for a in audit_m1}
+ assert "superseded" in actions_m1
+
+
+def test_merge_memories_aborts_if_source_not_active(tmp_data_dir):
+ init_db()
+ m1 = create_memory("knowledge", "one")
+ m2 = create_memory("knowledge", "two")
+ cid = create_merge_candidate(
+ memory_ids=[m1.id, m2.id], similarity=0.9,
+ proposed_content="merged",
+ proposed_memory_type="knowledge", proposed_project="",
+ )
+ # Tamper: supersede one source before the merge runs
+ with get_connection() as conn:
+ conn.execute("UPDATE memories SET status = 'superseded' WHERE id = ?", (m1.id,))
+ result = merge_memories(candidate_id=cid)
+ assert result is None
+
+ # Candidate still pending
+ pending = get_merge_candidates(status="pending")
+ assert any(c["id"] == cid for c in pending)
+
+
+def test_merge_memories_rejects_already_resolved(tmp_data_dir):
+ init_db()
+ m1 = create_memory("knowledge", "x")
+ m2 = create_memory("knowledge", "y")
+ cid = create_merge_candidate(
+ memory_ids=[m1.id, m2.id], similarity=0.9,
+ proposed_content="xy",
+ proposed_memory_type="knowledge", proposed_project="",
+ )
+ first = merge_memories(candidate_id=cid)
+ assert first is not None
+ # second call → candidate already approved, should return None
+ second = merge_memories(candidate_id=cid)
+ assert second is None
+
+
+# --- reject_merge_candidate ---
+
+
+def test_reject_merge_candidate_leaves_sources_untouched(tmp_data_dir):
+ init_db()
+ m1 = create_memory("knowledge", "a")
+ m2 = create_memory("knowledge", "b")
+ cid = create_merge_candidate(
+ memory_ids=[m1.id, m2.id], similarity=0.9,
+ proposed_content="a+b",
+ proposed_memory_type="knowledge", proposed_project="",
+ )
+ ok = reject_merge_candidate(cid, actor="human-triage", note="false positive")
+ assert ok
+
+ # Sources still active
+ with get_connection() as conn:
+ s1 = conn.execute("SELECT status FROM memories WHERE id = ?", (m1.id,)).fetchone()
+ s2 = conn.execute("SELECT status FROM memories WHERE id = ?", (m2.id,)).fetchone()
+ cand = conn.execute(
+ "SELECT status FROM memory_merge_candidates WHERE id = ?", (cid,)
+ ).fetchone()
+ assert s1["status"] == "active"
+ assert s2["status"] == "active"
+ assert cand["status"] == "rejected"
+
+
+def test_reject_merge_candidate_idempotent(tmp_data_dir):
+ init_db()
+ m1 = create_memory("knowledge", "p")
+ m2 = create_memory("knowledge", "q")
+ cid = create_merge_candidate(
+ memory_ids=[m1.id, m2.id], similarity=0.9,
+ proposed_content="pq",
+ proposed_memory_type="knowledge", proposed_project="",
+ )
+ assert reject_merge_candidate(cid) is True
+ # second reject → candidate already rejected, returns False
+ assert reject_merge_candidate(cid) is False
+
+
+# --- Schema sanity ---
+
+
+def test_merge_candidates_table_exists(tmp_data_dir):
+ init_db()
+ with get_connection() as conn:
+ cols = [r["name"] for r in conn.execute("PRAGMA table_info(memory_merge_candidates)").fetchall()]
+ expected = {"id", "status", "memory_ids", "similarity", "proposed_content",
+ "proposed_memory_type", "proposed_project", "proposed_tags",
+ "proposed_confidence", "reason", "created_at", "resolved_at",
+ "resolved_by", "result_memory_id"}
+ assert expected.issubset(set(cols))