"""Memory Core — structured memory management.

Memory types (per Master Plan):
- identity: who the user is, role, background
- preference: how they like to work, style, tools
- project: project-specific knowledge and context
- episodic: what happened, conversations, events
- knowledge: verified facts, technical knowledge
- adaptation: learned corrections, behavioral adjustments

Memories have:
- confidence (0.0–1.0): how certain we are
- status: lifecycle state, one of MEMORY_STATUSES
    * candidate: extracted from an interaction, awaiting human review
      (Phase 9 Commit C). Candidates are NEVER included in context packs.
    * active: promoted/curated, visible to retrieval and context
    * superseded: replaced by a newer entry
    * invalid: rejected / error-corrected
- last_referenced_at / reference_count: reinforcement signal
  (Phase 9 Commit B). Bumped whenever a captured interaction's
  response content echoes this memory.
- optional link to source chunk: traceability
"""

import uuid
from dataclasses import dataclass
from datetime import datetime, timezone

from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
from atocore.projects.registry import resolve_project_name

log = get_logger("memory")

MEMORY_TYPES = [
    "identity",
    "preference",
    "project",
    "episodic",
    "knowledge",
    "adaptation",
]

MEMORY_STATUSES = [
    "candidate",
    "active",
    "superseded",
    "invalid",
]


@dataclass
class Memory:
    """In-memory view of one row of the ``memories`` table."""

    id: str
    memory_type: str  # one of MEMORY_TYPES
    content: str
    project: str  # "" when the memory has no project scope
    source_chunk_id: str  # "" when there is no linked source chunk
    confidence: float  # validated to lie in [0.0, 1.0] on create/update
    status: str  # one of MEMORY_STATUSES
    created_at: str
    updated_at: str
    last_referenced_at: str = ""  # "" until the memory has been reinforced
    reference_count: int = 0


def create_memory(
    memory_type: str,
    content: str,
    project: str = "",
    source_chunk_id: str = "",
    confidence: float = 1.0,
    status: str = "active",
) -> Memory:
    """Create a new memory entry.

    ``status`` defaults to ``active`` for backward compatibility. Pass
    ``candidate`` when the memory is being proposed by the Phase 9 Commit C
    extractor and still needs human review before it can influence context.

    If a memory with the same type, content, project and status already
    exists, no row is inserted and the existing memory is returned instead.

    Raises:
        ValueError: if ``memory_type`` or ``status`` is not a known value, or
            ``confidence`` is outside [0.0, 1.0].

    NOTE(review): for a freshly inserted row the returned ``created_at`` /
    ``updated_at`` are an ISO-8601 UTC timestamp generated here; the INSERT
    itself does not write those columns, so the stored values come from the
    schema. Confirm the two formats agree if callers compare them.
    """
    if memory_type not in MEMORY_TYPES:
        raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}")
    if status not in MEMORY_STATUSES:
        raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
    _validate_confidence(confidence)

    # Canonicalize the project through the registry so an alias and
    # the canonical id store under the same bucket. This keeps
    # reinforcement queries (which use the interaction's project) and
    # context retrieval (which uses the registry-canonicalized hint)
    # consistent with how memories are created.
    project = resolve_project_name(project)

    memory_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()

    # Check for duplicate content within the same type+project at the same
    # status. Scoping by status keeps active curation separate from the
    # candidate review queue: a candidate and an active memory with identical
    # text can legitimately coexist if the candidate is a fresh extraction of
    # something already curated.
    with get_connection() as conn:
        # Fetch the full row in one query so the duplicate path can return it
        # directly instead of re-selecting it by id.
        existing = conn.execute(
            "SELECT * FROM memories "
            "WHERE memory_type = ? AND content = ? AND project = ? AND status = ?",
            (memory_type, content, project, status),
        ).fetchone()
        if existing:
            log.info(
                "memory_duplicate_skipped",
                memory_type=memory_type,
                status=status,
                content_preview=content[:80],
            )
            return _row_to_memory(existing)

        conn.execute(
            "INSERT INTO memories (id, memory_type, content, project, source_chunk_id, confidence, status) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            # An empty source_chunk_id is stored as NULL.
            (memory_id, memory_type, content, project, source_chunk_id or None, confidence, status),
        )

    log.info(
        "memory_created",
        memory_type=memory_type,
        status=status,
        content_preview=content[:80],
    )

    return Memory(
        id=memory_id,
        memory_type=memory_type,
        content=content,
        project=project,
        source_chunk_id=source_chunk_id,
        confidence=confidence,
        status=status,
        created_at=now,
        updated_at=now,
        last_referenced_at="",
        reference_count=0,
    )


