"""Memory Core — structured memory management. Memory types (per Master Plan): - identity: who the user is, role, background - preference: how they like to work, style, tools - project: project-specific knowledge and context - episodic: what happened, conversations, events - knowledge: verified facts, technical knowledge - adaptation: learned corrections, behavioral adjustments Memories have: - confidence (0.0–1.0): how certain we are - status (active/superseded/invalid): lifecycle state - optional link to source chunk: traceability """ import json import uuid from dataclasses import dataclass from datetime import datetime, timezone from atocore.models.database import get_connection from atocore.observability.logger import get_logger log = get_logger("memory") MEMORY_TYPES = [ "identity", "preference", "project", "episodic", "knowledge", "adaptation", ] @dataclass class Memory: id: str memory_type: str content: str project: str source_chunk_id: str confidence: float status: str created_at: str updated_at: str def create_memory( memory_type: str, content: str, project: str = "", source_chunk_id: str = "", confidence: float = 1.0, ) -> Memory: """Create a new memory entry.""" if memory_type not in MEMORY_TYPES: raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}") memory_id = str(uuid.uuid4()) now = datetime.now(timezone.utc).isoformat() # Check for duplicate content within same type+project with get_connection() as conn: existing = conn.execute( "SELECT id FROM memories WHERE memory_type = ? AND content = ? AND status = 'active'", (memory_type, content), ).fetchone() if existing: log.info("memory_duplicate_skipped", memory_type=memory_type, content_preview=content[:80]) return _row_to_memory( conn.execute("SELECT * FROM memories WHERE id = ?", (existing["id"],)).fetchone() ) conn.execute( "INSERT INTO memories (id, memory_type, content, source_chunk_id, confidence, status) " "VALUES (?, ?, ?, ?, ?, 'active')", (memory_id, memory_type, content, source_chunk_id or None, confidence), ) log.info("memory_created", memory_type=memory_type, content_preview=content[:80]) return Memory( id=memory_id, memory_type=memory_type, content=content, project=project, source_chunk_id=source_chunk_id, confidence=confidence, status="active", created_at=now, updated_at=now, ) def get_memories( memory_type: str | None = None, active_only: bool = True, min_confidence: float = 0.0, limit: int = 50, ) -> list[Memory]: """Retrieve memories, optionally filtered.""" query = "SELECT * FROM memories WHERE 1=1" params: list = [] if memory_type: query += " AND memory_type = ?" params.append(memory_type) if active_only: query += " AND status = 'active'" if min_confidence > 0: query += " AND confidence >= ?" params.append(min_confidence) query += " ORDER BY confidence DESC, updated_at DESC LIMIT ?" params.append(limit) with get_connection() as conn: rows = conn.execute(query, params).fetchall() return [_row_to_memory(r) for r in rows] def update_memory( memory_id: str, content: str | None = None, confidence: float | None = None, status: str | None = None, ) -> bool: """Update an existing memory.""" updates = [] params: list = [] if content is not None: updates.append("content = ?") params.append(content) if confidence is not None: updates.append("confidence = ?") params.append(confidence) if status is not None: if status not in ("active", "superseded", "invalid"): raise ValueError(f"Invalid status '{status}'") updates.append("status = ?") params.append(status) if not updates: return False updates.append("updated_at = CURRENT_TIMESTAMP") params.append(memory_id) with get_connection() as conn: result = conn.execute( f"UPDATE memories SET {', '.join(updates)} WHERE id = ?", params, ) if result.rowcount > 0: log.info("memory_updated", memory_id=memory_id) return True return False def invalidate_memory(memory_id: str) -> bool: """Mark a memory as invalid (error correction).""" return update_memory(memory_id, status="invalid") def supersede_memory(memory_id: str) -> bool: """Mark a memory as superseded (replaced by newer info).""" return update_memory(memory_id, status="superseded") def get_memories_for_context( memory_types: list[str] | None = None, budget: int = 500, ) -> tuple[str, int]: """Get formatted memories for context injection. Returns (formatted_text, char_count). Budget allocation per Master Plan section 9: identity: 5%, preference: 5%, rest from retrieval budget """ if memory_types is None: memory_types = ["identity", "preference"] memories = [] for mtype in memory_types: memories.extend(get_memories(memory_type=mtype, min_confidence=0.5, limit=10)) if not memories: return "", 0 lines = ["--- AtoCore Memory ---"] used = len(lines[0]) + 1 included = [] for mem in memories: entry = f"[{mem.memory_type}] {mem.content}" entry_len = len(entry) + 1 if used + entry_len > budget: break lines.append(entry) used += entry_len included.append(mem) if len(included) == 0: return "", 0 lines.append("--- End Memory ---") text = "\n".join(lines) log.info("memories_for_context", count=len(included), chars=len(text)) return text, len(text) def _row_to_memory(row) -> Memory: """Convert a DB row to Memory dataclass.""" return Memory( id=row["id"], memory_type=row["memory_type"], content=row["content"], project="", source_chunk_id=row["source_chunk_id"] or "", confidence=row["confidence"], status=row["status"], created_at=row["created_at"], updated_at=row["updated_at"], )