From 028d4c35947cf3807f0926e9139a0e03c3c8ac4d Mon Sep 17 00:00:00 2001 From: Anto01 Date: Sat, 18 Apr 2026 10:30:49 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=207A=20=E2=80=94=20semantic=20mem?= =?UTF-8?q?ory=20dedup=20("sleep=20cycle"=20V1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New table memory_merge_candidates + service functions to cluster near-duplicate active memories within (project, memory_type) buckets, draft a unified content via LLM, and merge on human approval. Source memories become superseded (never deleted); merged memory carries union of tags, max of confidence, sum of reference_count. - schema migration for memory_merge_candidates - atocore.memory.similarity: cosine + transitive clustering - atocore.memory._dedup_prompt: stdlib-only LLM prompt preserving every specific - service: merge_memories / create_merge_candidate / get_merge_candidates / reject_merge_candidate - scripts/memory_dedup.py: host-side detector (HTTP-only, idempotent) - 5 API endpoints under /admin/memory/merge-candidates* + /admin/memory/dedup-scan - triage UI: purple "πŸ”— Merge Candidates" section + "πŸ”— Scan for duplicates" bar - batch-extract.sh Step B3 (0.90 daily, 0.85 Sundays) - deploy/dalidou/dedup-watcher.sh for UI-triggered scans - 21 new tests (374 β†’ 395) - docs/PHASE-7-MEMORY-CONSOLIDATION.md covering 7A-7H roadmap Co-Authored-By: Claude Opus 4.7 (1M context) --- DEV-LEDGER.md | 6 +- deploy/dalidou/batch-extract.sh | 20 ++ deploy/dalidou/dedup-watcher.sh | 110 ++++++++ docs/PHASE-7-MEMORY-CONSOLIDATION.md | 96 +++++++ scripts/memory_dedup.py | 278 ++++++++++++++++++++ src/atocore/api/routes.py | 163 ++++++++++++ src/atocore/engineering/triage_ui.py | 211 ++++++++++++++- src/atocore/memory/_dedup_prompt.py | 156 +++++++++++ src/atocore/memory/service.py | 324 +++++++++++++++++++++++ src/atocore/memory/similarity.py | 88 +++++++ src/atocore/models/database.py | 36 +++ tests/test_memory_dedup.py | 380 
+++++++++++++++++++++++++++ 12 files changed, 1860 insertions(+), 8 deletions(-) create mode 100644 deploy/dalidou/dedup-watcher.sh create mode 100644 docs/PHASE-7-MEMORY-CONSOLIDATION.md create mode 100644 scripts/memory_dedup.py create mode 100644 src/atocore/memory/_dedup_prompt.py create mode 100644 src/atocore/memory/similarity.py create mode 100644 tests/test_memory_dedup.py diff --git a/DEV-LEDGER.md b/DEV-LEDGER.md index 5cfbea9..1cad66c 100644 --- a/DEV-LEDGER.md +++ b/DEV-LEDGER.md @@ -7,9 +7,9 @@ ## Orientation - **live_sha** (Dalidou `/health` build_sha): `775960c` (verified 2026-04-16 via /health, build_time 2026-04-16T17:59:30Z) -- **last_updated**: 2026-04-16 by Claude ("Make It Actually Useful" sprint β€” observability + Phase 10) +- **last_updated**: 2026-04-18 by Claude (Phase 7A β€” Memory Consolidation "sleep cycle" V1 on branch, not yet deployed) - **main_tip**: `999788b` -- **test_count**: 303 (4 new Phase 10 tests) +- **test_count**: 395 (21 new Phase 7A dedup tests + accumulated Phase 5/6 tests since last ledger refresh) - **harness**: `17/18 PASS` on live Dalidou (p04-constraints expects "Zerodur" β€” retrieval content gap, not regression) - **vectors**: 33,253 - **active_memories**: 84 (31 project, 23 knowledge, 10 episodic, 8 adaptation, 7 preference, 5 identity) @@ -160,6 +160,8 @@ One branch `codex/extractor-eval-loop` for Day 1-5, a second `codex/retrieval-ha ## Session Log +- **2026-04-18 Claude** **Phase 7A β€” Memory Consolidation V1 ("sleep cycle") landed on branch.** New `docs/PHASE-7-MEMORY-CONSOLIDATION.md` covers all 8 subphases (7A dedup, 7B contradictions, 7C tag canon, 7D confidence decay, 7E memory detail, 7F domain view, 7G re-extract, 7H vector hygiene). 
7A implementation: schema migration `memory_merge_candidates`, `atocore.memory.similarity` (cosine + transitive cluster), stdlib-only `atocore.memory._dedup_prompt` (llm drafts unified content preserving all specifics), `merge_memories()` + `create_merge_candidate()` + `get_merge_candidates()` + `reject_merge_candidate()` in service.py, host-side `scripts/memory_dedup.py` (HTTP + claude -p, idempotent via sorted-id set), 5 new endpoints under `/admin/memory/merge-candidates*` + `/admin/memory/dedup-scan` + `/admin/memory/dedup-status`, purple-themed "πŸ”— Merge Candidates" section in /admin/triage with editable draft + approve/reject buttons, "πŸ”— Scan for duplicates" control bar with threshold slider, nightly Step B3 in batch-extract.sh (0.90 daily, 0.85 Sundays deep), `deploy/dalidou/dedup-watcher.sh` host watcher for UI-triggered scans (mirrors graduation-watcher pattern). 21 new tests (similarity, prompt parse, idempotency, merge happy path, override content/tags, audit rows, abort-if-source-tampered, reject leaves sources alone, schema). Tests 374 β†’ 395. Not yet deployed; harness not re-run. Next: push + deploy, install `dedup-watcher.sh` in host cron, trigger first scan, review proposals in UI. + - **2026-04-16 Claude** `b687e7f..999788b` **"Make It Actually Useful" sprint.** Two-part session: ops fixes then consolidation sprint. **Part 1 β€” Ops fixes:** Deployed `b687e7f` (project inference from cwd). Fixed cron logging (was `/dev/null` β€” redirected to `~/atocore-logs/`). Fixed OpenClaw gateway crash-loop (`discord.replyToMode: "any"` invalid β†’ `"all"`). Deployed `atocore-capture` plugin on T420 OpenClaw using `before_agent_start` + `llm_output` hooks β€” verified end-to-end: 38 `client=openclaw` interactions captured. Backfilled project tags on 179/181 unscoped interactions (165 atocore, 8 p06, 6 p04). 
diff --git a/deploy/dalidou/batch-extract.sh b/deploy/dalidou/batch-extract.sh index b4c7a0a..a69ec97 100644 --- a/deploy/dalidou/batch-extract.sh +++ b/deploy/dalidou/batch-extract.sh @@ -166,6 +166,26 @@ curl -sSf -X POST "$ATOCORE_URL/admin/memory/extend-reinforced" \ log "WARN: extend-reinforced failed (non-blocking)" } +# Step B3: Memory dedup scan (Phase 7A) +# Nightly at 0.90 (tight β€” only near-duplicates). Sundays run a deeper +# pass at 0.85 to catch semantically-similar-but-differently-worded memories. +if [[ "$(date -u +%u)" == "7" ]]; then + DEDUP_THRESHOLD="0.85" + DEDUP_BATCH="80" + log "Step B3: memory dedup (Sunday deep pass, threshold $DEDUP_THRESHOLD)" +else + DEDUP_THRESHOLD="0.90" + DEDUP_BATCH="50" + log "Step B3: memory dedup (daily, threshold $DEDUP_THRESHOLD)" +fi +python3 "$APP_DIR/scripts/memory_dedup.py" \ + --base-url "$ATOCORE_URL" \ + --similarity-threshold "$DEDUP_THRESHOLD" \ + --max-batch "$DEDUP_BATCH" \ + 2>&1 || { + log "WARN: memory dedup failed (non-blocking)" +} + # Step G: Integrity check (Phase 4 V1) log "Step G: integrity check" python3 "$APP_DIR/scripts/integrity_check.py" \ diff --git a/deploy/dalidou/dedup-watcher.sh b/deploy/dalidou/dedup-watcher.sh new file mode 100644 index 0000000..578213e --- /dev/null +++ b/deploy/dalidou/dedup-watcher.sh @@ -0,0 +1,110 @@ +#!/usr/bin/env bash +# +# deploy/dalidou/dedup-watcher.sh +# ------------------------------- +# Host-side watcher for on-demand memory dedup scans (Phase 7A). +# +# The /admin/triage page has a "πŸ”— Scan for duplicates" button that POSTs +# to /admin/memory/dedup-scan with {project, similarity_threshold, max_batch}. +# The container writes this to project_state (atocore/config/dedup_requested_at). +# +# This script runs on the Dalidou HOST (where claude CLI lives), polls +# for the flag, and runs memory_dedup.py when seen. 
+# +# Installed via cron every 2 minutes: +# */2 * * * * /srv/storage/atocore/app/deploy/dalidou/dedup-watcher.sh \ +# >> /home/papa/atocore-logs/dedup-watcher.log 2>&1 +# +# Mirrors deploy/dalidou/graduation-watcher.sh exactly. + +set -euo pipefail + +ATOCORE_URL="${ATOCORE_URL:-http://127.0.0.1:8100}" +APP_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +LOCK_FILE="/tmp/atocore-dedup.lock" +LOG_DIR="/home/papa/atocore-logs" +mkdir -p "$LOG_DIR" + +TS="$(date -u +%Y-%m-%dT%H:%M:%SZ)" +log() { printf '[%s] %s\n' "$TS" "$*"; } + +# Fetch the flag via API +STATE_JSON=$(curl -sSf --max-time 5 "$ATOCORE_URL/project/state/atocore" 2>/dev/null || echo "{}") +REQUESTED=$(echo "$STATE_JSON" | python3 -c " +import sys, json +try: + d = json.load(sys.stdin) + for e in d.get('entries', d.get('state', [])): + if e.get('category') == 'config' and e.get('key') == 'dedup_requested_at': + print(e.get('value', '')) + break +except Exception: + pass +" 2>/dev/null || echo "") + +if [[ -z "$REQUESTED" ]]; then + exit 0 +fi + +PROJECT=$(echo "$REQUESTED" | python3 -c "import sys,json; print(json.loads(sys.stdin.read() or '{}').get('project',''))" 2>/dev/null || echo "") +THRESHOLD=$(echo "$REQUESTED" | python3 -c "import sys,json; print(json.loads(sys.stdin.read() or '{}').get('similarity_threshold',0.88))" 2>/dev/null || echo "0.88") +MAX_BATCH=$(echo "$REQUESTED" | python3 -c "import sys,json; print(json.loads(sys.stdin.read() or '{}').get('max_batch',50))" 2>/dev/null || echo "50") + +# Acquire lock +exec 9>"$LOCK_FILE" || exit 0 +if ! 
flock -n 9; then + log "dedup already running, skipping" + exit 0 +fi + +# Mark running +curl -sSf -X POST "$ATOCORE_URL/project/state" \ + -H 'Content-Type: application/json' \ + -d "{\"project\":\"atocore\",\"category\":\"status\",\"key\":\"dedup_running\",\"value\":\"1\",\"source\":\"dedup watcher\"}" \ + >/dev/null 2>&1 || true +curl -sSf -X POST "$ATOCORE_URL/project/state" \ + -H 'Content-Type: application/json' \ + -d "{\"project\":\"atocore\",\"category\":\"status\",\"key\":\"dedup_last_started_at\",\"value\":\"$TS\",\"source\":\"dedup watcher\"}" \ + >/dev/null 2>&1 || true + +LOG_FILE="$LOG_DIR/dedup-ondemand-$(date -u +%Y%m%d-%H%M%S).log" +log "Starting dedup (project='$PROJECT' threshold=$THRESHOLD max_batch=$MAX_BATCH, log: $LOG_FILE)" + +# Clear the flag BEFORE running so duplicate clicks queue at most one +curl -sSf -X DELETE "$ATOCORE_URL/project/state" \ + -H 'Content-Type: application/json' \ + -d "{\"project\":\"atocore\",\"category\":\"config\",\"key\":\"dedup_requested_at\"}" \ + >/dev/null 2>&1 || true + +cd "$APP_DIR" +export PYTHONPATH="$APP_DIR/src:${PYTHONPATH:-}" +ARGS=(--base-url "$ATOCORE_URL" --similarity-threshold "$THRESHOLD" --max-batch "$MAX_BATCH") +if [[ -n "$PROJECT" ]]; then + ARGS+=(--project "$PROJECT") +fi + +if python3 scripts/memory_dedup.py "${ARGS[@]}" >> "$LOG_FILE" 2>&1; then + RESULT=$(grep "^summary:" "$LOG_FILE" | tail -1 || tail -1 "$LOG_FILE") + RESULT="${RESULT:-completed}" + log "dedup finished: $RESULT" +else + RESULT="ERROR β€” see $LOG_FILE" + log "dedup FAILED" +fi + +FINISH_TS="$(date -u +%Y-%m-%dT%H:%M:%SZ)" + +curl -sSf -X POST "$ATOCORE_URL/project/state" \ + -H 'Content-Type: application/json' \ + -d "{\"project\":\"atocore\",\"category\":\"status\",\"key\":\"dedup_running\",\"value\":\"0\",\"source\":\"dedup watcher\"}" \ + >/dev/null 2>&1 || true +curl -sSf -X POST "$ATOCORE_URL/project/state" \ + -H 'Content-Type: application/json' \ + -d 
"{\"project\":\"atocore\",\"category\":\"status\",\"key\":\"dedup_last_finished_at\",\"value\":\"$FINISH_TS\",\"source\":\"dedup watcher\"}" \ + >/dev/null 2>&1 || true + +SAFE_RESULT=$(printf '%s' "$RESULT" | python3 -c "import sys,json; print(json.dumps(sys.stdin.read())[1:-1])") +curl -sSf -X POST "$ATOCORE_URL/project/state" \ + -H 'Content-Type: application/json' \ + -d "{\"project\":\"atocore\",\"category\":\"status\",\"key\":\"dedup_last_result\",\"value\":\"$SAFE_RESULT\",\"source\":\"dedup watcher\"}" \ + >/dev/null 2>&1 || true diff --git a/docs/PHASE-7-MEMORY-CONSOLIDATION.md b/docs/PHASE-7-MEMORY-CONSOLIDATION.md new file mode 100644 index 0000000..96b6064 --- /dev/null +++ b/docs/PHASE-7-MEMORY-CONSOLIDATION.md @@ -0,0 +1,96 @@ +# Phase 7 β€” Memory Consolidation (the "Sleep Cycle") + +**Status**: 7A in progress Β· 7B-H scoped, deferred +**Design principle**: *"Like human memory while sleeping, but more robotic β€” never discard relevant details. Consolidate, update, supersede β€” don't delete."* + +## Why + +Phases 1–6 built capture + triage + graduation + emerging-project detection. 
What they don't solve: + +| # | Problem | Fix | +|---|---|---| +| 1 | Redundancy β€” "APM uses NX" said 5 different ways across 5 memories | **7A** Semantic dedup | +| 2 | Latent contradictions β€” "chose Zygo" + "switched from Zygo" both active | **7B** Pair contradiction detection | +| 3 | Tag drift β€” `firmware`, `fw`, `firmware-control` fragment retrieval | **7C** Tag canonicalization | +| 4 | Confidence staleness β€” 6-month unreferenced memory ranks as fresh | **7D** Confidence decay | +| 5 | No memory drill-down page | **7E** `/wiki/memories/{id}` | +| 6 | Domain knowledge siloed per project | **7F** `/wiki/domains/{tag}` | +| 7 | Prompt upgrades (llm-0.5 β†’ 0.6) don't re-process old interactions | **7G** Re-extraction on version bump | +| 8 | Superseded memory vectors still in Chroma polluting retrieval | **7H** Vector hygiene | + +Collectively: the brain needs a nightly pass that looks at what it already knows and tidies up β€” dedup, resolve contradictions, canonicalize tags, decay stale facts β€” **without losing information**. + +## Subphases + +### 7A β€” Semantic dedup + consolidation *(this sprint)* + +Compute embeddings on active memories, find pairs within `(project, memory_type)` bucket above similarity threshold (default 0.88), cluster, draft a unified memory via LLM, human approves in triage UI. On approve: sources become `superseded`, new merged memory created with union of `source_refs`, sum of `reference_count`, max of `confidence`. **Ships first** because redundancy compounds β€” every new memory potentially duplicates an old one. + +Detailed spec lives in the working plan (`dapper-cooking-tower.md`) and across the files listed under "Files touched" below. Key decisions: + +- LLM drafts, human approves β€” no silent auto-merge. +- Same `(project, memory_type)` bucket only. Cross-project merges are rare + risky β†’ separate flow in 7B. +- Recompute embeddings each scan (~2s / 335 memories). Persist only if scan time becomes a problem. 
+- Cluster-based proposals (A~B~C β†’ one merge), not pair-based. +- `status=superseded` never deleted β€” still queryable with filter. + +**Schema**: new table `memory_merge_candidates` (pending | approved | rejected). +**Cron**: nightly at threshold 0.90 (tight); weekly (Sundays) at 0.85 (deeper cleanup). +**UI**: new "πŸ”— Merge Candidates" section in `/admin/triage`. + +**Files touched in 7A**: +- `src/atocore/models/database.py` β€” migration +- `src/atocore/memory/similarity.py` β€” new, `compute_memory_similarity()` +- `src/atocore/memory/_dedup_prompt.py` β€” new, shared LLM prompt +- `src/atocore/memory/service.py` β€” `merge_memories()` +- `scripts/memory_dedup.py` β€” new, host-side detector (HTTP-only) +- `src/atocore/api/routes.py` β€” 5 new endpoints under `/admin/memory/` +- `src/atocore/engineering/triage_ui.py` β€” merge cards section +- `deploy/dalidou/batch-extract.sh` β€” Step B3 +- `deploy/dalidou/dedup-watcher.sh` β€” new, UI-triggered scans +- `tests/test_memory_dedup.py` β€” ~10-15 new tests + +### 7B β€” Memory-to-memory contradiction detection + +Same embedding-pair machinery as 7A but within a *different* band (similarity 0.70–0.88 β€” semantically related but different wording). LLM classifies each pair: `duplicate | complementary | contradicts | supersedes-older`. Contradictions write a `memory_conflicts` row + surface a triage badge. Clear supersessions (both tier 1 sonnet and tier 2 opus agree) auto-mark the older as `superseded`. + +### 7C β€” Tag canonicalization + +Weekly LLM pass over `domain_tags` distribution, proposes `alias β†’ canonical` map (e.g. `fw β†’ firmware`). Human approves via UI (one-click pattern, same as emerging-project registration). Bulk-rewrites `domain_tags` atomically across all memories. + +### 7D β€” Confidence decay + +Daily lightweight job. For memories with `reference_count=0` AND `last_referenced_at` older than 30 days: multiply confidence by 0.97/day (~2-month half-life). 
Reinforcement already bumps confidence. Below 0.3 β†’ auto-supersede with reason `decayed, no references`. Reversible (tune half-life), non-destructive (still searchable with status filter). + +### 7E β€” Memory detail page `/wiki/memories/{id}` + +Provenance chain: source_chunk β†’ interaction β†’ graduated_to_entity. Audit trail (Phase 4 has the data). Related memories (same project + tag + semantic neighbors). Decay trajectory plot (if 7D ships). Link target from every memory surfaced anywhere in the wiki. + +### 7F β€” Cross-project domain view `/wiki/domains/{tag}` + +One page per `domain_tag` showing all memories + graduated entities with that tag, grouped by project. "Optics across p04+p05+p06" becomes a real navigable page. Answers the long-standing question the tag system was meant to enable. + +### 7G β€” Re-extraction on prompt upgrade + +`batch_llm_extract_live.py --force-reextract --since DATE`. Dedupe key: `(interaction_id, extractor_version)` β€” same run on same interaction doesn't double-create. Triggered manually when `LLM_EXTRACTOR_VERSION` bumps. Not automatic (destructive). + +### 7H β€” Vector store hygiene + +Nightly: scan `source_chunks` and `memory_embeddings` (added in 7A V2) for `status=superseded|invalid`. Delete matching vectors from Chroma. Fail-open β€” the retrieval harness catches any real regression. + +## Verification & ship order + +1. **7A** β€” ship + observe 1 week β†’ validate merge proposals are high-signal, rejection rate acceptable +2. **7D** β€” decay is low-risk + high-compounding value; ship second +3. **7C** β€” clean up tag fragmentation before 7F depends on canonical tags +4. **7E** + **7F** β€” UX surfaces; ship together once data is clean +5. **7B** β€” contradictions flow (pairs harder than duplicates to classify; wait for 7A data to tune threshold) +6. **7G** β€” on-demand; no ship until we actually bump the extractor prompt +7. 
**7H** β€” housekeeping; after 7A + 7B + 7D have generated enough `superseded` rows to matter + +## Scope NOT in Phase 7 + +- Graduated memories (entity-descended) are **frozen** β€” exempt from dedup/decay. Entity consolidation is a separate Phase (8+). +- Auto-merging without human approval (always human-in-the-loop in V1). +- Summarization / compression β€” a different problem (reducing the number of chunks per memory, not the number of memories). +- Forgetting policies β€” there's no user-facing "delete this" flow in Phase 7. Supersede + filter covers the need. diff --git a/scripts/memory_dedup.py b/scripts/memory_dedup.py new file mode 100644 index 0000000..1ce8b79 --- /dev/null +++ b/scripts/memory_dedup.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python3 +"""Phase 7A β€” semantic memory dedup detector. + +Finds clusters of near-duplicate active memories and writes merge- +candidate proposals for human review in the triage UI. + +Algorithm: + 1. Fetch active memories via HTTP + 2. Group by (project, memory_type) β€” cross-bucket merges are deferred + to Phase 7B contradiction flow + 3. Within each group, embed contents via atocore.retrieval.embeddings + 4. Greedy transitive cluster at similarity >= threshold + 5. For each cluster of size >= 2, ask claude-p to draft unified content + 6. POST the proposal to /admin/memory/merge-candidates/create (server- + side dedupes by the sorted memory-id set, so re-runs don't double- + create) + +Host-side because claude CLI lives on Dalidou, not the container. Reuses +the same PYTHONPATH=src pattern as scripts/graduate_memories.py for +atocore imports (embeddings, similarity, prompt module). 
+ +Usage: + python3 scripts/memory_dedup.py --base-url http://127.0.0.1:8100 \\ + --similarity-threshold 0.88 --max-batch 50 + +Threshold conventions (see Phase 7 doc): + 0.88 interactive / default β€” balanced precision/recall + 0.90 nightly cron β€” tight, only near-duplicates + 0.85 weekly cron β€” deeper cleanup +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import subprocess +import sys +import tempfile +import time +import urllib.error +import urllib.request +from collections import defaultdict +from typing import Any + +# Make src/ importable β€” same pattern as graduate_memories.py +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +_SRC_DIR = os.path.abspath(os.path.join(_SCRIPT_DIR, "..", "src")) +if _SRC_DIR not in sys.path: + sys.path.insert(0, _SRC_DIR) + +from atocore.memory._dedup_prompt import ( # noqa: E402 + DEDUP_PROMPT_VERSION, + SYSTEM_PROMPT, + build_user_message, + normalize_merge_verdict, + parse_merge_verdict, +) +from atocore.memory.similarity import cluster_by_threshold # noqa: E402 + +DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100") +DEFAULT_MODEL = os.environ.get("ATOCORE_DEDUP_MODEL", "sonnet") +DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_DEDUP_TIMEOUT_S", "60")) + +_sandbox_cwd = None + + +def get_sandbox_cwd() -> str: + global _sandbox_cwd + if _sandbox_cwd is None: + _sandbox_cwd = tempfile.mkdtemp(prefix="ato-dedup-") + return _sandbox_cwd + + +def api_get(base_url: str, path: str) -> dict: + req = urllib.request.Request(f"{base_url}{path}") + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def api_post(base_url: str, path: str, body: dict | None = None) -> dict: + data = json.dumps(body or {}).encode("utf-8") + req = urllib.request.Request( + f"{base_url}{path}", method="POST", + headers={"Content-Type": "application/json"}, data=data, + ) + with urllib.request.urlopen(req, 
timeout=30) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def call_claude(system_prompt: str, user_message: str, model: str, timeout_s: float) -> tuple[str | None, str | None]: + """Shared CLI caller with retry + stderr capture (mirrors auto_triage).""" + if not shutil.which("claude"): + return None, "claude CLI not available" + args = [ + "claude", "-p", + "--model", model, + "--append-system-prompt", system_prompt, + "--disable-slash-commands", + user_message, + ] + last_error = "" + for attempt in range(3): + if attempt > 0: + time.sleep(2 ** attempt) + try: + completed = subprocess.run( + args, capture_output=True, text=True, + timeout=timeout_s, cwd=get_sandbox_cwd(), + encoding="utf-8", errors="replace", + ) + except subprocess.TimeoutExpired: + last_error = f"{model} timed out" + continue + except Exception as exc: + last_error = f"subprocess error: {exc}" + continue + if completed.returncode == 0: + return (completed.stdout or "").strip(), None + stderr = (completed.stderr or "").strip()[:200] + last_error = f"{model} exit {completed.returncode}: {stderr}" if stderr else f"{model} exit {completed.returncode}" + return None, last_error + + +def fetch_active_memories(base_url: str, project: str | None) -> list[dict]: + # The /memory endpoint with active_only=true returns active memories. + # Graduated memories are exempt from dedup β€” they're frozen pointers + # to entities. Filter them out on the client side. + params = "active_only=true&limit=2000" + if project: + params += f"&project={urllib.request.quote(project)}" + try: + result = api_get(base_url, f"/memory?{params}") + except Exception as e: + print(f"ERROR: could not fetch memories: {e}", file=sys.stderr) + return [] + mems = result.get("memories", []) + return [m for m in mems if (m.get("status") or "active") == "active"] + + +def group_memories(mems: list[dict]) -> dict[tuple[str, str], list[dict]]: + """Bucket by (project, memory_type). 
Empty project is its own bucket.""" + buckets: dict[tuple[str, str], list[dict]] = defaultdict(list) + for m in mems: + key = ((m.get("project") or "").strip().lower(), (m.get("memory_type") or "").strip().lower()) + buckets[key].append(m) + return buckets + + +def draft_merge(sources: list[dict], model: str, timeout_s: float) -> dict[str, Any] | None: + user_msg = build_user_message(sources) + raw, err = call_claude(SYSTEM_PROMPT, user_msg, model, timeout_s) + if err: + print(f" WARN: claude call failed: {err}", file=sys.stderr) + return None + parsed = parse_merge_verdict(raw or "") + if parsed is None: + print(f" WARN: could not parse verdict: {(raw or '')[:200]}", file=sys.stderr) + return None + return normalize_merge_verdict(parsed) + + +def submit_candidate( + base_url: str, + memory_ids: list[str], + similarity: float, + verdict: dict[str, Any], + dry_run: bool, +) -> str | None: + body = { + "memory_ids": memory_ids, + "similarity": similarity, + "proposed_content": verdict["content"], + "proposed_memory_type": verdict["memory_type"], + "proposed_project": verdict["project"], + "proposed_tags": verdict["domain_tags"], + "proposed_confidence": verdict["confidence"], + "reason": verdict["reason"], + } + if dry_run: + print(f" [dry-run] would POST: {json.dumps(body)[:200]}...") + return "dry-run" + try: + result = api_post(base_url, "/admin/memory/merge-candidates/create", body) + return result.get("candidate_id") + except urllib.error.HTTPError as e: + print(f" ERROR: submit failed: {e.code} {e.read().decode()[:200]}", file=sys.stderr) + return None + except Exception as e: + print(f" ERROR: submit failed: {e}", file=sys.stderr) + return None + + +def main() -> None: + parser = argparse.ArgumentParser(description="Phase 7A semantic dedup detector") + parser.add_argument("--base-url", default=DEFAULT_BASE_URL) + parser.add_argument("--project", default="", help="Only scan this project (empty = all)") + parser.add_argument("--similarity-threshold", type=float, 
default=0.88) + parser.add_argument("--max-batch", type=int, default=50, + help="Max clusters to propose per run") + parser.add_argument("--model", default=DEFAULT_MODEL) + parser.add_argument("--timeout-s", type=float, default=DEFAULT_TIMEOUT_S) + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + base = args.base_url.rstrip("/") + + print(f"memory_dedup {DEDUP_PROMPT_VERSION} | threshold={args.similarity_threshold} | model={args.model}") + mems = fetch_active_memories(base, args.project or None) + print(f"fetched {len(mems)} active memories") + if not mems: + return + + buckets = group_memories(mems) + print(f"grouped into {len(buckets)} (project, memory_type) buckets") + + clusters_found = 0 + candidates_created = 0 + skipped_existing = 0 + llm_rejections = 0 + + for (proj, mtype), group in sorted(buckets.items()): + if len(group) < 2: + continue + if candidates_created >= args.max_batch: + print(f"reached max-batch={args.max_batch}, stopping") + break + + texts = [(m.get("content") or "") for m in group] + clusters = cluster_by_threshold(texts, args.similarity_threshold) + # Keep only non-trivial clusters + clusters = [c for c in clusters if len(c) >= 2] + if not clusters: + continue + + print(f"\n[{proj or '(global)'}/{mtype}] {len(group)} mems β†’ {len(clusters)} cluster(s)") + for cluster in clusters: + if candidates_created >= args.max_batch: + break + clusters_found += 1 + sources = [group[i] for i in cluster] + ids = [s["id"] for s in sources] + # Approximate cluster similarity = min pairwise within cluster. + # For reporting, just use threshold (we know all pairs >= threshold + # transitively; min may be lower). Keep it simple. 
+ sim = args.similarity_threshold + print(f" cluster of {len(cluster)}: {[s['id'][:8] for s in sources]}") + + verdict = draft_merge(sources, args.model, args.timeout_s) + if verdict is None: + continue + if verdict["action"] == "reject": + llm_rejections += 1 + print(f" LLM rejected: {verdict['reason'][:100]}") + continue + + cid = submit_candidate(base, ids, sim, verdict, args.dry_run) + if cid == "dry-run": + candidates_created += 1 + elif cid: + candidates_created += 1 + print(f" β†’ candidate {cid[:8]}") + else: + skipped_existing += 1 + + time.sleep(0.3) # be kind to claude CLI + + print( + f"\nsummary: clusters_found={clusters_found} " + f"candidates_created={candidates_created} " + f"llm_rejections={llm_rejections} " + f"skipped_existing={skipped_existing}" + ) + + +if __name__ == "__main__": + main() diff --git a/src/atocore/api/routes.py b/src/atocore/api/routes.py index f3a91d2..83769dd 100644 --- a/src/atocore/api/routes.py +++ b/src/atocore/api/routes.py @@ -1530,6 +1530,169 @@ def api_extend_reinforced() -> dict: return {"extended_count": len(extended), "extensions": extended} +# --- Phase 7A: memory dedup / merge-candidate lifecycle --- + + +class MergeCandidateCreateBody(BaseModel): + memory_ids: list[str] + similarity: float = 0.0 + proposed_content: str + proposed_memory_type: str = "knowledge" + proposed_project: str = "" + proposed_tags: list[str] = [] + proposed_confidence: float = 0.6 + reason: str = "" + + +class MergeCandidateApproveBody(BaseModel): + actor: str = "human-triage" + content: str | None = None + domain_tags: list[str] | None = None + + +class MergeCandidateRejectBody(BaseModel): + actor: str = "human-triage" + note: str = "" + + +class DedupScanRequestBody(BaseModel): + project: str = "" + similarity_threshold: float = 0.88 + max_batch: int = 50 + + +@router.get("/admin/memory/merge-candidates") +def api_list_merge_candidates(status: str = "pending", limit: int = 100) -> dict: + """Phase 7A: list merge-candidate proposals for 
@router.post("/admin/memory/merge-candidates/create")
def api_create_merge_candidate(body: MergeCandidateCreateBody) -> dict:
    """Phase 7A: host-side dedup detector submits a proposal here.

    Server-side idempotency: if a pending candidate already exists for
    the same sorted memory_id set, the service returns None and we answer
    with duplicate=True so re-runs never double-create.
    """
    from atocore.memory.service import create_merge_candidate
    try:
        cid = create_merge_candidate(
            memory_ids=body.memory_ids,
            similarity=body.similarity,
            proposed_content=body.proposed_content,
            proposed_memory_type=body.proposed_memory_type,
            proposed_project=body.proposed_project,
            proposed_tags=body.proposed_tags,
            proposed_confidence=body.proposed_confidence,
            reason=body.reason,
        )
    except ValueError as e:
        # Chain the cause so the original validation error isn't lost.
        raise HTTPException(status_code=400, detail=str(e)) from e
    if cid is None:
        return {"candidate_id": None, "duplicate": True}
    return {"candidate_id": cid, "duplicate": False}