def get_memories(
    memory_type: str | None = None,
    project: str | None = None,
    active_only: bool = True,
    min_confidence: float = 0.0,
    limit: int = 50,
    status: str | None = None,
) -> list[Memory]:
    """Retrieve memories, optionally filtered.

    When ``status`` is provided explicitly, it takes precedence over
    ``active_only`` so callers can list the candidate review queue via
    ``get_memories(status='candidate')``. When ``status`` is omitted the
    legacy ``active_only`` behaviour still applies.

    Results are ordered by confidence (highest first), then by most recent
    ``updated_at``, and truncated to ``limit`` rows.

    Raises:
        ValueError: if ``status`` is given but is not one of MEMORY_STATUSES.
    """
    if status is not None and status not in MEMORY_STATUSES:
        raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")

    # "WHERE 1=1" lets every optional filter be appended as " AND ...".
    query = "SELECT * FROM memories WHERE 1=1"
    params: list = []

    if memory_type:
        query += " AND memory_type = ?"
        params.append(memory_type)
    if project is not None:
        # Canonicalize on the read side so a caller passing an alias
        # finds rows that were stored under the canonical id (and
        # vice versa). resolve_project_name returns the input
        # unchanged for unregistered names so empty-string queries
        # for "no project scope" still work.
        query += " AND project = ?"
        params.append(resolve_project_name(project))
    if status is not None:
        query += " AND status = ?"
        params.append(status)
    elif active_only:
        query += " AND status = 'active'"
    if min_confidence > 0:
        query += " AND confidence >= ?"
        params.append(min_confidence)

    query += " ORDER BY confidence DESC, updated_at DESC LIMIT ?"
    params.append(limit)

    with get_connection() as conn:
        rows = conn.execute(query, params).fetchall()

    return [_row_to_memory(r) for r in rows]


def update_memory(
    memory_id: str,
    content: str | None = None,
    confidence: float | None = None,
    status: str | None = None,
) -> bool:
    """Update an existing memory.

    Only the fields passed as non-``None`` are changed; ``updated_at`` is
    refreshed whenever anything is written.

    Returns:
        True if a row was updated. False if no field was supplied or no
        memory with ``memory_id`` exists.

    Raises:
        ValueError: if ``confidence`` is outside [0.0, 1.0], ``status`` is not
            one of MEMORY_STATUSES, or the resulting row would be an active
            memory duplicating another active memory with the same type,
            content and project.
    """
    # Nothing to change: bail out before touching the database. Without this
    # guard a no-op call on an active memory still ran the duplicate check
    # and could raise even though nothing was being updated.
    if content is None and confidence is None and status is None:
        return False

    with get_connection() as conn:
        existing = conn.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
        if existing is None:
            return False

        # Validate the requested values before running any further queries.
        if confidence is not None:
            _validate_confidence(confidence)
        if status is not None and status not in MEMORY_STATUSES:
            raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")

        # The row as it would look after the update; used only for the
        # duplicate check below.
        next_content = content if content is not None else existing["content"]
        next_status = status if status is not None else existing["status"]

        if next_status == "active":
            duplicate = conn.execute(
                "SELECT id FROM memories "
                "WHERE memory_type = ? AND content = ? AND project = ? AND status = 'active' AND id != ?",
                (existing["memory_type"], next_content, existing["project"] or "", memory_id),
            ).fetchone()
            if duplicate:
                raise ValueError("Update would create a duplicate active memory")

        # Column names are fixed literals; only values are bound as parameters.
        updates: list[str] = []
        params: list = []
        if content is not None:
            updates.append("content = ?")
            params.append(content)
        if confidence is not None:
            updates.append("confidence = ?")
            params.append(confidence)
        if status is not None:
            updates.append("status = ?")
            params.append(status)

        updates.append("updated_at = CURRENT_TIMESTAMP")
        params.append(memory_id)

        result = conn.execute(
            f"UPDATE memories SET {', '.join(updates)} WHERE id = ?",
            params,
        )

    if result.rowcount > 0:
        log.info("memory_updated", memory_id=memory_id)
        return True
    return False


