Files
ATOCore/src/atocore/memory/service.py

270 lines
8.0 KiB
Python
Raw Normal View History

"""Memory Core — structured memory management.
Memory types (per Master Plan):
- identity: who the user is, role, background
- preference: how they like to work, style, tools
- project: project-specific knowledge and context
- episodic: what happened, conversations, events
- knowledge: verified facts, technical knowledge
- adaptation: learned corrections, behavioral adjustments
Memories have:
- confidence (0.0–1.0): how certain we are
- status (active/superseded/invalid): lifecycle state
- optional link to source chunk: traceability
"""
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
# Module-level logger, obtained from the project's logger factory under the name "memory".
log = get_logger("memory")

# Memory type names accepted by create_memory(); any other value is rejected
# there with a ValueError.
MEMORY_TYPES = [
    "identity",
    "preference",
    "project",
    "episodic",
    "knowledge",
    "adaptation",
]
@dataclass
class Memory:
    """A single memory record, as built from one row of the ``memories`` table."""

    id: str  # unique identifier; create_memory() generates a UUID4 string
    memory_type: str  # create_memory() requires one of MEMORY_TYPES
    content: str  # the memory text itself
    project: str  # project name; "" when the row has no project
    source_chunk_id: str  # id of the originating chunk; "" when there is none
    confidence: float  # validated on write to lie within 0.0..1.0
    status: str  # "active", "superseded" or "invalid"
    created_at: str  # creation timestamp, kept as a string
    updated_at: str  # last-modification timestamp, kept as a string
def create_memory(
    memory_type: str,
    content: str,
    project: str = "",
    source_chunk_id: str = "",
    confidence: float = 1.0,
) -> Memory:
    """Store a new active memory, or return the matching one if it already exists.

    Args:
        memory_type: One of ``MEMORY_TYPES``.
        content: The memory text.
        project: Project the memory belongs to ("" for none).
        source_chunk_id: Id of the chunk the memory came from; an empty
            value is stored as NULL.
        confidence: Certainty in the range 0.0..1.0.

    Returns:
        The newly created ``Memory``, or the already-stored active memory
        with identical type, content and project.

    Raises:
        ValueError: If ``memory_type`` is unknown or ``confidence`` is out
            of range.
    """
    if memory_type not in MEMORY_TYPES:
        raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}")
    _validate_confidence(confidence)

    new_id = str(uuid.uuid4())
    timestamp = datetime.now(timezone.utc).isoformat()

    with get_connection() as conn:
        # An identical active memory (same type, content and project) is
        # reused instead of inserting a second copy.
        match = conn.execute(
            "SELECT id FROM memories "
            "WHERE memory_type = ? AND content = ? AND project = ? AND status = 'active'",
            (memory_type, content, project),
        ).fetchone()
        if match:
            log.info("memory_duplicate_skipped", memory_type=memory_type, content_preview=content[:80])
            stored_row = conn.execute(
                "SELECT * FROM memories WHERE id = ?", (match["id"],)
            ).fetchone()
            return _row_to_memory(stored_row)

        insert_values = (
            new_id,
            memory_type,
            content,
            project,
            source_chunk_id or None,  # empty chunk id is persisted as NULL
            confidence,
        )
        conn.execute(
            "INSERT INTO memories (id, memory_type, content, project, source_chunk_id, confidence, status) "
            "VALUES (?, ?, ?, ?, ?, ?, 'active')",
            insert_values,
        )

    log.info("memory_created", memory_type=memory_type, content_preview=content[:80])

    # NOTE(review): created_at/updated_at below are generated here; the INSERT
    # does not write them, so the stored values may differ — confirm against
    # the table defaults.
    return Memory(
        id=new_id,
        memory_type=memory_type,
        content=content,
        project=project,
        source_chunk_id=source_chunk_id,
        confidence=confidence,
        status="active",
        created_at=timestamp,
        updated_at=timestamp,
    )
def get_memories(
    memory_type: str | None = None,
    project: str | None = None,
    active_only: bool = True,
    min_confidence: float = 0.0,
    limit: int = 50,
) -> list[Memory]:
    """Fetch memories matching the given filters, best first.

    Args:
        memory_type: Restrict to this type; a falsy value (``None`` or "")
            disables the filter.
        project: Restrict to this project; ``None`` disables the filter,
            while "" matches rows whose project is the empty string.
        active_only: When true, only rows with status ``'active'`` are
            returned.
        min_confidence: Lower bound on confidence; applied only when
            greater than zero.
        limit: Maximum number of rows to return.

    Returns:
        Matching memories ordered by confidence, then by ``updated_at``,
        both descending.
    """
    filters: list[str] = []
    bindings: list = []

    if memory_type:
        filters.append(" AND memory_type = ?")
        bindings.append(memory_type)
    if project is not None:
        filters.append(" AND project = ?")
        bindings.append(project)
    if active_only:
        filters.append(" AND status = 'active'")
    if min_confidence > 0:
        filters.append(" AND confidence >= ?")
        bindings.append(min_confidence)

    # "WHERE 1=1" lets every optional filter be appended uniformly as " AND ...".
    sql = (
        "SELECT * FROM memories WHERE 1=1"
        + "".join(filters)
        + " ORDER BY confidence DESC, updated_at DESC LIMIT ?"
    )
    bindings.append(limit)

    with get_connection() as conn:
        fetched = conn.execute(sql, bindings).fetchall()
    return list(map(_row_to_memory, fetched))