@router.post("/admin/memory/merge-candidates/{candidate_id}/approve")
def api_approve_merge_candidate(candidate_id: str, body: MergeCandidateApproveBody) -> dict:
    """Phase 7A: execute an approved merge. Sources → superseded; new
    merged memory created. UI can pass content/tag edits via body."""
    from atocore.memory.service import merge_memories
    new_id = merge_memories(
        candidate_id=candidate_id,
        actor=body.actor,
        override_content=body.content,
        override_tags=body.domain_tags,
    )
    if new_id is None:
        # 409: the candidate was resolved concurrently or a source memory
        # changed since the proposal was drafted (tamper guard).
        raise HTTPException(
            status_code=409,
            detail="Merge could not execute (candidate not pending, or source memory tampered)",
        )
    return {"status": "approved", "candidate_id": candidate_id, "result_memory_id": new_id}


@router.post("/admin/memory/merge-candidates/{candidate_id}/reject")
def api_reject_merge_candidate(candidate_id: str, body: MergeCandidateRejectBody) -> dict:
    """Phase 7A: dismiss a merge candidate. Sources stay untouched."""
    from atocore.memory.service import reject_merge_candidate
    ok = reject_merge_candidate(candidate_id, actor=body.actor, note=body.note)
    if not ok:
        raise HTTPException(status_code=404, detail="Candidate not found or already resolved")
    return {"status": "rejected", "candidate_id": candidate_id}