def invalidate_memory(memory_id: str) -> bool:
    """Mark a memory as invalid (error correction)."""
    return update_memory(memory_id, status="invalid")


def supersede_memory(memory_id: str) -> bool:
    """Mark a memory as superseded (replaced by newer info)."""
    return update_memory(memory_id, status="superseded")


def _transition_candidate(memory_id: str, new_status: str) -> bool:
    """Move a memory to ``new_status`` only if it is currently a candidate.

    Returns False if the memory does not exist or is not in the ``candidate``
    state; otherwise returns the result of ``update_memory``.
    """
    with get_connection() as conn:
        row = conn.execute(
            "SELECT status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()

    if row is None or row["status"] != "candidate":
        return False
    return update_memory(memory_id, status=new_status)


def promote_memory(memory_id: str) -> bool:
    """Promote a candidate memory to active (Phase 9 Commit C review queue).

    Returns False if the memory does not exist or is not currently a
    candidate. Raises ValueError only if the promotion would create a
    duplicate active memory (delegates to update_memory's existing check).
    """
    return _transition_candidate(memory_id, "active")


def reject_candidate_memory(memory_id: str) -> bool:
    """Reject a candidate memory (Phase 9 Commit C).

    Sets the candidate's status to ``invalid`` so it drops out of the
    review queue without polluting the active set. Returns False if the
    memory does not exist or is not currently a candidate.
    """
    return _transition_candidate(memory_id, "invalid")


def reinforce_memory(
    memory_id: str,
    confidence_delta: float = 0.02,
) -> tuple[bool, float, float]:
    """Bump a memory's confidence and reference count (Phase 9 Commit B).

    Returns a 3-tuple ``(applied, old_confidence, new_confidence)``.
    ``applied`` is False if the memory does not exist or is not in the
    ``active`` state — reinforcement only touches live memories so the
    candidate queue and invalidated history are never silently revived.
    In that case both confidences are reported as 0.0.

    Confidence is capped at 1.0. last_referenced_at is set to the current
    UTC time in SQLite-comparable format. reference_count is incremented
    by one per call (not per delta amount).

    Raises:
        ValueError: if ``confidence_delta`` is negative.
    """
    if confidence_delta < 0:
        raise ValueError("confidence_delta must be non-negative for reinforcement")

    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    with get_connection() as conn:
        row = conn.execute(
            "SELECT confidence, status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()
        if row is None or row["status"] != "active":
            return False, 0.0, 0.0

        old_confidence = float(row["confidence"])
        new_confidence = min(1.0, old_confidence + confidence_delta)
        conn.execute(
            "UPDATE memories SET confidence = ?, last_referenced_at = ?, "
            # COALESCE treats a NULL reference_count as 0.
            "reference_count = COALESCE(reference_count, 0) + 1 "
            "WHERE id = ?",
            (new_confidence, now, memory_id),
        )

    log.info(
        "memory_reinforced",
        memory_id=memory_id,
        old_confidence=round(old_confidence, 4),
        new_confidence=round(new_confidence, 4),
    )
    return True, old_confidence, new_confidence


def get_memories_for_context(
    memory_types: list[str] | None = None,
    project: str | None = None,
    budget: int = 500,
    header: str = "--- AtoCore Memory ---",
    footer: str = "--- End Memory ---",
    query: str | None = None,
) -> tuple[str, int]:
    """Get formatted memories for context injection.

    Returns (formatted_text, char_count). Returns ``("", 0)`` when the budget
    cannot hold the header/footer wrapper or no memory fits.

    Budget allocation per Master Plan section 9:
      identity: 5%, preference: 5%, rest from retrieval budget

    The caller can override ``header`` / ``footer`` to distinguish
    multiple memory blocks in the same pack (e.g. identity/preference
    vs project/knowledge memories).

    When ``query`` is provided, candidates are ranked by lexical overlap
    against the query (token intersection, ties broken by confidence).
    Without a query, candidates fall through in the order ``get_memories``
    returns them for each requested type — which is effectively
    "by confidence desc" within a type.

    Only active memories with confidence >= 0.5 are considered (at most 30
    per type), and each entry's content is truncated to 250 characters.
    """
    if memory_types is None:
        memory_types = ["identity", "preference"]

    if budget <= 0:
        return "", 0

    # The +2 reserves the newline after the header and the one before the
    # footer; each entry accounts for its own separator below.
    wrapper_chars = len(header) + len(footer) + 2
    if budget <= wrapper_chars:
        return "", 0

    available = budget - wrapper_chars
    selected_entries: list[str] = []
    used = 0

    # Pre-tokenize the query once. ``_rank_memories_for_query`` is a
    # free function below that reuses the reinforcement tokenizer so
    # lexical scoring here matches the reinforcement matcher.
    query_tokens: set[str] | None = None
    if query:
        # Imported at call time rather than at module top level.
        from atocore.memory.reinforcement import _normalize, _tokenize

        query_tokens = _tokenize(_normalize(query))
        if not query_tokens:
            query_tokens = None

    # Collect ALL candidates across the requested types into one
    # pool, then rank globally before the budget walk. Ranking per
    # type and walking types in order would starve later types when
    # the first type's candidates filled the budget — even if a
    # later-type candidate matched the query perfectly. Type order
    # is preserved as a stable tiebreaker inside
    # ``_rank_memories_for_query`` via Python's stable sort.
    pool: list[Memory] = []
    seen_ids: set[str] = set()
    for mtype in memory_types:
        for mem in get_memories(
            memory_type=mtype,
            project=project,
            min_confidence=0.5,
            limit=30,
        ):
            if mem.id in seen_ids:
                continue
            seen_ids.add(mem.id)
            pool.append(mem)

    if query_tokens is not None:
        pool = _rank_memories_for_query(pool, query_tokens)

    # Per-entry cap prevents a single long memory from monopolizing
    # the band. With 16 p06 memories competing for ~700 chars, an
    # uncapped 530-char overview memory fills the entire budget before
    # a query-relevant 150-char memory gets a slot. The cap ensures at
    # least 2-3 entries fit regardless of individual memory length.
    max_entry_chars = 250

    for mem in pool:
        content = mem.content
        if len(content) > max_entry_chars:
            content = content[:max_entry_chars - 3].rstrip() + "..."
        entry = f"[{mem.memory_type}] {content}"
        entry_len = len(entry) + 1  # +1 for the joining newline
        if entry_len > available - used:
            # Too big for what is left; a later, shorter entry may still fit.
            continue
        selected_entries.append(entry)
        used += entry_len

    if not selected_entries:
        return "", 0

    lines = [header, *selected_entries, footer]
    text = "\n".join(lines)

    log.info("memories_for_context", count=len(selected_entries), chars=len(text))
    return text, len(text)


def _rank_memories_for_query(
    memories: list["Memory"],
    query_tokens: set[str],
) -> list["Memory"]:
    """Rerank a memory list by lexical overlap with a pre-tokenized query.

    Ordering key: (overlap_count DESC, confidence DESC). When a query
    shares no tokens with a memory, overlap is zero and confidence acts
    as the sole tiebreaker. Memories with an identical key keep their
    input order because the sort is stable (also with ``reverse=True``).
    """
    from atocore.memory.reinforcement import _normalize, _tokenize

    def sort_key(mem: "Memory") -> tuple[int, float]:
        mem_tokens = _tokenize(_normalize(mem.content))
        overlap = len(mem_tokens & query_tokens) if mem_tokens else 0
        return overlap, mem.confidence

    # ``sorted`` evaluates the key once per element, so each memory is
    # tokenized exactly once.
    return sorted(memories, key=sort_key, reverse=True)


def _row_to_memory(row) -> Memory:
    """Convert a DB row to Memory dataclass.

    NULL ``project`` / ``source_chunk_id`` become "". The reinforcement
    columns are read only if the row exposes them, falling back to "" and 0
    when they are absent or NULL.
    """
    keys = row.keys() if hasattr(row, "keys") else []
    last_ref = row["last_referenced_at"] if "last_referenced_at" in keys else None
    ref_count = row["reference_count"] if "reference_count" in keys else 0
    return Memory(
        id=row["id"],
        memory_type=row["memory_type"],
        content=row["content"],
        project=row["project"] or "",
        source_chunk_id=row["source_chunk_id"] or "",
        confidence=row["confidence"],
        status=row["status"],
        created_at=row["created_at"],
        updated_at=row["updated_at"],
        last_referenced_at=last_ref or "",
        reference_count=int(ref_count or 0),
    )


def _validate_confidence(confidence: float) -> None:
    """Raise ValueError unless ``confidence`` lies in the closed range [0.0, 1.0]."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("Confidence must be between 0.0 and 1.0")