diff --git a/src/atocore/api/routes.py b/src/atocore/api/routes.py index f121d2d..b9f4bb6 100644 --- a/src/atocore/api/routes.py +++ b/src/atocore/api/routes.py @@ -17,6 +17,14 @@ from atocore.context.project_state import ( set_state, ) from atocore.ingestion.pipeline import ingest_file, ingest_folder, get_ingestion_stats +from atocore.memory.service import ( + MEMORY_TYPES, + create_memory, + get_memories, + invalidate_memory, + supersede_memory, + update_memory, +) from atocore.observability.logger import get_logger from atocore.retrieval.retriever import retrieve from atocore.retrieval.vector_store import get_vector_store @@ -63,6 +71,19 @@ class ContextBuildResponse(BaseModel): chunks: list[dict] +class MemoryCreateRequest(BaseModel): + memory_type: str + content: str + project: str = "" + confidence: float = 1.0 + + +class MemoryUpdateRequest(BaseModel): + content: str | None = None + confidence: float | None = None + status: str | None = None + + class ProjectStateSetRequest(BaseModel): project: str category: str @@ -153,6 +174,77 @@ def api_build_context(req: ContextBuildRequest) -> ContextBuildResponse: ) +@router.post("/memory") +def api_create_memory(req: MemoryCreateRequest) -> dict: + """Create a new memory entry.""" + try: + mem = create_memory( + memory_type=req.memory_type, + content=req.content, + project=req.project, + confidence=req.confidence, + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + return {"status": "ok", "id": mem.id, "memory_type": mem.memory_type} + + +@router.get("/memory") +def api_get_memories( + memory_type: str | None = None, + active_only: bool = True, + min_confidence: float = 0.0, + limit: int = 50, +) -> dict: + """List memories, optionally filtered.""" + memories = get_memories( + memory_type=memory_type, + active_only=active_only, + min_confidence=min_confidence, + limit=limit, + ) + return { + "memories": [ + { + "id": m.id, + "memory_type": m.memory_type, + "content": m.content, + "confidence": m.confidence, + "status": m.status, + "updated_at": m.updated_at, + } + for m in memories + ], + "types": MEMORY_TYPES, + } + + +@router.put("/memory/{memory_id}") +def api_update_memory(memory_id: str, req: MemoryUpdateRequest) -> dict: + """Update an existing memory.""" + try: + success = update_memory( + memory_id=memory_id, + content=req.content, + confidence=req.confidence, + status=req.status, + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + if not success: + raise HTTPException(status_code=404, detail="Memory not found") + return {"status": "updated", "id": memory_id} + + +@router.delete("/memory/{memory_id}") +def api_invalidate_memory(memory_id: str) -> dict: + """Invalidate a memory (error correction).""" + success = invalidate_memory(memory_id) + if not success: + raise HTTPException(status_code=404, detail="Memory not found") + return {"status": "invalidated", "id": memory_id} + + @router.post("/project/state") def api_set_project_state(req: ProjectStateSetRequest) -> dict: """Set or update a trusted project state entry.""" diff --git a/src/atocore/context/builder.py b/src/atocore/context/builder.py index ddb2066..087eacc 100644 --- a/src/atocore/context/builder.py +++ b/src/atocore/context/builder.py @@ -1,8 +1,9 @@ """Context pack assembly: retrieve, rank, budget, format. Trust precedence (per Master Plan): - 1. Trusted Project State → always included first, uses its own budget slice - 2. Retrieved chunks → ranked, deduplicated, budget-constrained + 1. Trusted Project State → always included first, highest authority + 2. Identity + Preference memories → included next + 3. Retrieved chunks → ranked, deduplicated, budget-constrained """ import time @@ -11,6 +12,7 @@ from pathlib import Path from atocore.config import settings from atocore.context.project_state import format_project_state, get_state +from atocore.memory.service import get_memories_for_context from atocore.observability.logger import get_logger from atocore.retrieval.retriever import ChunkResult, retrieve @@ -23,9 +25,10 @@ SYSTEM_PREFIX = ( "When project state is provided, treat it as the most authoritative source." ) -# Budget allocation (per Master Plan section 9) -# project_state gets up to 20% of budget, retrieval gets the rest +# Budget allocation (per Master Plan section 9): +# identity: 5%, preferences: 5%, project state: 20%, retrieval: 60%+ PROJECT_STATE_BUDGET_RATIO = 0.20 +MEMORY_BUDGET_RATIO = 0.10 # 5% identity + 5% preference # Last built context pack for debug inspection _last_context_pack: "ContextPack | None" = None @@ -45,6 +48,8 @@ class ContextPack: chunks_used: list[ContextChunk] = field(default_factory=list) project_state_text: str = "" project_state_chars: int = 0 + memory_text: str = "" + memory_chars: int = 0 total_chars: int = 0 budget: int = 0 budget_remaining: int = 0 @@ -64,7 +69,8 @@ def build_context( Trust precedence applied: 1. Project state is injected first (highest trust) - 2. Retrieved chunks fill the remaining budget + 2. Identity + preference memories (second trust level) + 3. Retrieved chunks fill the remaining budget """ global _last_context_pack start = time.time() @@ -73,48 +79,48 @@ def build_context( # 1. Get Trusted Project State (highest precedence) project_state_text = "" project_state_chars = 0 - state_budget = int(budget * PROJECT_STATE_BUDGET_RATIO) if project_hint: state_entries = get_state(project_hint) if state_entries: project_state_text = format_project_state(state_entries) project_state_chars = len(project_state_text) - # If state exceeds its budget, it still gets included (it's highest trust) - # but we log it - if project_state_chars > state_budget: - log.info( - "project_state_exceeds_budget", - state_chars=project_state_chars, - state_budget=state_budget, - ) - # 2. Calculate remaining budget for retrieval - retrieval_budget = budget - project_state_chars + # 2. Get identity + preference memories (second precedence) + memory_budget = int(budget * MEMORY_BUDGET_RATIO) + memory_text, memory_chars = get_memories_for_context( + memory_types=["identity", "preference"], + budget=memory_budget, + ) - # 3. Retrieve candidates + # 3. Calculate remaining budget for retrieval + retrieval_budget = budget - project_state_chars - memory_chars + + # 4. Retrieve candidates candidates = retrieve(user_prompt, top_k=settings.context_top_k) - # 4. Score and rank + # 5. Score and rank scored = _rank_chunks(candidates, project_hint) - # 5. Select within remaining budget + # 6. Select within remaining budget selected = _select_within_budget(scored, max(retrieval_budget, 0)) - # 6. Format full context - formatted = _format_full_context(project_state_text, selected) + # 7. Format full context + formatted = _format_full_context(project_state_text, memory_text, selected) - # 7. Build full prompt + # 8. Build full prompt full_prompt = f"{SYSTEM_PREFIX}\n\n{formatted}\n\n{user_prompt}" retrieval_chars = sum(c.char_count for c in selected) - total_chars = project_state_chars + retrieval_chars + total_chars = project_state_chars + memory_chars + retrieval_chars duration_ms = int((time.time() - start) * 1000) pack = ContextPack( chunks_used=selected, project_state_text=project_state_text, project_state_chars=project_state_chars, + memory_text=memory_text, + memory_chars=memory_chars, total_chars=total_chars, budget=budget, budget_remaining=budget - total_chars, @@ -131,6 +137,7 @@ def build_context( "context_built", chunks_used=len(selected), project_state_chars=project_state_chars, + memory_chars=memory_chars, retrieval_chars=retrieval_chars, total_chars=total_chars, budget_remaining=budget - total_chars, @@ -209,17 +216,23 @@ def _select_within_budget( def _format_full_context( project_state_text: str, + memory_text: str, chunks: list[ContextChunk], ) -> str: - """Format project state + retrieved chunks into full context block.""" + """Format project state + memories + retrieved chunks into full context block.""" parts = [] - # Project state first (highest trust) + # 1. Project state first (highest trust) if project_state_text: parts.append(project_state_text) parts.append("") - # Retrieved chunks + # 2. Identity + preference memories (second trust level) + if memory_text: + parts.append(memory_text) + parts.append("") + + # 3. Retrieved chunks (lowest trust) if chunks: parts.append("--- AtoCore Retrieved Context ---") for chunk in chunks: @@ -229,7 +242,7 @@ def _format_full_context( parts.append(chunk.content) parts.append("") parts.append("--- End Context ---") - elif not project_state_text: + elif not project_state_text and not memory_text: parts.append("--- AtoCore Context ---\nNo relevant context found.\n--- End Context ---") return "\n".join(parts) @@ -250,12 +263,14 @@ def _pack_to_dict(pack: ContextPack) -> dict: "query": pack.query, "project_hint": pack.project_hint, "project_state_chars": pack.project_state_chars, + "memory_chars": pack.memory_chars, "chunks_used": len(pack.chunks_used), "total_chars": pack.total_chars, "budget": pack.budget, "budget_remaining": pack.budget_remaining, "duration_ms": pack.duration_ms, "has_project_state": bool(pack.project_state_text), + "has_memories": bool(pack.memory_text), "chunks": [ { "source_file": c.source_file, diff --git a/src/atocore/memory/__init__.py b/src/atocore/memory/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/atocore/memory/service.py b/src/atocore/memory/service.py new file mode 100644 index 0000000..7e7d6c6 --- /dev/null +++ b/src/atocore/memory/service.py @@ -0,0 +1,231 @@ +"""Memory Core — structured memory management. + +Memory types (per Master Plan): + - identity: who the user is, role, background + - preference: how they like to work, style, tools + - project: project-specific knowledge and context + - episodic: what happened, conversations, events + - knowledge: verified facts, technical knowledge + - adaptation: learned corrections, behavioral adjustments + +Memories have: + - confidence (0.0–1.0): how certain we are + - status (active/superseded/invalid): lifecycle state + - optional link to source chunk: traceability +""" + +import json +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone + +from atocore.models.database import get_connection +from atocore.observability.logger import get_logger + +log = get_logger("memory") + +MEMORY_TYPES = [ + "identity", + "preference", + "project", + "episodic", + "knowledge", + "adaptation", +] + + +@dataclass +class Memory: + id: str + memory_type: str + content: str + project: str + source_chunk_id: str + confidence: float + status: str + created_at: str + updated_at: str + + +def create_memory( + memory_type: str, + content: str, + project: str = "", + source_chunk_id: str = "", + confidence: float = 1.0, +) -> Memory: + """Create a new memory entry.""" + if memory_type not in MEMORY_TYPES: + raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}") + + memory_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc).isoformat() + + # Check for duplicate content within same type+project + with get_connection() as conn: + existing = conn.execute( + "SELECT id FROM memories WHERE memory_type = ? AND content = ? AND status = 'active'", + (memory_type, content), + ).fetchone() + if existing: + log.info("memory_duplicate_skipped", memory_type=memory_type, content_preview=content[:80]) + return _row_to_memory( + conn.execute("SELECT * FROM memories WHERE id = ?", (existing["id"],)).fetchone() + ) + + conn.execute( + "INSERT INTO memories (id, memory_type, content, source_chunk_id, confidence, status) " + "VALUES (?, ?, ?, ?, ?, 'active')", + (memory_id, memory_type, content, source_chunk_id or None, confidence), + ) + + log.info("memory_created", memory_type=memory_type, content_preview=content[:80]) + + return Memory( + id=memory_id, + memory_type=memory_type, + content=content, + project=project, + source_chunk_id=source_chunk_id, + confidence=confidence, + status="active", + created_at=now, + updated_at=now, + ) + + +def get_memories( + memory_type: str | None = None, + active_only: bool = True, + min_confidence: float = 0.0, + limit: int = 50, +) -> list[Memory]: + """Retrieve memories, optionally filtered.""" + query = "SELECT * FROM memories WHERE 1=1" + params: list = [] + + if memory_type: + query += " AND memory_type = ?" + params.append(memory_type) + if active_only: + query += " AND status = 'active'" + if min_confidence > 0: + query += " AND confidence >= ?" + params.append(min_confidence) + + query += " ORDER BY confidence DESC, updated_at DESC LIMIT ?" + params.append(limit) + + with get_connection() as conn: + rows = conn.execute(query, params).fetchall() + + return [_row_to_memory(r) for r in rows] + + +def update_memory( + memory_id: str, + content: str | None = None, + confidence: float | None = None, + status: str | None = None, +) -> bool: + """Update an existing memory.""" + updates = [] + params: list = [] + + if content is not None: + updates.append("content = ?") + params.append(content) + if confidence is not None: + updates.append("confidence = ?") + params.append(confidence) + if status is not None: + if status not in ("active", "superseded", "invalid"): + raise ValueError(f"Invalid status '{status}'") + updates.append("status = ?") + params.append(status) + + if not updates: + return False + + updates.append("updated_at = CURRENT_TIMESTAMP") + params.append(memory_id) + + with get_connection() as conn: + result = conn.execute( + f"UPDATE memories SET {', '.join(updates)} WHERE id = ?", + params, + ) + + if result.rowcount > 0: + log.info("memory_updated", memory_id=memory_id) + return True + return False + + +def invalidate_memory(memory_id: str) -> bool: + """Mark a memory as invalid (error correction).""" + return update_memory(memory_id, status="invalid") + + +def supersede_memory(memory_id: str) -> bool: + """Mark a memory as superseded (replaced by newer info).""" + return update_memory(memory_id, status="superseded") + + +def get_memories_for_context( + memory_types: list[str] | None = None, + budget: int = 500, +) -> tuple[str, int]: + """Get formatted memories for context injection. + + Returns (formatted_text, char_count). + + Budget allocation per Master Plan section 9: + identity: 5%, preference: 5%, rest from retrieval budget + """ + if memory_types is None: + memory_types = ["identity", "preference"] + + memories = [] + for mtype in memory_types: + memories.extend(get_memories(memory_type=mtype, min_confidence=0.5, limit=10)) + + if not memories: + return "", 0 + + lines = ["--- AtoCore Memory ---"] + used = len(lines[0]) + 1 + included = [] + + for mem in memories: + entry = f"[{mem.memory_type}] {mem.content}" + entry_len = len(entry) + 1 + if used + entry_len > budget: + break + lines.append(entry) + used += entry_len + included.append(mem) + + if len(included) == 0: + return "", 0 + + lines.append("--- End Memory ---") + text = "\n".join(lines) + + log.info("memories_for_context", count=len(included), chars=len(text)) + return text, len(text) + + +def _row_to_memory(row) -> Memory: + """Convert a DB row to Memory dataclass.""" + return Memory( + id=row["id"], + memory_type=row["memory_type"], + content=row["content"], + project="", + source_chunk_id=row["source_chunk_id"] or "", + confidence=row["confidence"], + status=row["status"], + created_at=row["created_at"], + updated_at=row["updated_at"], + ) diff --git a/src/atocore/models/database.py b/src/atocore/models/database.py index 1e07189..592eea3 100644 --- a/src/atocore/models/database.py +++ b/src/atocore/models/database.py @@ -5,7 +5,7 @@ from contextlib import contextmanager from pathlib import Path from typing import Generator -from atocore.config import settings +import atocore.config as _config from atocore.observability.logger import get_logger log = get_logger("database") @@ -70,7 +70,7 @@ CREATE INDEX IF NOT EXISTS idx_interactions_project ON interactions(project_id); def _ensure_data_dir() -> None: - settings.data_dir.mkdir(parents=True, exist_ok=True) + _config.settings.data_dir.mkdir(parents=True, exist_ok=True) def init_db() -> None: @@ -78,14 +78,14 @@ def init_db() -> None: _ensure_data_dir() with get_connection() as conn: conn.executescript(SCHEMA_SQL) - log.info("database_initialized", path=str(settings.db_path)) + log.info("database_initialized", path=str(_config.settings.db_path)) @contextmanager def get_connection() -> Generator[sqlite3.Connection, None, None]: """Get a database connection with row factory.""" _ensure_data_dir() - conn = sqlite3.connect(str(settings.db_path)) + conn = sqlite3.connect(str(_config.settings.db_path)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") try: diff --git a/tests/conftest.py b/tests/conftest.py index 5bbafe0..b8081c1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,8 +6,9 @@ from pathlib import Path import pytest -# Force test data directory -os.environ["ATOCORE_DATA_DIR"] = tempfile.mkdtemp(prefix="atocore_test_") +# Default test data directory — overridden per-test by fixtures +_default_test_dir = tempfile.mkdtemp(prefix="atocore_test_") +os.environ["ATOCORE_DATA_DIR"] = _default_test_dir os.environ["ATOCORE_DEBUG"] = "true" diff --git a/tests/test_memory.py b/tests/test_memory.py new file mode 100644 index 0000000..8ee0811 --- /dev/null +++ b/tests/test_memory.py @@ -0,0 +1,133 @@ +"""Tests for Memory Core.""" + +import os +import tempfile + +import pytest + +import atocore.config as _config +from atocore.models.database import init_db + + +@pytest.fixture(autouse=True) +def isolated_db(): + """Give each test a completely isolated database.""" + tmpdir = tempfile.mkdtemp() + os.environ["ATOCORE_DATA_DIR"] = tmpdir + + # Replace the global settings so all modules see the new data_dir + _config.settings = _config.Settings() + + # Also reset any module-level references to the old settings + import atocore.models.database + # database.py now uses _config.settings dynamically, so no patch needed + + init_db() + yield tmpdir + + +def test_create_memory(isolated_db): + from atocore.memory.service import create_memory + mem = create_memory("identity", "User is a mechanical engineer specializing in optics") + assert mem.memory_type == "identity" + assert mem.status == "active" + assert mem.confidence == 1.0 + + +def test_create_memory_invalid_type(isolated_db): + from atocore.memory.service import create_memory + with pytest.raises(ValueError, match="Invalid memory type"): + create_memory("invalid_type", "some content") + + +def test_create_memory_dedup(isolated_db): + from atocore.memory.service import create_memory + m1 = create_memory("identity", "User is an engineer") + m2 = create_memory("identity", "User is an engineer") + assert m1.id == m2.id + + +def test_get_memories_all(isolated_db): + from atocore.memory.service import create_memory, get_memories + create_memory("identity", "User is an engineer") + create_memory("preference", "Prefers Python with type hints") + create_memory("knowledge", "Zerodur has near-zero thermal expansion") + + mems = get_memories() + assert len(mems) == 3 + + +def test_get_memories_by_type(isolated_db): + from atocore.memory.service import create_memory, get_memories + create_memory("identity", "User is an engineer") + create_memory("preference", "Prefers concise code") + create_memory("preference", "Uses FastAPI for APIs") + + mems = get_memories(memory_type="preference") + assert len(mems) == 2 + + +def test_get_memories_active_only(isolated_db): + from atocore.memory.service import create_memory, get_memories, invalidate_memory + m = create_memory("knowledge", "Fact about optics") + invalidate_memory(m.id) + + assert len(get_memories(active_only=True)) == 0 + assert len(get_memories(active_only=False)) == 1 + + +def test_get_memories_min_confidence(isolated_db): + from atocore.memory.service import create_memory, get_memories + create_memory("knowledge", "High confidence fact", confidence=0.9) + create_memory("knowledge", "Low confidence fact", confidence=0.3) + + high = get_memories(min_confidence=0.5) + assert len(high) == 1 + assert high[0].confidence == 0.9 + + +def test_update_memory(isolated_db): + from atocore.memory.service import create_memory, get_memories, update_memory + mem = create_memory("knowledge", "Initial fact") + update_memory(mem.id, content="Updated fact", confidence=0.8) + + mems = get_memories() + assert len(mems) == 1 + assert mems[0].content == "Updated fact" + assert mems[0].confidence == 0.8 + + +def test_invalidate_memory(isolated_db): + from atocore.memory.service import create_memory, get_memories, invalidate_memory + mem = create_memory("knowledge", "Wrong fact") + invalidate_memory(mem.id) + assert len(get_memories(active_only=True)) == 0 + + +def test_supersede_memory(isolated_db): + from atocore.memory.service import create_memory, get_memories, supersede_memory + mem = create_memory("knowledge", "Old fact") + supersede_memory(mem.id) + + mems = get_memories(active_only=False) + assert len(mems) == 1 + assert mems[0].status == "superseded" + + +def test_memories_for_context(isolated_db): + from atocore.memory.service import create_memory, get_memories_for_context + create_memory("identity", "User is a senior mechanical engineer") + create_memory("preference", "Prefers Python with type hints") + + text, chars = get_memories_for_context(memory_types=["identity", "preference"], budget=500) + assert "--- AtoCore Memory ---" in text + assert "[identity]" in text + assert "[preference]" in text + assert chars > 0 + + +def test_memories_for_context_empty(isolated_db): + from atocore.memory.service import get_memories_for_context + text, chars = get_memories_for_context() + assert text == "" + assert chars == 0