diff --git a/deploy/dalidou/batch-extract.sh b/deploy/dalidou/batch-extract.sh index 8bd515b..f78ec1b 100644 --- a/deploy/dalidou/batch-extract.sh +++ b/deploy/dalidou/batch-extract.sh @@ -197,6 +197,19 @@ python3 "$APP_DIR/scripts/memory_dedup.py" \ log "WARN: memory dedup failed (non-blocking)" } +# Step B4: Tag canonicalization (Phase 7C, weekly Sundays) +# Autonomous: LLM proposes alias→canonical maps, auto-applies confidence >= 0.8. +# Projects tokens are protected (skipped on both sides). Borderline proposals +# land in /admin/tags/aliases for human review. +if [[ "$(date -u +%u)" == "7" ]]; then + log "Step B4: tag canonicalization (Sunday)" + python3 "$APP_DIR/scripts/canonicalize_tags.py" \ + --base-url "$ATOCORE_URL" \ + 2>&1 || { + log "WARN: tag canonicalization failed (non-blocking)" + } +fi + # Step G: Integrity check (Phase 4 V1) log "Step G: integrity check" python3 "$APP_DIR/scripts/integrity_check.py" \ diff --git a/scripts/canonicalize_tags.py b/scripts/canonicalize_tags.py new file mode 100644 index 0000000..51bf5b2 --- /dev/null +++ b/scripts/canonicalize_tags.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 +"""Phase 7C — tag canonicalization detector. + +Weekly (or on-demand) LLM pass that: + 1. Fetches the tag distribution across all active memories via HTTP + 2. Asks claude-p to propose alias→canonical mappings + 3. AUTO-APPLIES aliases with confidence >= AUTO_APPROVE_CONF (0.8) + 4. Submits lower-confidence proposals as pending for human review + +Autonomous by default — matches the Phase 7A.1 pattern. Set +--no-auto-approve to force every proposal into human review. + +Host-side because claude CLI lives on Dalidou, not the container. +Reuses the PYTHONPATH=src pattern from scripts/memory_dedup.py. 
+ +Usage: + python3 scripts/canonicalize_tags.py [--base-url URL] [--dry-run] [--no-auto-approve] +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import subprocess +import sys +import tempfile +import time +import urllib.error +import urllib.request + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +_SRC_DIR = os.path.abspath(os.path.join(_SCRIPT_DIR, "..", "src")) +if _SRC_DIR not in sys.path: + sys.path.insert(0, _SRC_DIR) + +from atocore.memory._tag_canon_prompt import ( # noqa: E402 + PROTECTED_PROJECT_TOKENS, + SYSTEM_PROMPT, + TAG_CANON_PROMPT_VERSION, + build_user_message, + normalize_alias_item, + parse_canon_output, +) + +DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100") +DEFAULT_MODEL = os.environ.get("ATOCORE_TAG_CANON_MODEL", "sonnet") +DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_TAG_CANON_TIMEOUT_S", "90")) + +AUTO_APPROVE_CONF = float(os.environ.get("ATOCORE_TAG_CANON_AUTO_APPROVE_CONF", "0.8")) +MIN_ALIAS_COUNT = int(os.environ.get("ATOCORE_TAG_CANON_MIN_ALIAS_COUNT", "1")) + +_sandbox_cwd = None + + +def get_sandbox_cwd() -> str: + global _sandbox_cwd + if _sandbox_cwd is None: + _sandbox_cwd = tempfile.mkdtemp(prefix="ato-tagcanon-") + return _sandbox_cwd + + +def api_get(base_url: str, path: str) -> dict: + req = urllib.request.Request(f"{base_url}{path}") + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def api_post(base_url: str, path: str, body: dict | None = None) -> dict: + data = json.dumps(body or {}).encode("utf-8") + req = urllib.request.Request( + f"{base_url}{path}", method="POST", + headers={"Content-Type": "application/json"}, data=data, + ) + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def call_claude(user_message: str, model: str, timeout_s: float) -> tuple[str | None, str | None]: + if not 
shutil.which("claude"): + return None, "claude CLI not available" + args = [ + "claude", "-p", + "--model", model, + "--append-system-prompt", SYSTEM_PROMPT, + "--disable-slash-commands", + user_message, + ] + last_error = "" + for attempt in range(3): + if attempt > 0: + time.sleep(2 ** attempt) + try: + completed = subprocess.run( + args, capture_output=True, text=True, + timeout=timeout_s, cwd=get_sandbox_cwd(), + encoding="utf-8", errors="replace", + ) + except subprocess.TimeoutExpired: + last_error = f"{model} timed out" + continue + except Exception as exc: + last_error = f"subprocess error: {exc}" + continue + if completed.returncode == 0: + return (completed.stdout or "").strip(), None + stderr = (completed.stderr or "").strip()[:200] + last_error = f"{model} exit {completed.returncode}: {stderr}" + return None, last_error + + +def fetch_tag_distribution(base_url: str) -> dict[str, int]: + """Count tag occurrences across active memories (client-side).""" + try: + result = api_get(base_url, "/memory?active_only=true&limit=2000") + except Exception as e: + print(f"ERROR: could not fetch memories: {e}", file=sys.stderr) + return {} + mems = result.get("memories", []) + counts: dict[str, int] = {} + for m in mems: + tags = m.get("domain_tags") or [] + if isinstance(tags, str): + try: + tags = json.loads(tags) + except Exception: + tags = [] + if not isinstance(tags, list): + continue + for t in tags: + if not isinstance(t, str): + continue + key = t.strip().lower() + if key: + counts[key] = counts.get(key, 0) + 1 + return counts + + +def main() -> None: + parser = argparse.ArgumentParser(description="Phase 7C tag canonicalization detector") + parser.add_argument("--base-url", default=DEFAULT_BASE_URL) + parser.add_argument("--model", default=DEFAULT_MODEL) + parser.add_argument("--timeout-s", type=float, default=DEFAULT_TIMEOUT_S) + parser.add_argument("--no-auto-approve", action="store_true", + help="Disable autonomous apply; all proposals → human queue") + 
parser.add_argument("--dry-run", action="store_true", + help="Print decisions without touching state") + args = parser.parse_args() + + base = args.base_url.rstrip("/") + autonomous = not args.no_auto_approve + + print( + f"canonicalize_tags {TAG_CANON_PROMPT_VERSION} | model={args.model} | " + f"autonomous={autonomous} | auto-approve conf>={AUTO_APPROVE_CONF}" + ) + + dist = fetch_tag_distribution(base) + print(f"tag distribution: {len(dist)} unique tags, " + f"{sum(dist.values())} total references") + if not dist: + print("no tags found — nothing to canonicalize") + return + + user_msg = build_user_message(dist) + raw, err = call_claude(user_msg, args.model, args.timeout_s) + if err or raw is None: + print(f"ERROR: LLM call failed: {err}", file=sys.stderr) + return + + aliases_raw = parse_canon_output(raw) + print(f"LLM returned {len(aliases_raw)} raw alias proposals") + + auto_applied = 0 + auto_skipped_missing_canonical = 0 + proposals_created = 0 + duplicates_skipped = 0 + + for item in aliases_raw: + norm = normalize_alias_item(item) + if norm is None: + continue + alias = norm["alias"] + canonical = norm["canonical"] + confidence = norm["confidence"] + + alias_count = dist.get(alias, 0) + canonical_count = dist.get(canonical, 0) + + # Sanity: alias must actually exist in the current distribution + if alias_count < MIN_ALIAS_COUNT: + print(f" SKIP {alias!r} → {canonical!r}: alias not in distribution") + continue + if canonical_count == 0: + auto_skipped_missing_canonical += 1 + print(f" SKIP {alias!r} → {canonical!r}: canonical missing from distribution") + continue + + label = f"{alias!r} ({alias_count}) → {canonical!r} ({canonical_count}) conf={confidence:.2f}" + + auto_apply = autonomous and confidence >= AUTO_APPROVE_CONF + if auto_apply: + if args.dry_run: + auto_applied += 1 + print(f" [dry-run] would auto-apply: {label}") + continue + try: + result = api_post(base, "/admin/tags/aliases/apply", { + "alias": alias, "canonical": canonical, + "confidence": 
confidence, "reason": norm["reason"], + "alias_count": alias_count, "canonical_count": canonical_count, + "actor": "auto-tag-canon", + }) + touched = result.get("memories_touched", 0) + auto_applied += 1 + print(f" ✅ auto-applied: {label} ({touched} memories)") + except Exception as e: + print(f" ⚠️ auto-apply failed: {label} — {e}", file=sys.stderr) + time.sleep(0.2) + continue + + # Lower confidence → human review + if args.dry_run: + proposals_created += 1 + print(f" [dry-run] would propose for review: {label}") + continue + try: + result = api_post(base, "/admin/tags/aliases/propose", { + "alias": alias, "canonical": canonical, + "confidence": confidence, "reason": norm["reason"], + "alias_count": alias_count, "canonical_count": canonical_count, + }) + if result.get("proposal_id"): + proposals_created += 1 + print(f" → pending proposal: {label}") + else: + duplicates_skipped += 1 + print(f" (duplicate pending proposal): {label}") + except Exception as e: + print(f" ⚠️ propose failed: {label} — {e}", file=sys.stderr) + time.sleep(0.2) + + print( + f"\nsummary: proposals_seen={len(aliases_raw)} " + f"auto_applied={auto_applied} " + f"proposals_created={proposals_created} " + f"duplicates_skipped={duplicates_skipped} " + f"skipped_missing_canonical={auto_skipped_missing_canonical}" + ) + + +if __name__ == "__main__": + main() diff --git a/src/atocore/api/routes.py b/src/atocore/api/routes.py index f35c2a1..a38d04e 100644 --- a/src/atocore/api/routes.py +++ b/src/atocore/api/routes.py @@ -1517,6 +1517,125 @@ def api_graduation_status() -> dict: return out +# --- Phase 7C: tag canonicalization --- + + +class TagAliasProposalBody(BaseModel): + alias: str + canonical: str + confidence: float = 0.0 + reason: str = "" + alias_count: int = 0 + canonical_count: int = 0 + + +class TagAliasApplyBody(BaseModel): + alias: str + canonical: str + confidence: float = 0.9 + reason: str = "" + alias_count: int = 0 + canonical_count: int = 0 + actor: str = "auto-tag-canon" + + 
+class TagAliasResolveBody(BaseModel): + actor: str = "human-triage" + + +@router.get("/admin/tags/distribution") +def api_tag_distribution() -> dict: + """Current tag distribution across active memories (for UI / debug).""" + from atocore.memory.service import get_tag_distribution + dist = get_tag_distribution() + sorted_tags = sorted(dist.items(), key=lambda x: x[1], reverse=True) + return {"total_references": sum(dist.values()), "unique_tags": len(dist), + "tags": [{"tag": t, "count": c} for t, c in sorted_tags]} + + +@router.get("/admin/tags/aliases") +def api_list_tag_aliases(status: str = "pending", limit: int = 100) -> dict: + """List tag alias proposals (default: pending for review).""" + from atocore.memory.service import get_tag_alias_proposals + rows = get_tag_alias_proposals(status=status, limit=limit) + return {"proposals": rows, "count": len(rows)} + + +@router.post("/admin/tags/aliases/propose") +def api_propose_tag_alias(body: TagAliasProposalBody) -> dict: + """Submit a low-confidence alias proposal for human review.""" + from atocore.memory.service import create_tag_alias_proposal + pid = create_tag_alias_proposal( + alias=body.alias, canonical=body.canonical, + confidence=body.confidence, alias_count=body.alias_count, + canonical_count=body.canonical_count, reason=body.reason, + ) + if pid is None: + return {"proposal_id": None, "duplicate": True} + return {"proposal_id": pid, "duplicate": False} + + +@router.post("/admin/tags/aliases/apply") +def api_apply_tag_alias(body: TagAliasApplyBody) -> dict: + """Apply an alias rewrite directly (used by the auto-approval path). + + Creates a tag_aliases row in status=approved with the apply result + recorded, so autonomous merges land in the same audit surface as + human approvals. 
+ """ + from datetime import datetime as _dt, timezone as _tz + + from atocore.memory.service import apply_tag_alias, create_tag_alias_proposal + from atocore.models.database import get_connection + + # Record proposal + apply + mark approved in one flow + pid = create_tag_alias_proposal( + alias=body.alias, canonical=body.canonical, + confidence=body.confidence, alias_count=body.alias_count, + canonical_count=body.canonical_count, reason=body.reason, + ) + if pid is None: + # A pending proposal already exists — don't double-apply. + raise HTTPException(status_code=409, detail="A pending proposal already exists for this (alias, canonical) pair — approve it via /admin/tags/aliases/{id}/approve") + try: + result = apply_tag_alias(body.alias, body.canonical, actor=body.actor) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + now_str = _dt.now(_tz.utc).strftime("%Y-%m-%d %H:%M:%S") + with get_connection() as conn: + conn.execute( + "UPDATE tag_aliases SET status = 'approved', resolved_at = ?, " + "resolved_by = ?, applied_to_memories = ? 
WHERE id = ?", + (now_str, body.actor, result["memories_touched"], pid), + ) + return { + "proposal_id": pid, + "memories_touched": result["memories_touched"], + "alias": body.alias, "canonical": body.canonical, + } + + +@router.post("/admin/tags/aliases/{proposal_id}/approve") +def api_approve_tag_alias(proposal_id: str, body: TagAliasResolveBody) -> dict: + """Human-in-the-loop approve for a pending proposal.""" + from atocore.memory.service import approve_tag_alias + result = approve_tag_alias(proposal_id, actor=body.actor) + if result is None: + raise HTTPException(status_code=404, detail="Proposal not found or already resolved") + return {"status": "approved", "proposal_id": proposal_id, + "memories_touched": result["memories_touched"]} + + +@router.post("/admin/tags/aliases/{proposal_id}/reject") +def api_reject_tag_alias(proposal_id: str, body: TagAliasResolveBody) -> dict: + """Human-in-the-loop reject for a pending proposal.""" + from atocore.memory.service import reject_tag_alias + if not reject_tag_alias(proposal_id, actor=body.actor): + raise HTTPException(status_code=404, detail="Proposal not found or already resolved") + return {"status": "rejected", "proposal_id": proposal_id} + + class DecayRunBody(BaseModel): idle_days_threshold: int = 30 daily_decay_factor: float = 0.97 diff --git a/src/atocore/memory/_tag_canon_prompt.py b/src/atocore/memory/_tag_canon_prompt.py new file mode 100644 index 0000000..5104afe --- /dev/null +++ b/src/atocore/memory/_tag_canon_prompt.py @@ -0,0 +1,158 @@ +"""Shared LLM prompt + parser for tag canonicalization (Phase 7C). + +Stdlib-only, importable from both the in-container service layer and the +host-side batch script that shells out to ``claude -p``. + +The prompt instructs the model to propose a map of domain_tag aliases +to their canonical form. Confidence is key here — we AUTO-APPLY high- +confidence aliases; low-confidence go to human review. 
Over-merging +distinct concepts ("optics" vs "optical" — sometimes equivalent, +sometimes not) destroys cross-cutting retrieval, so the model is +instructed to err conservative. +""" + +from __future__ import annotations + +import json +from typing import Any + +TAG_CANON_PROMPT_VERSION = "tagcanon-0.1.0" +MAX_TAGS_IN_PROMPT = 100 + +SYSTEM_PROMPT = """You canonicalize domain tags for AtoCore's memory layer. + +Input: a distribution of lowercase domain tags (keyword → usage count across active memories). Examples: "firmware: 23", "fw: 5", "firmware-control: 3", "optics: 18", "optical: 2". + +Your job: identify aliases — distinct strings that refer to the SAME concept — and map them to a single canonical form. The canonical should be the clearest / most-used / most-descriptive variant. + +STRICT RULES: + +1. ONLY propose aliases that are UNAMBIGUOUSLY equivalent. Examples: + - "fw" → "firmware" (abbreviation) + - "firmware-control" → "firmware" (compound narrowing — only if usage context makes it clear the narrower one is never used to DISTINGUISH from firmware-in-general) + - "py" → "python" + - "ml" → "machine-learning" + Do NOT merge: + - "optics" vs "optical" — these CAN diverge ("optics" = subsystem/product domain; "optical" = adjective used in non-optics contexts) + - "p04" vs "p04-gigabit" — project ids are their own namespace, never canonicalize + - "thermal" vs "temperature" — related but distinct + - Anything where you're not sure — skip it, human review will catch real aliases next week + +2. Confidence scale: + 0.9+ obvious abbreviation, very high usage disparity, no plausible alternative meaning + 0.7-0.9 likely alias, one-word-diff or standard contraction + 0.5-0.7 plausible but requires context — low count on alias side + <0.5 DO NOT PROPOSE — if you're under 0.5, skip the pair entirely + AtoCore auto-applies aliases at confidence >= 0.8; anything below goes to human review. + +3. 
The CANONICAL must actually appear in the input list (don't invent a new term). + +4. Never propose `alias == canonical`. Never propose circular mappings. + +5. Project tags (p04, p05, p06, abb-space, atomizer-v2, atocore, apm) are OFF LIMITS — they are project identifiers, not concepts. Leave them alone entirely. + +OUTPUT — raw JSON, no prose, no markdown fences: +{ + "aliases": [ + {"alias": "fw", "canonical": "firmware", "confidence": 0.95, "reason": "fw is a standard abbreviation of firmware; 5 uses vs 23"}, + {"alias": "ml", "canonical": "machine-learning", "confidence": 0.90, "reason": "ml is the universal abbreviation"} + ] +} + +Empty aliases list is fine if nothing in the distribution is a clear alias. Err conservative — one false merge can pollute retrieval for hundreds of memories.""" + + +def build_user_message(tag_distribution: dict[str, int]) -> str: + """Format the tag distribution for the model. + + Limited to MAX_TAGS_IN_PROMPT entries, sorted by count descending + so high-usage tags appear first (the LLM uses them as anchor points + for canonical selection). + """ + if not tag_distribution: + return "Empty tag distribution — return {\"aliases\": []}." + + sorted_tags = sorted(tag_distribution.items(), key=lambda x: x[1], reverse=True) + top = sorted_tags[:MAX_TAGS_IN_PROMPT] + lines = [f"{tag}: {count}" for tag, count in top] + return ( + f"Tag distribution across {sum(tag_distribution.values())} total tag references " + f"(showing top {len(top)} of {len(tag_distribution)} unique tags):\n\n" + + "\n".join(lines) + + "\n\nReturn the JSON aliases map now. Only propose UNAMBIGUOUS equivalents." 
+ ) + + +def parse_canon_output(raw_output: str) -> list[dict[str, Any]]: + """Strip markdown fences / prose and return the parsed aliases list.""" + text = (raw_output or "").strip() + if text.startswith("```"): + text = text.strip("`") + nl = text.find("\n") + if nl >= 0: + text = text[nl + 1:] + if text.endswith("```"): + text = text[:-3] + text = text.strip() + + if not text.lstrip().startswith("{"): + start = text.find("{") + end = text.rfind("}") + if start >= 0 and end > start: + text = text[start:end + 1] + + try: + parsed = json.loads(text) + except json.JSONDecodeError: + return [] + + if not isinstance(parsed, dict): + return [] + aliases = parsed.get("aliases") or [] + if not isinstance(aliases, list): + return [] + return [a for a in aliases if isinstance(a, dict)] + + +# Project tokens that must never be canonicalized — they're project ids, +# not concepts. Keep this list in sync with the registered projects. +# Safe to be over-inclusive; extra entries just skip canonicalization. +PROTECTED_PROJECT_TOKENS = frozenset({ + "p04", "p04-gigabit", + "p05", "p05-interferometer", + "p06", "p06-polisher", + "p08", "abb-space", + "atomizer", "atomizer-v2", + "atocore", "apm", +}) + + +def normalize_alias_item(item: dict[str, Any]) -> dict[str, Any] | None: + """Validate one raw alias proposal. Returns None if unusable. + + Filters: non-strings, empty strings, identity mappings, protected + project tokens on either side. 
+ """ + alias = str(item.get("alias") or "").strip().lower() + canonical = str(item.get("canonical") or "").strip().lower() + if not alias or not canonical: + return None + if alias == canonical: + return None + if alias in PROTECTED_PROJECT_TOKENS or canonical in PROTECTED_PROJECT_TOKENS: + return None + + try: + confidence = float(item.get("confidence", 0.0)) + except (TypeError, ValueError): + confidence = 0.0 + confidence = max(0.0, min(1.0, confidence)) + + reason = str(item.get("reason") or "").strip()[:300] + + return { + "alias": alias, + "canonical": canonical, + "confidence": confidence, + "reason": reason, + } diff --git a/src/atocore/memory/service.py b/src/atocore/memory/service.py index d62a853..caab3b4 100644 --- a/src/atocore/memory/service.py +++ b/src/atocore/memory/service.py @@ -1038,6 +1038,224 @@ def _validate_confidence(confidence: float) -> None: raise ValueError("Confidence must be between 0.0 and 1.0") +# --------------------------------------------------------------------- +# Phase 7C — Tag canonicalization +# --------------------------------------------------------------------- + + +def get_tag_distribution( + active_only: bool = True, + min_count: int = 1, +) -> dict[str, int]: + """Return {tag: occurrence_count} across memories for LLM input. + + Used by the canonicalization detector to spot alias clusters like + {firmware: 23, fw: 5, firmware-control: 3}. Only counts memories + in the requested status (active by default) so superseded/invalid + rows don't bias the distribution. 
+ """ + import json as _json + counts: dict[str, int] = {} + query = "SELECT domain_tags FROM memories" + if active_only: + query += " WHERE status = 'active'" + with get_connection() as conn: + rows = conn.execute(query).fetchall() + for r in rows: + tags_raw = r["domain_tags"] + try: + tags = _json.loads(tags_raw) if tags_raw else [] + except Exception: + tags = [] + if not isinstance(tags, list): + continue + for t in tags: + if not isinstance(t, str): + continue + key = t.strip().lower() + if key: + counts[key] = counts.get(key, 0) + 1 + if min_count > 1: + counts = {k: v for k, v in counts.items() if v >= min_count} + return counts + + +def apply_tag_alias( + alias: str, + canonical: str, + actor: str = "tag-canon", +) -> dict: + """Rewrite every active memory's domain_tags: alias → canonical. + + Atomic per-memory. Dedupes within each memory's tag list (so if a + memory already has both alias AND canonical, we drop the alias and + keep canonical without duplicating). Writes one audit row per + touched memory with action="tag_canonicalized" so the full trail + is recoverable. + + Returns {"memories_touched": int, "alias": ..., "canonical": ...}. 
+ """ + import json as _json + alias = (alias or "").strip().lower() + canonical = (canonical or "").strip().lower() + if not alias or not canonical: + raise ValueError("alias and canonical must be non-empty") + if alias == canonical: + raise ValueError("alias cannot equal canonical") + + touched: list[tuple[str, list[str], list[str]]] = [] + + with get_connection() as conn: + rows = conn.execute( + "SELECT id, domain_tags FROM memories WHERE status = 'active'" + ).fetchall() + + for r in rows: + raw = r["domain_tags"] + try: + tags = _json.loads(raw) if raw else [] + except Exception: + tags = [] + if not isinstance(tags, list): + continue + if alias not in tags: + continue + + old_tags = [t for t in tags if isinstance(t, str)] + new_tags: list[str] = [] + for t in old_tags: + rewritten = canonical if t == alias else t + if rewritten not in new_tags: + new_tags.append(rewritten) + if new_tags == old_tags: + continue + conn.execute( + "UPDATE memories SET domain_tags = ?, updated_at = CURRENT_TIMESTAMP " + "WHERE id = ?", + (_json.dumps(new_tags), r["id"]), + ) + touched.append((r["id"], old_tags, new_tags)) + + # Audit rows outside the transaction + for mem_id, old_tags, new_tags in touched: + _audit_memory( + memory_id=mem_id, + action="tag_canonicalized", + actor=actor, + before={"domain_tags": old_tags}, + after={"domain_tags": new_tags}, + note=f"{alias} → {canonical}", + ) + + if touched: + log.info("tag_alias_applied", alias=alias, canonical=canonical, memories_touched=len(touched)) + return { + "memories_touched": len(touched), + "alias": alias, + "canonical": canonical, + } + + +def create_tag_alias_proposal( + alias: str, + canonical: str, + confidence: float, + alias_count: int = 0, + canonical_count: int = 0, + reason: str = "", +) -> str | None: + """Insert a tag_aliases row in status=pending. + + Idempotent: if a pending proposal for (alias, canonical) already + exists, returns None. 
+ """ + import json as _json # noqa: F401 — kept for parity with other helpers + alias = (alias or "").strip().lower() + canonical = (canonical or "").strip().lower() + if not alias or not canonical or alias == canonical: + return None + confidence = max(0.0, min(1.0, float(confidence))) + + proposal_id = str(uuid.uuid4()) + with get_connection() as conn: + existing = conn.execute( + "SELECT id FROM tag_aliases WHERE alias = ? AND canonical = ? " + "AND status = 'pending'", + (alias, canonical), + ).fetchone() + if existing: + return None + + conn.execute( + "INSERT INTO tag_aliases (id, alias, canonical, status, confidence, " + "alias_count, canonical_count, reason) " + "VALUES (?, ?, ?, 'pending', ?, ?, ?, ?)", + (proposal_id, alias, canonical, confidence, + int(alias_count), int(canonical_count), reason[:500]), + ) + log.info( + "tag_alias_proposed", + proposal_id=proposal_id, + alias=alias, + canonical=canonical, + confidence=round(confidence, 3), + ) + return proposal_id + + +def get_tag_alias_proposals(status: str = "pending", limit: int = 100) -> list[dict]: + """List tag alias proposals.""" + with get_connection() as conn: + rows = conn.execute( + "SELECT * FROM tag_aliases WHERE status = ? " + "ORDER BY confidence DESC, created_at DESC LIMIT ?", + (status, limit), + ).fetchall() + return [dict(r) for r in rows] + + +def approve_tag_alias( + proposal_id: str, + actor: str = "human-triage", +) -> dict | None: + """Apply the alias rewrite + mark the proposal approved. + + Returns the apply_tag_alias result dict, or None if the proposal + is not found or already resolved. 
+ """ + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + with get_connection() as conn: + row = conn.execute( + "SELECT alias, canonical, status FROM tag_aliases WHERE id = ?", + (proposal_id,), + ).fetchone() + if row is None or row["status"] != "pending": + return None + alias, canonical = row["alias"], row["canonical"] + + result = apply_tag_alias(alias, canonical, actor=actor) + + with get_connection() as conn: + conn.execute( + "UPDATE tag_aliases SET status = 'approved', resolved_at = ?, " + "resolved_by = ?, applied_to_memories = ? WHERE id = ?", + (now_str, actor, result["memories_touched"], proposal_id), + ) + return result + + +def reject_tag_alias(proposal_id: str, actor: str = "human-triage") -> bool: + """Mark a tag alias proposal as rejected without applying it.""" + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + with get_connection() as conn: + result = conn.execute( + "UPDATE tag_aliases SET status = 'rejected', resolved_at = ?, " + "resolved_by = ? WHERE id = ? AND status = 'pending'", + (now_str, actor, proposal_id), + ) + return result.rowcount > 0 + + # --------------------------------------------------------------------- # Phase 7A — Memory Consolidation: merge-candidate lifecycle # --------------------------------------------------------------------- diff --git a/src/atocore/models/database.py b/src/atocore/models/database.py index b9605ec..0f11187 100644 --- a/src/atocore/models/database.py +++ b/src/atocore/models/database.py @@ -287,6 +287,33 @@ def _apply_migrations(conn: sqlite3.Connection) -> None: "CREATE INDEX IF NOT EXISTS idx_mmc_created_at ON memory_merge_candidates(created_at)" ) + # Phase 7C (Memory Consolidation — tag canonicalization): alias → canonical + # map for domain_tags. A weekly LLM pass proposes rows here; high-confidence + # ones auto-apply (rewrite domain_tags across all memories), low-confidence + # ones stay pending for human approval. 
Immutable history: resolved rows + # keep status=approved/rejected; the same alias can re-appear with a new + # id if the tag reaches a different canonical later. + conn.execute( + """ + CREATE TABLE IF NOT EXISTS tag_aliases ( + id TEXT PRIMARY KEY, + alias TEXT NOT NULL, + canonical TEXT NOT NULL, + status TEXT DEFAULT 'pending', + confidence REAL DEFAULT 0.0, + alias_count INTEGER DEFAULT 0, + canonical_count INTEGER DEFAULT 0, + reason TEXT DEFAULT '', + applied_to_memories INTEGER DEFAULT 0, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + resolved_at DATETIME, + resolved_by TEXT + ) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_tag_aliases_status ON tag_aliases(status)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_tag_aliases_alias ON tag_aliases(alias)") + def _column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool: rows = conn.execute(f"PRAGMA table_info({table})").fetchall() diff --git a/tests/test_tag_canon.py b/tests/test_tag_canon.py new file mode 100644 index 0000000..f08bfc5 --- /dev/null +++ b/tests/test_tag_canon.py @@ -0,0 +1,296 @@ +"""Phase 7C — tag canonicalization tests. 
+ +Covers: + - prompt parser (fences, prose, empty) + - normalizer (identity, protected tokens, empty) + - get_tag_distribution counts across active memories + - apply_tag_alias rewrites + dedupes + audits + - create / approve / reject lifecycle + - idempotency (dup proposals skipped) +""" + +from __future__ import annotations + +import pytest + +from atocore.memory._tag_canon_prompt import ( + PROTECTED_PROJECT_TOKENS, + build_user_message, + normalize_alias_item, + parse_canon_output, +) +from atocore.memory.service import ( + apply_tag_alias, + approve_tag_alias, + create_memory, + create_tag_alias_proposal, + get_memory_audit, + get_tag_alias_proposals, + get_tag_distribution, + reject_tag_alias, +) +from atocore.models.database import get_connection, init_db + + +# --- Prompt parser --- + + +def test_parse_canon_output_handles_fences(): + raw = "```json\n{\"aliases\": [{\"alias\": \"fw\", \"canonical\": \"firmware\", \"confidence\": 0.9}]}\n```" + items = parse_canon_output(raw) + assert len(items) == 1 + assert items[0]["alias"] == "fw" + + +def test_parse_canon_output_handles_prose_prefix(): + raw = "Here you go:\n{\"aliases\": [{\"alias\": \"ml\", \"canonical\": \"machine-learning\", \"confidence\": 0.9}]}" + items = parse_canon_output(raw) + assert len(items) == 1 + + +def test_parse_canon_output_empty_list(): + assert parse_canon_output("{\"aliases\": []}") == [] + + +def test_parse_canon_output_malformed(): + assert parse_canon_output("not json at all") == [] + assert parse_canon_output("") == [] + + +# --- Normalizer --- + + +def test_normalize_alias_strips_and_lowercases(): + n = normalize_alias_item({"alias": " FW ", "canonical": "Firmware", "confidence": 0.95, "reason": "abbrev"}) + assert n == {"alias": "fw", "canonical": "firmware", "confidence": 0.95, "reason": "abbrev"} + + +def test_normalize_rejects_identity(): + assert normalize_alias_item({"alias": "foo", "canonical": "foo", "confidence": 0.9}) is None + + +def test_normalize_rejects_empty(): 
+ assert normalize_alias_item({"alias": "", "canonical": "foo", "confidence": 0.9}) is None + assert normalize_alias_item({"alias": "foo", "canonical": "", "confidence": 0.9}) is None + + +def test_normalize_protects_project_tokens(): + # Project ids must not be canonicalized — they're their own namespace + assert "p04" in PROTECTED_PROJECT_TOKENS + assert normalize_alias_item({"alias": "p04", "canonical": "p04-gigabit", "confidence": 1.0}) is None + assert normalize_alias_item({"alias": "p04-gigabit", "canonical": "p04", "confidence": 1.0}) is None + assert normalize_alias_item({"alias": "apm", "canonical": "part-manager", "confidence": 1.0}) is None + + +def test_normalize_clamps_confidence(): + hi = normalize_alias_item({"alias": "a", "canonical": "b", "confidence": 2.5}) + assert hi["confidence"] == 1.0 + lo = normalize_alias_item({"alias": "a", "canonical": "b", "confidence": -0.5}) + assert lo["confidence"] == 0.0 + + +def test_normalize_handles_non_numeric_confidence(): + n = normalize_alias_item({"alias": "a", "canonical": "b", "confidence": "not a number"}) + assert n is not None and n["confidence"] == 0.0 + + +# --- build_user_message --- + + +def test_build_user_message_includes_top_tags(): + dist = {"firmware": 23, "fw": 5, "optics": 18, "optical": 2} + msg = build_user_message(dist) + assert "firmware: 23" in msg + assert "optics: 18" in msg + assert "aliases" in msg.lower() or "JSON" in msg + + +def test_build_user_message_empty(): + msg = build_user_message({}) + assert "Empty" in msg or "empty" in msg + + +# --- get_tag_distribution --- + + +def test_tag_distribution_counts_active_only(tmp_data_dir): + init_db() + create_memory("knowledge", "a", domain_tags=["firmware", "p06"]) + create_memory("knowledge", "b", domain_tags=["firmware"]) + create_memory("knowledge", "c", domain_tags=["optics"]) + + # Add an invalid memory — should NOT be counted + m_invalid = create_memory("knowledge", "d", domain_tags=["firmware", "ignored"]) + with get_connection() 
as conn: + conn.execute("UPDATE memories SET status = 'invalid' WHERE id = ?", (m_invalid.id,)) + + dist = get_tag_distribution() + assert dist.get("firmware") == 2 # two active memories + assert dist.get("optics") == 1 + assert dist.get("p06") == 1 + assert "ignored" not in dist + + +def test_tag_distribution_min_count_filter(tmp_data_dir): + init_db() + create_memory("knowledge", "a", domain_tags=["firmware"]) + create_memory("knowledge", "b", domain_tags=["firmware"]) + create_memory("knowledge", "c", domain_tags=["once"]) + + dist = get_tag_distribution(min_count=2) + assert "firmware" in dist + assert "once" not in dist + + +# --- apply_tag_alias --- + + +def test_apply_tag_alias_rewrites_across_memories(tmp_data_dir): + init_db() + m1 = create_memory("knowledge", "a", domain_tags=["fw", "p06"]) + m2 = create_memory("knowledge", "b", domain_tags=["fw"]) + m3 = create_memory("knowledge", "c", domain_tags=["optics"]) # untouched + + result = apply_tag_alias("fw", "firmware") + assert result["memories_touched"] == 2 + + import json as _json + with get_connection() as conn: + r1 = conn.execute("SELECT domain_tags FROM memories WHERE id = ?", (m1.id,)).fetchone() + r2 = conn.execute("SELECT domain_tags FROM memories WHERE id = ?", (m2.id,)).fetchone() + r3 = conn.execute("SELECT domain_tags FROM memories WHERE id = ?", (m3.id,)).fetchone() + assert "firmware" in _json.loads(r1["domain_tags"]) + assert "fw" not in _json.loads(r1["domain_tags"]) + assert "firmware" in _json.loads(r2["domain_tags"]) + assert _json.loads(r3["domain_tags"]) == ["optics"] # untouched + + +def test_apply_tag_alias_dedupes_when_both_present(tmp_data_dir): + """Memory has both fw AND firmware → rewrite collapses to just firmware.""" + init_db() + m = create_memory("knowledge", "dual-tagged", domain_tags=["fw", "firmware", "p06"]) + + result = apply_tag_alias("fw", "firmware") + assert result["memories_touched"] == 1 + + import json as _json + with get_connection() as conn: + r = 
conn.execute("SELECT domain_tags FROM memories WHERE id = ?", (m.id,)).fetchone() + tags = _json.loads(r["domain_tags"]) + assert tags.count("firmware") == 1 + assert "fw" not in tags + assert "p06" in tags + + +def test_apply_tag_alias_skips_memories_without_alias(tmp_data_dir): + init_db() + m = create_memory("knowledge", "no match", domain_tags=["optics", "p04"]) + result = apply_tag_alias("fw", "firmware") + assert result["memories_touched"] == 0 + + +def test_apply_tag_alias_writes_audit(tmp_data_dir): + init_db() + m = create_memory("knowledge", "audited", domain_tags=["fw"]) + apply_tag_alias("fw", "firmware", actor="auto-tag-canon") + + audit = get_memory_audit(m.id) + actions = [a["action"] for a in audit] + assert "tag_canonicalized" in actions + entry = next(a for a in audit if a["action"] == "tag_canonicalized") + assert entry["actor"] == "auto-tag-canon" + assert "fw → firmware" in entry["note"] + assert "fw" in entry["before"]["domain_tags"] + assert "firmware" in entry["after"]["domain_tags"] + + +def test_apply_tag_alias_rejects_identity(tmp_data_dir): + init_db() + with pytest.raises(ValueError): + apply_tag_alias("foo", "foo") + + +def test_apply_tag_alias_rejects_empty(tmp_data_dir): + init_db() + with pytest.raises(ValueError): + apply_tag_alias("", "firmware") + + +# --- Proposal lifecycle --- + + +def test_create_proposal_inserts_pending(tmp_data_dir): + init_db() + pid = create_tag_alias_proposal("fw", "firmware", confidence=0.65, + alias_count=5, canonical_count=23, + reason="standard abbreviation") + assert pid is not None + + rows = get_tag_alias_proposals(status="pending") + assert len(rows) == 1 + assert rows[0]["alias"] == "fw" + assert rows[0]["confidence"] == pytest.approx(0.65) + + +def test_create_proposal_idempotent(tmp_data_dir): + init_db() + first = create_tag_alias_proposal("fw", "firmware", confidence=0.6) + second = create_tag_alias_proposal("fw", "firmware", confidence=0.7) + assert first is not None + assert second is None 


def test_approve_applies_rewrite(tmp_data_dir):
    """Approving a pending proposal rewrites memories and records the count."""
    init_db()
    memory = create_memory("knowledge", "x", domain_tags=["fw"])
    proposal_id = create_tag_alias_proposal("fw", "firmware", confidence=0.7)

    result = approve_tag_alias(proposal_id, actor="human-triage")
    assert result is not None
    assert result["memories_touched"] == 1

    # Proposal now approved with applied_to_memories recorded
    approved = get_tag_alias_proposals(status="approved")
    assert len(approved) == 1
    assert approved[0]["applied_to_memories"] == 1

    # Memory actually rewritten
    import json
    with get_connection() as conn:
        row = conn.execute("SELECT domain_tags FROM memories WHERE id = ?", (memory.id,)).fetchone()
    assert "firmware" in json.loads(row["domain_tags"])


def test_approve_already_resolved_returns_none(tmp_data_dir):
    """Approving the same proposal twice: the second call is a no-op."""
    init_db()
    proposal_id = create_tag_alias_proposal("a", "b", confidence=0.6)
    approve_tag_alias(proposal_id)
    assert approve_tag_alias(proposal_id) is None  # second approve — no-op


def test_reject_leaves_memories_untouched(tmp_data_dir):
    """Rejection resolves the proposal without rewriting any memory tags."""
    init_db()
    memory = create_memory("knowledge", "x", domain_tags=["fw"])
    proposal_id = create_tag_alias_proposal("fw", "firmware", confidence=0.6)
    assert reject_tag_alias(proposal_id)

    rejected = get_tag_alias_proposals(status="rejected")
    assert len(rejected) == 1

    # Memory still has the original tag
    import json
    with get_connection() as conn:
        row = conn.execute("SELECT domain_tags FROM memories WHERE id = ?", (memory.id,)).fetchone()
    assert "fw" in json.loads(row["domain_tags"])


# --- Schema sanity ---


def test_tag_aliases_table_exists(tmp_data_dir):
    """init_db creates the tag_aliases table with the full expected column set."""
    init_db()
    with get_connection() as conn:
        cols = [r["name"] for r in conn.execute("PRAGMA table_info(tag_aliases)").fetchall()]
    expected = {"id", "alias", "canonical", "status", "confidence",
                "alias_count", "canonical_count", "reason",
                "applied_to_memories", "created_at", "resolved_at", "resolved_by"}
    assert expected.issubset(set(cols))