def update_memory(
    memory_id: str,
    content: str | None = None,
    confidence: float | None = None,
    status: str | None = None,
) -> bool:
    """Apply a partial update to one memory.

    Only the arguments that are not ``None`` are written; ``updated_at`` is
    refreshed whenever at least one field changes.

    Args:
        memory_id: Id of the memory to modify.
        content: Replacement text, if any.
        confidence: Replacement confidence (0.0..1.0), if any.
        status: Replacement status: "active", "superseded" or "invalid".

    Returns:
        ``True`` if a row was updated; ``False`` if the memory does not
        exist, nothing was requested, or no row was affected.

    Raises:
        ValueError: If ``confidence`` is out of range, ``status`` is not a
            known value, or the result would duplicate another active
            memory of the same type and project.
    """
    with get_connection() as conn:
        current = conn.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
        if current is None:
            return False

        # Values the row would hold after the update; used for the duplicate check.
        target_content = current["content"] if content is None else content
        target_status = current["status"] if status is None else status

        if confidence is not None:
            _validate_confidence(confidence)

        if target_status == "active":
            clash = conn.execute(
                "SELECT id FROM memories "
                "WHERE memory_type = ? AND content = ? AND project = ? AND status = 'active' AND id != ?",
                (current["memory_type"], target_content, current["project"] or "", memory_id),
            ).fetchone()
            if clash:
                raise ValueError("Update would create a duplicate active memory")

        if status is not None and status not in ("active", "superseded", "invalid"):
            raise ValueError(f"Invalid status '{status}'")

        # Keep only the columns the caller actually supplied, in a fixed order.
        requested = (
            ("content", content),
            ("confidence", confidence),
            ("status", status),
        )
        changes = [(column, value) for column, value in requested if value is not None]
        if not changes:
            return False

        assignments = [f"{column} = ?" for column, _ in changes]
        assignments.append("updated_at = CURRENT_TIMESTAMP")
        bindings: list = [value for _, value in changes]
        bindings.append(memory_id)

        result = conn.execute(
            f"UPDATE memories SET {', '.join(assignments)} WHERE id = ?",
            bindings,
        )

    changed = result.rowcount > 0
    if changed:
        log.info("memory_updated", memory_id=memory_id)
    return changed
def invalidate_memory(memory_id: str) -> bool:
    """Flag a memory as invalid (used for error correction).

    Returns the result of ``update_memory``: ``True`` when the row was
    updated, ``False`` otherwise.
    """
    was_updated = update_memory(memory_id, status="invalid")
    return was_updated
def supersede_memory(memory_id: str) -> bool:
    """Flag a memory as superseded (replaced by newer information).

    Returns the result of ``update_memory``: ``True`` when the row was
    updated, ``False`` otherwise.
    """
    was_updated = update_memory(memory_id, status="superseded")
    return was_updated
def get_memories_for_context(
memory_types: list[str] | None = None,
project: str | None = None,
budget: int = 500,
) -> tuple[str, int]:
"""Get formatted memories for context injection.
Returns (formatted_text, char_count).
Budget allocation per Master Plan section 9:
identity: 5%, preference: 5%, rest from retrieval budget
"""
if memory_types is None:
memory_types = ["identity", "preference"]
if budget <= 0:
return "", 0
header = "--- AtoCore Memory ---"
footer = "--- End Memory ---"
wrapper_chars = len(header) + len(footer) + 2
if budget <= wrapper_chars:
return "", 0
available = budget - wrapper_chars
selected_entries: list[str] = []
for index, mtype in enumerate(memory_types):
type_budget = available if index == len(memory_types) - 1 else max(0, available // (len(memory_types) - index))
type_used = 0
for mem in get_memories(
memory_type=mtype,
project=project,
min_confidence=0.5,
limit=10,
):
entry = f"[{mem.memory_type}] {mem.content}"
entry_len = len(entry) + 1
if entry_len > type_budget - type_used:
continue
selected_entries.append(entry)
type_used += entry_len
available -= type_used
if not selected_entries:
return "", 0
lines = [header, *selected_entries, footer]
text = "\n".join(lines)
log.info("memories_for_context", count=len(selected_entries), chars=len(text))
return text, len(text)
def _row_to_memory(row) -> Memory:
    """Build a ``Memory`` from a ``memories`` row.

    ``row`` must support lookup by column name. Falsy ``project`` and
    ``source_chunk_id`` values (such as NULL) are normalised to "".
    """
    columns = (
        "id",
        "memory_type",
        "content",
        "project",
        "source_chunk_id",
        "confidence",
        "status",
        "created_at",
        "updated_at",
    )
    fields = {column: row[column] for column in columns}
    for nullable in ("project", "source_chunk_id"):
        fields[nullable] = fields[nullable] or ""
    return Memory(**fields)
def _validate_confidence(confidence: float) -> None:
    """Ensure ``confidence`` lies in the closed interval [0.0, 1.0].

    Raises:
        ValueError: If the value is outside that interval (NaN included,
            since it fails both comparisons).
    """
    within_range = 0.0 <= confidence <= 1.0
    if within_range:
        return
    raise ValueError("Confidence must be between 0.0 and 1.0")