@router.post("/admin/memory/dedup-scan")
def api_request_dedup_scan(body: DedupScanRequestBody) -> dict:
    """Phase 7A: request a host-side dedup scan.

    Writes a flag in project_state with project + threshold + max_batch.
    A host cron watcher picks it up within ~2 min and runs
    scripts/memory_dedup.py. Mirrors /admin/graduation/request.
    """
    import json as _json
    from datetime import datetime as _dt, timezone as _tz
    from atocore.context.project_state import set_state

    now = _dt.now(_tz.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # Clamp once and echo the clamped values back, so the response always
    # matches what the watcher will actually run (previously the stored
    # payload was clamped but the response echoed the caller's raw values).
    threshold = max(0.5, min(0.99, body.similarity_threshold))
    max_batch = max(1, min(body.max_batch, 200))
    payload = _json.dumps({
        "project": (body.project or "").strip(),
        "similarity_threshold": threshold,
        "max_batch": max_batch,
        "requested_at": now,
    })
    set_state(
        project_name="atocore",
        category="config",
        key="dedup_requested_at",
        value=payload,
        source="admin ui",
    )
    return {
        "requested_at": now,
        "project": body.project,
        "similarity_threshold": threshold,
        "max_batch": max_batch,
        "note": "Host watcher picks up within ~2 min. Poll /admin/memory/dedup-status for progress.",
    }


@router.get("/admin/memory/dedup-status")
def api_dedup_status() -> dict:
    """Phase 7A: state of the dedup scan pipeline (UI polling).

    Best-effort: any project_state read failure yields the empty default
    shape rather than a 500, since this endpoint only feeds a status badge.
    """
    import json as _json
    from atocore.context.project_state import get_state
    out = {
        "requested": None,
        "last_started_at": None,
        "last_finished_at": None,
        "last_result": None,
        "is_running": False,
    }
    try:
        for e in get_state("atocore"):
            if e.category not in ("config", "status"):
                continue
            if e.key == "dedup_requested_at":
                try:
                    out["requested"] = _json.loads(e.value)
                except Exception:
                    # Flag may predate the JSON payload format — surface raw.
                    out["requested"] = {"raw": e.value}
            elif e.key == "dedup_last_started_at":
                out["last_started_at"] = e.value
            elif e.key == "dedup_last_finished_at":
                out["last_finished_at"] = e.value
            elif e.key == "dedup_last_result":
                out["last_result"] = e.value
            elif e.key == "dedup_running":
                out["is_running"] = (e.value == "1")
    except Exception:
        pass
    return out
a/src/atocore/engineering/triage_ui.py +++ b/src/atocore/engineering/triage_ui.py @@ -377,6 +377,177 @@ _ENTITY_TRIAGE_CSS = """ """ +# --------------------------------------------------------------------- +# Phase 7A β€” Merge candidates (semantic dedup) +# --------------------------------------------------------------------- + +_MERGE_TRIAGE_CSS = """ + +""" + + +def _render_merge_card(cand: dict) -> str: + import json as _json + cid = _escape(cand.get("id", "")) + sim = cand.get("similarity") or 0.0 + sources = cand.get("sources") or [] + proposed_content = cand.get("proposed_content") or "" + proposed_tags = cand.get("proposed_tags") or [] + proposed_project = cand.get("proposed_project") or "" + reason = cand.get("reason") or "" + + src_html = "".join( + f""" +
+
+ {_escape(s.get('id','')[:8])} Β· [{_escape(s.get('memory_type',''))}] + Β· {_escape(s.get('project','') or '(global)')} + Β· conf {float(s.get('confidence',0)):.2f} + Β· refs {int(s.get('reference_count',0))} +
+
{_escape((s.get('content') or '')[:300])}
+
+ """ + for s in sources + ) + tags_str = ", ".join(proposed_tags) + return f""" +
+
+ [merge Β· {len(sources)} sources] + {_escape(proposed_project or '(global)')} + sim β‰₯ {sim:.2f} +
+
{src_html}
+
↓ merged into ↓
+
+ +
+ +
+ {f'
πŸ’‘ {_escape(reason)}
' if reason else ''} +
+
+ + +
+
+
+""" + + +_MERGE_TRIAGE_SCRIPT = """ + +""" + + +def _render_dedup_bar() -> str: + return """ +
+ + + + Finds semantically near-duplicate active memories and proposes LLM-drafted merges for review. Source memories become superseded on approve; nothing is deleted. + +
+""" + + def _render_graduation_bar() -> str: """The 'Graduate memories β†’ entity candidates' control bar.""" from atocore.projects.registry import load_project_registry @@ -478,26 +649,51 @@ def render_triage_page(limit: int = 100) -> str: except Exception as e: entity_candidates = [] - total = len(mem_candidates) + len(entity_candidates) + try: + from atocore.memory.service import get_merge_candidates + merge_candidates = get_merge_candidates(status="pending", limit=limit) + except Exception: + merge_candidates = [] + + total = len(mem_candidates) + len(entity_candidates) + len(merge_candidates) graduation_bar = _render_graduation_bar() + dedup_bar = _render_dedup_bar() if total == 0: - body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + f""" + body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + _MERGE_TRIAGE_CSS + f"""

Triage Queue

{graduation_bar} + {dedup_bar}

πŸŽ‰ No candidates to review.

The auto-triage pipeline keeps this queue empty unless something needs your judgment.

-

Use the πŸŽ“ Graduate memories button above to propose new entity candidates from existing memories.

+

Use πŸŽ“ Graduate memories to propose entity candidates, or πŸ”— Scan for duplicates to find near-duplicate memories to merge.

- """ + _GRADUATION_SCRIPT + """ + _GRADUATION_SCRIPT + _MERGE_TRIAGE_SCRIPT return render_html("Triage β€” AtoCore", body, breadcrumbs=[("Wiki", "/wiki"), ("Triage", "")]) # Memory cards mem_cards = "".join(_render_candidate_card(c) for c in mem_candidates) + # Merge cards (Phase 7A) + merge_cards_html = "" + if merge_candidates: + merge_cards = "".join(_render_merge_card(c) for c in merge_candidates) + merge_cards_html = f""" +
+

πŸ”— Merge Candidates ({len(merge_candidates)})

+

+ Semantically near-duplicate active memories. Approving merges the sources + into the proposed unified memory; sources become superseded + (not deleted β€” still queryable). You can edit the draft content and tags + before approving. +

+
+ {merge_cards} + """ + # Entity cards ent_cards_html = "" if entity_candidates: @@ -513,11 +709,12 @@ def render_triage_page(limit: int = 100) -> str: {ent_cards} """ - body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + f""" + body = _TRIAGE_CSS + _ENTITY_TRIAGE_CSS + _MERGE_TRIAGE_CSS + f"""

Triage Queue

{len(mem_candidates)} memory Β· + {len(merge_candidates)} merge Β· {len(entity_candidates)} entity
@@ -536,10 +733,12 @@ def render_triage_page(limit: int = 100) -> str: {graduation_bar} + {dedup_bar}

πŸ“ Memory Candidates ({len(mem_candidates)})

{mem_cards} + {merge_cards_html} {ent_cards_html} - """ + _TRIAGE_SCRIPT + _ENTITY_TRIAGE_SCRIPT + _GRADUATION_SCRIPT + """ + _TRIAGE_SCRIPT + _ENTITY_TRIAGE_SCRIPT + _GRADUATION_SCRIPT + _MERGE_TRIAGE_SCRIPT return render_html( "Triage — AtoCore", diff --git a/src/atocore/memory/_dedup_prompt.py b/src/atocore/memory/_dedup_prompt.py new file mode 100644 index 0000000..95c5777 --- /dev/null +++ b/src/atocore/memory/_dedup_prompt.py @@ -0,0 +1,156 @@ +"""Shared LLM prompt + parser for memory dedup (Phase 7A). + +Stdlib-only — must be importable from both the in-container service +layer (when a user clicks "scan for duplicates" in the UI) and the +host-side batch script (``scripts/memory_dedup.py``), which runs on +Dalidou where the container's Python deps are not available. + +The prompt instructs the model to draft a UNIFIED memory that +preserves every specific detail from the sources. We never want a +merge to lose information — if two memories disagree on a number, the +merged content should surface both with context. +""" + +from __future__ import annotations + +import json +from typing import Any + +DEDUP_PROMPT_VERSION = "dedup-0.1.0" +MAX_CONTENT_CHARS = 1000 +MAX_SOURCES = 8 # cluster size cap — bigger clusters are suspicious + +SYSTEM_PROMPT = """You consolidate near-duplicate memories for AtoCore, a personal context engine. + +Given 2-8 memories that a semantic-similarity scan flagged as likely duplicates, draft a UNIFIED replacement that preserves every specific detail from every source. + +CORE PRINCIPLE: information never gets lost. If the sources disagree on a number, date, vendor, or spec, surface BOTH with attribution (e.g., "quoted at $3.2k on 2026-03-01, revised to $3.8k on 2026-04-10"). If one source is more specific than another, keep the specificity. If they say the same thing differently, pick the clearer wording. 
+ +YOU MUST: +- Produce content under 500 characters that reads as a single coherent statement +- Keep all project/vendor/person/part names that appear in any source +- Keep all numbers, dates, and identifiers +- Keep the strongest claim wording ("ratified", "decided", "committed") if any source has it +- Propose domain_tags as a UNION of the sources' tags (lowercase, deduped, cap 6) +- Return valid_until = latest non-null valid_until across sources, or null if any source has null (permanent beats transient) + +REFUSE TO MERGE (return action="reject") if: +- The memories are actually about DIFFERENT subjects that just share vocabulary (e.g., "p04 mirror" and "p05 mirror" — same project bucket means same project, but different components) +- One memory CONTRADICTS another and you cannot reconcile them — flag for contradiction review instead +- The sources span different time snapshots of a changing state that should stay as a timeline, not be collapsed + +OUTPUT — raw JSON, no prose, no markdown fences: +{ + "action": "merge" | "reject", + "content": "the unified memory content", + "memory_type": "knowledge|project|preference|adaptation|episodic|identity", + "project": "project-slug or empty", + "domain_tags": ["tag1", "tag2"], + "confidence": 0.5, + "reason": "one sentence explaining the merge (or the rejection)" +} + +On action=reject, still fill content with a short explanation and set confidence=0.""" + + +def build_user_message(sources: list[dict[str, Any]]) -> str: + """Format N source memories for the model to consolidate. + + Each source dict should carry id, content, project, memory_type, + domain_tags, confidence, valid_until, reference_count. 
+ """ + lines = [f"You have {len(sources)} source memories in the same (project, memory_type) bucket:\n"] + for i, src in enumerate(sources[:MAX_SOURCES], start=1): + tags = src.get("domain_tags") or [] + if isinstance(tags, str): + try: + tags = json.loads(tags) + except Exception: + tags = [] + lines.append( + f"--- Source {i} (id={src.get('id','?')[:8]}, " + f"refs={src.get('reference_count',0)}, " + f"conf={src.get('confidence',0):.2f}, " + f"valid_until={src.get('valid_until') or 'permanent'}) ---" + ) + lines.append(f"project: {src.get('project','')}") + lines.append(f"type: {src.get('memory_type','')}") + lines.append(f"tags: {tags}") + lines.append(f"content: {(src.get('content') or '')[:MAX_CONTENT_CHARS]}") + lines.append("") + lines.append("Return the JSON object now.") + return "\n".join(lines) + + +def parse_merge_verdict(raw_output: str) -> dict[str, Any] | None: + """Strip markdown fences / leading prose and return the parsed JSON + object. Returns None on parse failure.""" + text = (raw_output or "").strip() + if text.startswith("```"): + text = text.strip("`") + nl = text.find("\n") + if nl >= 0: + text = text[nl + 1:] + if text.endswith("```"): + text = text[:-3] + text = text.strip() + + if not text.lstrip().startswith("{"): + start = text.find("{") + end = text.rfind("}") + if start >= 0 and end > start: + text = text[start:end + 1] + + try: + parsed = json.loads(text) + except json.JSONDecodeError: + return None + if not isinstance(parsed, dict): + return None + return parsed + + +def normalize_merge_verdict(verdict: dict[str, Any]) -> dict[str, Any] | None: + """Validate + normalize a raw merge verdict. 
Returns None if the + verdict is unusable (no content, unknown action).""" + action = str(verdict.get("action") or "").strip().lower() + if action not in ("merge", "reject"): + return None + + content = str(verdict.get("content") or "").strip() + if not content: + return None + + memory_type = str(verdict.get("memory_type") or "knowledge").strip().lower() + project = str(verdict.get("project") or "").strip() + + raw_tags = verdict.get("domain_tags") or [] + if isinstance(raw_tags, str): + raw_tags = [t.strip() for t in raw_tags.split(",") if t.strip()] + if not isinstance(raw_tags, list): + raw_tags = [] + tags: list[str] = [] + for t in raw_tags[:6]: + if not isinstance(t, str): + continue + tt = t.strip().lower() + if tt and tt not in tags: + tags.append(tt) + + try: + confidence = float(verdict.get("confidence", 0.5)) + except (TypeError, ValueError): + confidence = 0.5 + confidence = max(0.0, min(1.0, confidence)) + + reason = str(verdict.get("reason") or "").strip()[:500] + + return { + "action": action, + "content": content[:1000], + "memory_type": memory_type, + "project": project, + "domain_tags": tags, + "confidence": confidence, + "reason": reason, + } diff --git a/src/atocore/memory/service.py b/src/atocore/memory/service.py index d2bec89..ff841c7 100644 --- a/src/atocore/memory/service.py +++ b/src/atocore/memory/service.py @@ -925,3 +925,327 @@ def _row_to_memory(row) -> Memory: def _validate_confidence(confidence: float) -> None: if not 0.0 <= confidence <= 1.0: raise ValueError("Confidence must be between 0.0 and 1.0") + + +# --------------------------------------------------------------------- +# Phase 7A — Memory Consolidation: merge-candidate lifecycle +# --------------------------------------------------------------------- +# +# The detector (scripts/memory_dedup.py) writes proposals into +# memory_merge_candidates. The triage UI lists pending rows, a human +# reviews, and on approve we execute the merge here — never at detect +# time. 
This keeps the audit trail clean: every mutation is a human +# decision. + + +def create_merge_candidate( + memory_ids: list[str], + similarity: float, + proposed_content: str, + proposed_memory_type: str, + proposed_project: str, + proposed_tags: list[str] | None = None, + proposed_confidence: float = 0.6, + reason: str = "", +) -> str | None: + """Insert a merge-candidate row. Returns the new row id, or None if + a pending candidate already covers this exact set of memory ids + (idempotent scan — re-running the detector doesn't double-create).""" + import json as _json + + if not memory_ids or len(memory_ids) < 2: + raise ValueError("merge candidate requires at least 2 memory_ids") + + memory_ids_sorted = sorted(set(memory_ids)) + memory_ids_json = _json.dumps(memory_ids_sorted) + tags_json = _json.dumps(_normalize_tags(proposed_tags)) + candidate_id = str(uuid.uuid4()) + + with get_connection() as conn: + # Idempotency: same sorted-id set already pending? skip. + existing = conn.execute( + "SELECT id FROM memory_merge_candidates " + "WHERE status = 'pending' AND memory_ids = ?", + (memory_ids_json,), + ).fetchone() + if existing: + return None + + conn.execute( + "INSERT INTO memory_merge_candidates " + "(id, status, memory_ids, similarity, proposed_content, " + "proposed_memory_type, proposed_project, proposed_tags, " + "proposed_confidence, reason) " + "VALUES (?, 'pending', ?, ?, ?, ?, ?, ?, ?, ?)", + ( + candidate_id, memory_ids_json, float(similarity or 0.0), + (proposed_content or "")[:2000], + (proposed_memory_type or "knowledge")[:50], + (proposed_project or "")[:100], + tags_json, + max(0.0, min(1.0, float(proposed_confidence))), + (reason or "")[:500], + ), + ) + log.info( + "merge_candidate_created", + candidate_id=candidate_id, + memory_count=len(memory_ids_sorted), + similarity=round(similarity, 4), + ) + return candidate_id + + +def get_merge_candidates(status: str = "pending", limit: int = 100) -> list[dict]: + """List merge candidates with their 
source memories inlined.""" + import json as _json + + with get_connection() as conn: + rows = conn.execute( + "SELECT * FROM memory_merge_candidates " + "WHERE status = ? ORDER BY created_at DESC LIMIT ?", + (status, limit), + ).fetchall() + + out = [] + for r in rows: + try: + mem_ids = _json.loads(r["memory_ids"] or "[]") + except Exception: + mem_ids = [] + try: + tags = _json.loads(r["proposed_tags"] or "[]") + except Exception: + tags = [] + + sources = [] + for mid in mem_ids: + srow = conn.execute( + "SELECT id, memory_type, content, project, confidence, " + "status, reference_count, domain_tags, valid_until " + "FROM memories WHERE id = ?", + (mid,), + ).fetchone() + if srow: + try: + stags = _json.loads(srow["domain_tags"] or "[]") + except Exception: + stags = [] + sources.append({ + "id": srow["id"], + "memory_type": srow["memory_type"], + "content": srow["content"], + "project": srow["project"] or "", + "confidence": srow["confidence"], + "status": srow["status"], + "reference_count": int(srow["reference_count"] or 0), + "domain_tags": stags, + "valid_until": srow["valid_until"] or "", + }) + + out.append({ + "id": r["id"], + "status": r["status"], + "memory_ids": mem_ids, + "similarity": r["similarity"], + "proposed_content": r["proposed_content"] or "", + "proposed_memory_type": r["proposed_memory_type"] or "knowledge", + "proposed_project": r["proposed_project"] or "", + "proposed_tags": tags, + "proposed_confidence": r["proposed_confidence"], + "reason": r["reason"] or "", + "created_at": r["created_at"], + "resolved_at": r["resolved_at"], + "resolved_by": r["resolved_by"], + "result_memory_id": r["result_memory_id"], + "sources": sources, + }) + return out + + +def reject_merge_candidate(candidate_id: str, actor: str = "human-triage", note: str = "") -> bool: + """Mark a merge candidate as rejected. 
Source memories stay untouched.""" + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + with get_connection() as conn: + result = conn.execute( + "UPDATE memory_merge_candidates " + "SET status = 'rejected', resolved_at = ?, resolved_by = ? " + "WHERE id = ? AND status = 'pending'", + (now_str, actor, candidate_id), + ) + if result.rowcount == 0: + return False + log.info("merge_candidate_rejected", candidate_id=candidate_id, actor=actor, note=note[:100]) + return True + + +def merge_memories( + candidate_id: str, + actor: str = "human-triage", + override_content: str | None = None, + override_tags: list[str] | None = None, +) -> str | None: + """Execute an approved merge candidate. + + 1. Validate all source memories still status=active + 2. Create the new merged memory (status=active) + 3. Mark each source status=superseded with an audit row pointing at + the new merged id + 4. Mark the candidate status=approved, record result_memory_id + 5. Write a consolidated audit row on the new memory + + Returns the new merged memory's id, or None if the candidate cannot + be executed (already resolved, source tampered, etc.). + + ``override_content`` and ``override_tags`` let the UI pass the human's + edits before clicking approve. 
+ """ + import json as _json + + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + + with get_connection() as conn: + row = conn.execute( + "SELECT * FROM memory_merge_candidates WHERE id = ?", + (candidate_id,), + ).fetchone() + if row is None or row["status"] != "pending": + log.warning("merge_candidate_not_pending", candidate_id=candidate_id) + return None + + try: + mem_ids = _json.loads(row["memory_ids"] or "[]") + except Exception: + mem_ids = [] + if not mem_ids or len(mem_ids) < 2: + log.warning("merge_candidate_invalid_memory_ids", candidate_id=candidate_id) + return None + + # Snapshot sources + validate all active + source_rows = [] + for mid in mem_ids: + srow = conn.execute( + "SELECT * FROM memories WHERE id = ?", (mid,) + ).fetchone() + if srow is None or srow["status"] != "active": + log.warning( + "merge_source_not_active", + candidate_id=candidate_id, + memory_id=mid, + actual_status=(srow["status"] if srow else "missing"), + ) + return None + source_rows.append(srow) + + # Build merged memory fields — prefer human overrides, then proposed + content = (override_content or row["proposed_content"] or "").strip() + if not content: + log.warning("merge_candidate_empty_content", candidate_id=candidate_id) + return None + + merged_type = (row["proposed_memory_type"] or source_rows[0]["memory_type"]).lower() + if merged_type not in MEMORY_TYPES: + merged_type = source_rows[0]["memory_type"] + + merged_project = row["proposed_project"] or source_rows[0]["project"] or "" + merged_project = resolve_project_name(merged_project) + + # Tags: override wins, else proposed, else union of sources + if override_tags is not None: + merged_tags = _normalize_tags(override_tags) + else: + try: + proposed_tags = _json.loads(row["proposed_tags"] or "[]") + except Exception: + proposed_tags = [] + if proposed_tags: + merged_tags = _normalize_tags(proposed_tags) + else: + union: list[str] = [] + for srow in source_rows: + try: + stags = 
_json.loads(srow["domain_tags"] or "[]") + except Exception: + stags = [] + for t in stags: + if isinstance(t, str) and t and t not in union: + union.append(t) + merged_tags = union + + # confidence = max; reference_count = sum + merged_confidence = max(float(s["confidence"]) for s in source_rows) + total_refs = sum(int(s["reference_count"] or 0) for s in source_rows) + + # valid_until: if any source is permanent (None/empty), merged is permanent. + # Otherwise take the latest (lexical compare on ISO dates works). + merged_vu: str | None = "" # placeholder + has_permanent = any(not (s["valid_until"] or "").strip() for s in source_rows) + if has_permanent: + merged_vu = None + else: + merged_vu = max((s["valid_until"] or "").strip() for s in source_rows) or None + + new_id = str(uuid.uuid4()) + tags_json = _json.dumps(merged_tags) + + conn.execute( + "INSERT INTO memories (id, memory_type, content, project, " + "source_chunk_id, confidence, status, domain_tags, valid_until, " + "reference_count, last_referenced_at) " + "VALUES (?, ?, ?, ?, NULL, ?, 'active', ?, ?, ?, ?)", + ( + new_id, merged_type, content[:2000], merged_project, + merged_confidence, tags_json, merged_vu, total_refs, now_str, + ), + ) + + # Mark sources superseded + for srow in source_rows: + conn.execute( + "UPDATE memories SET status = 'superseded', updated_at = ? " + "WHERE id = ?", + (now_str, srow["id"]), + ) + + # Mark candidate approved + conn.execute( + "UPDATE memory_merge_candidates SET status = 'approved', " + "resolved_at = ?, resolved_by = ?, result_memory_id = ? 
WHERE id = ?", + (now_str, actor, new_id, candidate_id), + ) + + # Audit rows (out of the transaction; fail-open via _audit_memory) + _audit_memory( + memory_id=new_id, + action="created_via_merge", + actor=actor, + after={ + "memory_type": merged_type, + "content": content, + "project": merged_project, + "confidence": merged_confidence, + "domain_tags": merged_tags, + "reference_count": total_refs, + "merged_from": list(mem_ids), + "merge_candidate_id": candidate_id, + }, + note=f"merged {len(mem_ids)} sources via candidate {candidate_id[:8]}", + ) + for srow in source_rows: + _audit_memory( + memory_id=srow["id"], + action="superseded", + actor=actor, + before={"status": "active", "content": srow["content"]}, + after={"status": "superseded", "superseded_by": new_id}, + note=f"merged into {new_id}", + ) + + log.info( + "merge_executed", + candidate_id=candidate_id, + result_memory_id=new_id, + source_count=len(source_rows), + actor=actor, + ) + return new_id diff --git a/src/atocore/memory/similarity.py b/src/atocore/memory/similarity.py new file mode 100644 index 0000000..378d5e7 --- /dev/null +++ b/src/atocore/memory/similarity.py @@ -0,0 +1,88 @@ +"""Phase 7A (Memory Consolidation): semantic similarity helpers. + +Thin wrapper over ``atocore.retrieval.embeddings`` that exposes +pairwise + batch cosine similarity on normalized embeddings. Used by +the dedup detector to cluster near-duplicate active memories. + +Embeddings from ``embed_texts()`` are already L2-normalized, so cosine +similarity reduces to a dot product — no extra normalization needed. +""" + +from __future__ import annotations + +from atocore.retrieval.embeddings import embed_texts + + +def _dot(a: list[float], b: list[float]) -> float: + return sum(x * y for x, y in zip(a, b)) + + +def cosine(a: list[float], b: list[float]) -> float: + """Cosine similarity on already-normalized vectors. 
Clamped to [0,1] + (embeddings use paraphrase-multilingual-MiniLM which is unit-norm, + and we never want negative values leaking into thresholds).""" + return max(0.0, min(1.0, _dot(a, b))) + + +def compute_memory_similarity(text_a: str, text_b: str) -> float: + """Return cosine similarity of two memory contents in [0,1]. + + Convenience helper for one-off checks + tests. For batch work (the + dedup detector), use ``embed_texts()`` directly and compute the + similarity matrix yourself to avoid re-embedding shared texts. + """ + if not text_a or not text_b: + return 0.0 + vecs = embed_texts([text_a, text_b]) + return cosine(vecs[0], vecs[1]) + + +def similarity_matrix(texts: list[str]) -> list[list[float]]: + """N×N cosine similarity matrix. Diagonal is 1.0, symmetric.""" + if not texts: + return [] + vecs = embed_texts(texts) + n = len(vecs) + matrix = [[0.0] * n for _ in range(n)] + for i in range(n): + matrix[i][i] = 1.0 + for j in range(i + 1, n): + s = cosine(vecs[i], vecs[j]) + matrix[i][j] = s + matrix[j][i] = s + return matrix + + +def cluster_by_threshold(texts: list[str], threshold: float) -> list[list[int]]: + """Greedy transitive clustering: if sim(i,j) >= threshold, merge. + + Returns a list of clusters, each a list of indices into ``texts``. + Singletons are included. Used by the dedup detector to collapse + A~B~C into one merge proposal rather than three pair proposals. 
+ """ + if not texts: + return [] + matrix = similarity_matrix(texts) + n = len(texts) + parent = list(range(n)) + + def find(x: int) -> int: + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(x: int, y: int) -> None: + rx, ry = find(x), find(y) + if rx != ry: + parent[rx] = ry + + for i in range(n): + for j in range(i + 1, n): + if matrix[i][j] >= threshold: + union(i, j) + + groups: dict[int, list[int]] = {} + for i in range(n): + groups.setdefault(find(i), []).append(i) + return list(groups.values()) diff --git a/src/atocore/models/database.py b/src/atocore/models/database.py index e65010a..b9605ec 100644 --- a/src/atocore/models/database.py +++ b/src/atocore/models/database.py @@ -251,6 +251,42 @@ def _apply_migrations(conn: sqlite3.Connection) -> None: "CREATE INDEX IF NOT EXISTS idx_interactions_created_at ON interactions(created_at)" ) + # Phase 7A (Memory Consolidation — "sleep cycle"): merge candidates. + # When the dedup detector finds a cluster of semantically similar active + # memories within the same (project, memory_type) bucket, it drafts a + # unified content via LLM and writes a proposal here. The triage UI + # surfaces these for human approval. On approve, source memories become + # status=superseded and a new merged memory is created. + # memory_ids is a JSON array (length >= 2) of the source memory ids. + # proposed_* hold the LLM's draft; a human can edit before approve. + # result_memory_id is filled on approve with the new merged memory's id. 
+ conn.execute( + """ + CREATE TABLE IF NOT EXISTS memory_merge_candidates ( + id TEXT PRIMARY KEY, + status TEXT DEFAULT 'pending', + memory_ids TEXT NOT NULL, + similarity REAL, + proposed_content TEXT, + proposed_memory_type TEXT, + proposed_project TEXT, + proposed_tags TEXT DEFAULT '[]', + proposed_confidence REAL, + reason TEXT DEFAULT '', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + resolved_at DATETIME, + resolved_by TEXT, + result_memory_id TEXT + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_mmc_status ON memory_merge_candidates(status)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_mmc_created_at ON memory_merge_candidates(created_at)" + ) + def _column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool: rows = conn.execute(f"PRAGMA table_info({table})").fetchall() diff --git a/tests/test_memory_dedup.py b/tests/test_memory_dedup.py new file mode 100644 index 0000000..345e543 --- /dev/null +++ b/tests/test_memory_dedup.py @@ -0,0 +1,380 @@ +"""Phase 7A — memory consolidation tests. 
+ +Covers: + - similarity helpers (cosine bounds, matrix symmetry, clustering) + - _dedup_prompt parser / normalizer robustness + - create_merge_candidate idempotency + - get_merge_candidates inlines source memories + - merge_memories end-to-end happy path (sources → superseded, + new merged memory active, audit rows, result_memory_id) + - reject_merge_candidate leaves sources untouched +""" + +from __future__ import annotations + +import pytest + +from atocore.memory._dedup_prompt import ( + normalize_merge_verdict, + parse_merge_verdict, +) +from atocore.memory.service import ( + create_memory, + create_merge_candidate, + get_memory_audit, + get_merge_candidates, + merge_memories, + reject_merge_candidate, +) +from atocore.memory.similarity import ( + cluster_by_threshold, + cosine, + compute_memory_similarity, + similarity_matrix, +) +from atocore.models.database import get_connection, init_db + + +# --- Similarity helpers --- + + +def test_cosine_bounds(): + assert cosine([1.0, 0.0], [1.0, 0.0]) == pytest.approx(1.0) + assert cosine([1.0, 0.0], [0.0, 1.0]) == pytest.approx(0.0) + # Negative dot product clamped to 0 + assert cosine([1.0, 0.0], [-1.0, 0.0]) == 0.0 + + +def test_compute_memory_similarity_identical_high(): + s = compute_memory_similarity("the sky is blue", "the sky is blue") + assert 0.99 <= s <= 1.0 + + +def test_compute_memory_similarity_unrelated_low(): + s = compute_memory_similarity( + "APM integrates with NX via a Python bridge", + "the polisher firmware must use USB SSD not SD card", + ) + assert 0.0 <= s < 0.7 + + +def test_similarity_matrix_symmetric(): + texts = ["alpha beta gamma", "alpha beta gamma", "completely unrelated text"] + m = similarity_matrix(texts) + assert len(m) == 3 and all(len(r) == 3 for r in m) + for i in range(3): + assert m[i][i] == pytest.approx(1.0) + for i in range(3): + for j in range(3): + assert m[i][j] == pytest.approx(m[j][i]) + + +def test_cluster_by_threshold_transitive(): + # Three near-paraphrases should 
land in one cluster + texts = [ + "Antoine prefers OAuth over API keys", + "Antoine's preference is OAuth, not API keys", + "the polisher firmware uses USB SSD storage", + ] + clusters = cluster_by_threshold(texts, threshold=0.7) + # At least one cluster of size 2+ containing the paraphrases + big = [c for c in clusters if len(c) >= 2] + assert big, f"expected at least one multi-member cluster, got {clusters}" + assert 0 in big[0] and 1 in big[0] + + +# --- Prompt parser robustness --- + + +def test_parse_merge_verdict_strips_fences(): + raw = "```json\n{\"action\":\"merge\",\"content\":\"x\"}\n```" + parsed = parse_merge_verdict(raw) + assert parsed == {"action": "merge", "content": "x"} + + +def test_parse_merge_verdict_handles_prose_prefix(): + raw = "Sure! Here's the result:\n{\"action\":\"reject\",\"content\":\"no\"}" + parsed = parse_merge_verdict(raw) + assert parsed is not None + assert parsed["action"] == "reject" + + +def test_normalize_merge_verdict_fills_defaults(): + v = normalize_merge_verdict({ + "action": "merge", + "content": "unified text", + }) + assert v is not None + assert v["memory_type"] == "knowledge" + assert v["project"] == "" + assert v["domain_tags"] == [] + assert v["confidence"] == 0.5 + + +def test_normalize_merge_verdict_rejects_empty_content(): + assert normalize_merge_verdict({"action": "merge", "content": ""}) is None + + +def test_normalize_merge_verdict_rejects_unknown_action(): + assert normalize_merge_verdict({"action": "?", "content": "x"}) is None + + +# --- create_merge_candidate idempotency --- + + +def test_create_merge_candidate_inserts_row(tmp_data_dir): + init_db() + m1 = create_memory("knowledge", "APM uses NX for DXF conversion") + m2 = create_memory("knowledge", "APM uses NX for DXF-to-STL") + + cid = create_merge_candidate( + memory_ids=[m1.id, m2.id], + similarity=0.92, + proposed_content="APM uses NX for DXF→STL conversion", + proposed_memory_type="knowledge", + proposed_project="", + proposed_tags=["apm", 
"nx"], + proposed_confidence=0.6, + reason="near-paraphrase", + ) + assert cid is not None + + pending = get_merge_candidates(status="pending") + assert len(pending) == 1 + assert pending[0]["id"] == cid + assert pending[0]["similarity"] == pytest.approx(0.92) + assert len(pending[0]["sources"]) == 2 + + +def test_create_merge_candidate_idempotent(tmp_data_dir): + init_db() + m1 = create_memory("knowledge", "Fact A") + m2 = create_memory("knowledge", "Fact A slightly reworded") + + first = create_merge_candidate( + memory_ids=[m1.id, m2.id], + similarity=0.9, + proposed_content="merged", + proposed_memory_type="knowledge", + proposed_project="", + ) + # Same id set, different order → dedupe skips + second = create_merge_candidate( + memory_ids=[m2.id, m1.id], + similarity=0.9, + proposed_content="merged (again)", + proposed_memory_type="knowledge", + proposed_project="", + ) + assert first is not None + assert second is None + + +def test_create_merge_candidate_requires_two_ids(tmp_data_dir): + init_db() + m1 = create_memory("knowledge", "lonely") + with pytest.raises(ValueError): + create_merge_candidate( + memory_ids=[m1.id], + similarity=1.0, + proposed_content="x", + proposed_memory_type="knowledge", + proposed_project="", + ) + + +# --- merge_memories end-to-end --- + + +def test_merge_memories_happy_path(tmp_data_dir): + init_db() + m1 = create_memory( + "knowledge", "APM uses NX for DXF conversion", + project="apm", confidence=0.6, domain_tags=["apm", "nx"], + ) + m2 = create_memory( + "knowledge", "APM does DXF to STL via NX bridge", + project="apm", confidence=0.8, domain_tags=["apm", "bridge"], + ) + # Bump reference counts so sum is meaningful + with get_connection() as conn: + conn.execute("UPDATE memories SET reference_count = 3 WHERE id = ?", (m1.id,)) + conn.execute("UPDATE memories SET reference_count = 5 WHERE id = ?", (m2.id,)) + + cid = create_merge_candidate( + memory_ids=[m1.id, m2.id], + similarity=0.92, + proposed_content="APM uses NX bridge 
for DXF→STL conversion", + proposed_memory_type="knowledge", + proposed_project="apm", + proposed_tags=["apm", "nx", "bridge"], + proposed_confidence=0.7, + reason="duplicates", + ) + new_id = merge_memories(candidate_id=cid, actor="human-triage") + assert new_id is not None + + # Sources superseded + with get_connection() as conn: + s1 = conn.execute("SELECT status FROM memories WHERE id = ?", (m1.id,)).fetchone() + s2 = conn.execute("SELECT status FROM memories WHERE id = ?", (m2.id,)).fetchone() + merged = conn.execute( + "SELECT content, status, confidence, reference_count, project " + "FROM memories WHERE id = ?", (new_id,) + ).fetchone() + cand = conn.execute( + "SELECT status, result_memory_id FROM memory_merge_candidates WHERE id = ?", + (cid,), + ).fetchone() + assert s1["status"] == "superseded" + assert s2["status"] == "superseded" + assert merged["status"] == "active" + assert merged["project"] == "apm" + # confidence = max of sources (0.8), not the proposed 0.7 (proposed is hint; + # merge_memories picks max of actual source confidences — verify). 
+ assert merged["confidence"] == pytest.approx(0.8) + # reference_count = sum (3 + 5 = 8) + assert int(merged["reference_count"]) == 8 + assert cand["status"] == "approved" + assert cand["result_memory_id"] == new_id + + +def test_merge_memories_content_override(tmp_data_dir): + init_db() + m1 = create_memory("knowledge", "draft A", project="p05-interferometer") + m2 = create_memory("knowledge", "draft B", project="p05-interferometer") + + cid = create_merge_candidate( + memory_ids=[m1.id, m2.id], + similarity=0.9, + proposed_content="AI draft", + proposed_memory_type="knowledge", + proposed_project="p05-interferometer", + ) + new_id = merge_memories( + candidate_id=cid, + actor="human-triage", + override_content="human-edited final text", + override_tags=["optics", "custom"], + ) + assert new_id is not None + with get_connection() as conn: + row = conn.execute( + "SELECT content, domain_tags FROM memories WHERE id = ?", (new_id,) + ).fetchone() + assert row["content"] == "human-edited final text" + # domain_tags JSON should contain the override + assert "optics" in row["domain_tags"] + assert "custom" in row["domain_tags"] + + +def test_merge_memories_writes_audit(tmp_data_dir): + init_db() + m1 = create_memory("knowledge", "alpha") + m2 = create_memory("knowledge", "alpha variant") + cid = create_merge_candidate( + memory_ids=[m1.id, m2.id], similarity=0.9, + proposed_content="alpha merged", + proposed_memory_type="knowledge", proposed_project="", + ) + new_id = merge_memories(candidate_id=cid) + assert new_id + + audit_new = get_memory_audit(new_id) + actions_new = {a["action"] for a in audit_new} + assert "created_via_merge" in actions_new + + audit_m1 = get_memory_audit(m1.id) + actions_m1 = {a["action"] for a in audit_m1} + assert "superseded" in actions_m1 + + +def test_merge_memories_aborts_if_source_not_active(tmp_data_dir): + init_db() + m1 = create_memory("knowledge", "one") + m2 = create_memory("knowledge", "two") + cid = create_merge_candidate( + 
memory_ids=[m1.id, m2.id], similarity=0.9, + proposed_content="merged", + proposed_memory_type="knowledge", proposed_project="", + ) + # Tamper: supersede one source before the merge runs + with get_connection() as conn: + conn.execute("UPDATE memories SET status = 'superseded' WHERE id = ?", (m1.id,)) + result = merge_memories(candidate_id=cid) + assert result is None + + # Candidate still pending + pending = get_merge_candidates(status="pending") + assert any(c["id"] == cid for c in pending) + + +def test_merge_memories_rejects_already_resolved(tmp_data_dir): + init_db() + m1 = create_memory("knowledge", "x") + m2 = create_memory("knowledge", "y") + cid = create_merge_candidate( + memory_ids=[m1.id, m2.id], similarity=0.9, + proposed_content="xy", + proposed_memory_type="knowledge", proposed_project="", + ) + first = merge_memories(candidate_id=cid) + assert first is not None + # second call — already approved, should return None + second = merge_memories(candidate_id=cid) + assert second is None + + +# --- reject_merge_candidate --- + + +def test_reject_merge_candidate_leaves_sources_untouched(tmp_data_dir): + init_db() + m1 = create_memory("knowledge", "a") + m2 = create_memory("knowledge", "b") + cid = create_merge_candidate( + memory_ids=[m1.id, m2.id], similarity=0.9, + proposed_content="a+b", + proposed_memory_type="knowledge", proposed_project="", + ) + ok = reject_merge_candidate(cid, actor="human-triage", note="false positive") + assert ok + + # Sources still active + with get_connection() as conn: + s1 = conn.execute("SELECT status FROM memories WHERE id = ?", (m1.id,)).fetchone() + s2 = conn.execute("SELECT status FROM memories WHERE id = ?", (m2.id,)).fetchone() + cand = conn.execute( + "SELECT status FROM memory_merge_candidates WHERE id = ?", (cid,) + ).fetchone() + assert s1["status"] == "active" + assert s2["status"] == "active" + assert cand["status"] == "rejected" + + +def test_reject_merge_candidate_idempotent(tmp_data_dir): + init_db() + m1 = 
create_memory("knowledge", "p") + m2 = create_memory("knowledge", "q") + cid = create_merge_candidate( + memory_ids=[m1.id, m2.id], similarity=0.9, + proposed_content="pq", + proposed_memory_type="knowledge", proposed_project="", + ) + assert reject_merge_candidate(cid) is True + # second reject — already rejected, returns False + assert reject_merge_candidate(cid) is False + + +# --- Schema sanity --- + + +def test_merge_candidates_table_exists(tmp_data_dir): + init_db() + with get_connection() as conn: + cols = [r["name"] for r in conn.execute("PRAGMA table_info(memory_merge_candidates)").fetchall()] + expected = {"id", "status", "memory_ids", "similarity", "proposed_content", + "proposed_memory_type", "proposed_project", "proposed_tags", + "proposed_confidence", "reason", "created_at", "resolved_at", + "resolved_by", "result_memory_id"} + assert expected.issubset(set(cols))