get_memories_for_context now accepts an optional query string.
When provided, candidate memories are reranked by lexical overlap
with the query (stemmed token intersection, ties broken by
confidence) before the budget walk. Without a query the order is
unchanged — effectively "by confidence desc" as before — so
non-builder callers see no behaviour change.
The fetch limit is raised from 10 to 30 so there's a real pool to
rerank. Token overlap reuses _normalize/_tokenize from
reinforcement.py so ranking and reinforcement matching share the
same notion of distinctive terms.
build_context passes the user_prompt through to both the identity/
preference and project-memory calls. The retrieval-harness
regression this fix targets:
- p05-vendor-signal FAIL @ 1161645: "Zygo" missing from the pack
even though an active vendor memory contained it. Root cause:
higher-confidence p05 memories filled the 25% budget slice
before the vendor memory ever got a chance. Query-aware ordering
puts the vendor memory first when the query is about vendors.
New regression test test_project_memories_query_relevance_ordering
locks the behaviour in with two p05 memories and a tight budget.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
471 lines · 16 KiB · Python
"""Memory Core — structured memory management.
|
||
|
||
Memory types (per Master Plan):
|
||
- identity: who the user is, role, background
|
||
- preference: how they like to work, style, tools
|
||
- project: project-specific knowledge and context
|
||
- episodic: what happened, conversations, events
|
||
- knowledge: verified facts, technical knowledge
|
||
- adaptation: learned corrections, behavioral adjustments
|
||
|
||
Memories have:
|
||
- confidence (0.0–1.0): how certain we are
|
||
- status: lifecycle state, one of MEMORY_STATUSES
|
||
* candidate: extracted from an interaction, awaiting human review
|
||
(Phase 9 Commit C). Candidates are NEVER included in
|
||
context packs.
|
||
* active: promoted/curated, visible to retrieval and context
|
||
* superseded: replaced by a newer entry
|
||
* invalid: rejected / error-corrected
|
||
- last_referenced_at / reference_count: reinforcement signal
|
||
(Phase 9 Commit B). Bumped whenever a captured interaction's
|
||
response content echoes this memory.
|
||
- optional link to source chunk: traceability
|
||
"""
|
||
|
||
import uuid
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timezone
|
||
|
||
from atocore.models.database import get_connection
|
||
from atocore.observability.logger import get_logger
|
||
from atocore.projects.registry import resolve_project_name
|
||
|
||
log = get_logger("memory")
|
||
|
||
# Closed set of allowed memory_type values; create_memory validates against it.
MEMORY_TYPES = [
    "identity",
    "preference",
    "project",
    "episodic",
    "knowledge",
    "adaptation",
]

# Lifecycle states. "candidate" is the Phase 9 Commit C review queue;
# only "active" memories are visible to retrieval and context packs.
MEMORY_STATUSES = [
    "candidate",
    "active",
    "superseded",
    "invalid",
]
|
||
|
||
|
||
@dataclass
class Memory:
    """A single structured memory row, mirrored from the ``memories`` table."""

    id: str                       # UUID4 primary key
    memory_type: str              # one of MEMORY_TYPES
    content: str                  # the memory text itself
    project: str                  # canonical project name ("" = no project scope)
    source_chunk_id: str          # traceability link to the source chunk ("" if none)
    confidence: float             # 0.0–1.0 certainty
    status: str                   # one of MEMORY_STATUSES
    created_at: str               # timestamp string as stored by SQLite
    updated_at: str               # timestamp string as stored by SQLite
    # Reinforcement signal (Phase 9 Commit B); defaults cover rows from
    # schemas that predate these columns.
    last_referenced_at: str = ""
    reference_count: int = 0
|
||
|
||
|
||
def create_memory(
    memory_type: str,
    content: str,
    project: str = "",
    source_chunk_id: str = "",
    confidence: float = 1.0,
    status: str = "active",
) -> Memory:
    """Create a new memory entry.

    ``status`` defaults to ``active`` for backward compatibility. Pass
    ``candidate`` when the memory is being proposed by the Phase 9 Commit C
    extractor and still needs human review before it can influence context.

    Returns the created ``Memory`` — or, when an identical memory already
    exists at the same type/project/status, the existing row instead of a
    duplicate.

    Raises ValueError for an unknown memory_type/status or an out-of-range
    confidence.
    """
    if memory_type not in MEMORY_TYPES:
        raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}")
    if status not in MEMORY_STATUSES:
        raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
    _validate_confidence(confidence)

    # Canonicalize the project through the registry so an alias and
    # the canonical id store under the same bucket. This keeps
    # reinforcement queries (which use the interaction's project) and
    # context retrieval (which uses the registry-canonicalized hint)
    # consistent with how memories are created.
    project = resolve_project_name(project)

    memory_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()

    # Check for duplicate content within the same type+project at the same status.
    # Scoping by status keeps active curation separate from the candidate
    # review queue: a candidate and an active memory with identical text can
    # legitimately coexist if the candidate is a fresh extraction of something
    # already curated.
    with get_connection() as conn:
        existing = conn.execute(
            "SELECT id FROM memories "
            "WHERE memory_type = ? AND content = ? AND project = ? AND status = ?",
            (memory_type, content, project, status),
        ).fetchone()
        if existing:
            log.info(
                "memory_duplicate_skipped",
                memory_type=memory_type,
                status=status,
                content_preview=content[:80],
            )
            # Return the full existing row so callers get real timestamps.
            return _row_to_memory(
                conn.execute("SELECT * FROM memories WHERE id = ?", (existing["id"],)).fetchone()
            )

        # source_chunk_id is stored as NULL when empty so traceability
        # queries can distinguish "no source" cleanly.
        conn.execute(
            "INSERT INTO memories (id, memory_type, content, project, source_chunk_id, confidence, status) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (memory_id, memory_type, content, project, source_chunk_id or None, confidence, status),
        )

    log.info(
        "memory_created",
        memory_type=memory_type,
        status=status,
        content_preview=content[:80],
    )

    # NOTE(review): created_at/updated_at here are the Python-side timestamp;
    # the INSERT relies on the table's defaults, so the stored values may
    # differ slightly — confirm against the schema if exact parity matters.
    return Memory(
        id=memory_id,
        memory_type=memory_type,
        content=content,
        project=project,
        source_chunk_id=source_chunk_id,
        confidence=confidence,
        status=status,
        created_at=now,
        updated_at=now,
        last_referenced_at="",
        reference_count=0,
    )
|
||
|
||
|
||
def get_memories(
    memory_type: str | None = None,
    project: str | None = None,
    active_only: bool = True,
    min_confidence: float = 0.0,
    limit: int = 50,
    status: str | None = None,
) -> list[Memory]:
    """Retrieve memories, optionally filtered.

    When ``status`` is provided explicitly, it takes precedence over
    ``active_only`` so callers can list the candidate review queue via
    ``get_memories(status='candidate')``. When ``status`` is omitted the
    legacy ``active_only`` behaviour still applies.

    Results are ordered by confidence desc, then updated_at desc.
    Raises ValueError for an unknown ``status``.
    """
    if status is not None and status not in MEMORY_STATUSES:
        raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")

    query = "SELECT * FROM memories WHERE 1=1"
    params: list = []

    if memory_type:
        query += " AND memory_type = ?"
        params.append(memory_type)
    if project is not None:
        # Canonicalize on the read side so a caller passing an alias
        # finds rows that were stored under the canonical id (and
        # vice versa). resolve_project_name returns the input
        # unchanged for unregistered names so empty-string queries
        # for "no project scope" still work.
        query += " AND project = ?"
        params.append(resolve_project_name(project))
    if status is not None:
        query += " AND status = ?"
        params.append(status)
    elif active_only:
        query += " AND status = 'active'"
    if min_confidence > 0:
        query += " AND confidence >= ?"
        params.append(min_confidence)

    query += " ORDER BY confidence DESC, updated_at DESC LIMIT ?"
    params.append(limit)

    with get_connection() as conn:
        rows = conn.execute(query, params).fetchall()

    return [_row_to_memory(r) for r in rows]
|
||
|
||
|
||
def update_memory(
    memory_id: str,
    content: str | None = None,
    confidence: float | None = None,
    status: str | None = None,
) -> bool:
    """Update an existing memory.

    Returns True if a row was changed, False if the memory does not exist
    or no fields were supplied. Raises ValueError for an invalid
    confidence/status, or when the update would leave two identical
    *active* memories in the same type+project bucket.
    """
    with get_connection() as conn:
        existing = conn.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
        if existing is None:
            return False

        # Resolve what the row would look like after the update so the
        # duplicate check below sees the post-update state.
        next_content = content if content is not None else existing["content"]
        next_status = status if status is not None else existing["status"]
        if confidence is not None:
            _validate_confidence(confidence)

        # Only the active set is deduplicated: candidates/superseded/invalid
        # may repeat text without harming curation.
        if next_status == "active":
            duplicate = conn.execute(
                "SELECT id FROM memories "
                "WHERE memory_type = ? AND content = ? AND project = ? AND status = 'active' AND id != ?",
                (existing["memory_type"], next_content, existing["project"] or "", memory_id),
            ).fetchone()
            if duplicate:
                raise ValueError("Update would create a duplicate active memory")

        updates = []
        params: list = []

        if content is not None:
            updates.append("content = ?")
            params.append(content)
        if confidence is not None:
            updates.append("confidence = ?")
            params.append(confidence)
        if status is not None:
            if status not in MEMORY_STATUSES:
                raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
            updates.append("status = ?")
            params.append(status)

        if not updates:
            return False

        updates.append("updated_at = CURRENT_TIMESTAMP")
        params.append(memory_id)

        result = conn.execute(
            f"UPDATE memories SET {', '.join(updates)} WHERE id = ?",
            params,
        )

        if result.rowcount > 0:
            log.info("memory_updated", memory_id=memory_id)
            return True
        return False
|
||
|
||
|
||
def invalidate_memory(memory_id: str) -> bool:
    """Mark a memory as invalid (error correction).

    Thin wrapper over update_memory; returns False if the id is unknown.
    """
    return update_memory(memory_id, status="invalid")
|
||
|
||
|
||
def supersede_memory(memory_id: str) -> bool:
    """Mark a memory as superseded (replaced by newer info).

    Thin wrapper over update_memory; returns False if the id is unknown.
    """
    return update_memory(memory_id, status="superseded")
|
||
|
||
|
||
def promote_memory(memory_id: str) -> bool:
    """Promote a candidate memory to active (Phase 9 Commit C review queue).

    Returns False if the memory does not exist or is not currently a
    candidate. Raises ValueError only if the promotion would create a
    duplicate active memory (delegates to update_memory's existing check).
    """
    with get_connection() as conn:
        row = conn.execute(
            "SELECT status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()
        if row is None:
            return False
        if row["status"] != "candidate":
            return False
    # NOTE(review): the status check and the update run on separate
    # connections, so a concurrent writer could change the row in between.
    return update_memory(memory_id, status="active")
|
||
|
||
|
||
def reject_candidate_memory(memory_id: str) -> bool:
    """Reject a candidate memory (Phase 9 Commit C).

    Sets the candidate's status to ``invalid`` so it drops out of the
    review queue without polluting the active set. Returns False if the
    memory does not exist or is not currently a candidate.
    """
    with get_connection() as conn:
        row = conn.execute(
            "SELECT status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()
        if row is None:
            return False
        if row["status"] != "candidate":
            return False
    return update_memory(memory_id, status="invalid")
|
||
|
||
|
||
def reinforce_memory(
    memory_id: str,
    confidence_delta: float = 0.02,
) -> tuple[bool, float, float]:
    """Bump a memory's confidence and reference count (Phase 9 Commit B).

    Returns a 3-tuple ``(applied, old_confidence, new_confidence)``.
    ``applied`` is False if the memory does not exist or is not in the
    ``active`` state — reinforcement only touches live memories so the
    candidate queue and invalidated history are never silently revived.

    Confidence is capped at 1.0. last_referenced_at is set to the current
    UTC time in SQLite-comparable format. reference_count is incremented
    by one per call (not per delta amount).

    Raises ValueError for a negative ``confidence_delta``.
    """
    if confidence_delta < 0:
        raise ValueError("confidence_delta must be non-negative for reinforcement")
    # strftime (not isoformat) so the value compares lexically with
    # SQLite's CURRENT_TIMESTAMP format.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    with get_connection() as conn:
        row = conn.execute(
            "SELECT confidence, status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()
        if row is None or row["status"] != "active":
            return False, 0.0, 0.0
        old_confidence = float(row["confidence"])
        new_confidence = min(1.0, old_confidence + confidence_delta)
        # COALESCE guards rows created before reference_count existed.
        conn.execute(
            "UPDATE memories SET confidence = ?, last_referenced_at = ?, "
            "reference_count = COALESCE(reference_count, 0) + 1 "
            "WHERE id = ?",
            (new_confidence, now, memory_id),
        )
    log.info(
        "memory_reinforced",
        memory_id=memory_id,
        old_confidence=round(old_confidence, 4),
        new_confidence=round(new_confidence, 4),
    )
    return True, old_confidence, new_confidence
|
||
|
||
|
||
def get_memories_for_context(
    memory_types: list[str] | None = None,
    project: str | None = None,
    budget: int = 500,
    header: str = "--- AtoCore Memory ---",
    footer: str = "--- End Memory ---",
    query: str | None = None,
) -> tuple[str, int]:
    """Get formatted memories for context injection.

    Returns (formatted_text, char_count).

    Budget allocation per Master Plan section 9:
    identity: 5%, preference: 5%, rest from retrieval budget

    The caller can override ``header`` / ``footer`` to distinguish
    multiple memory blocks in the same pack (e.g. identity/preference
    vs project/knowledge memories).

    When ``query`` is provided, candidates within each memory type
    are ranked by lexical overlap against the query (stemmed token
    intersection, ties broken by confidence). Without a query,
    candidates fall through in the order ``get_memories`` returns
    them — which is effectively "by confidence desc".
    """
    if memory_types is None:
        memory_types = ["identity", "preference"]

    if budget <= 0:
        return "", 0
    # header + footer + the two newlines that join them to the entries.
    wrapper_chars = len(header) + len(footer) + 2
    if budget <= wrapper_chars:
        return "", 0

    available = budget - wrapper_chars
    selected_entries: list[str] = []
    used = 0

    # Pre-tokenize the query once. ``_rank_memories_for_query`` below
    # reuses the reinforcement tokenizer so lexical scoring here matches
    # the reinforcement matcher.
    query_tokens: set[str] | None = None
    if query:
        from atocore.memory.reinforcement import _normalize, _tokenize

        query_tokens = _tokenize(_normalize(query))
        if not query_tokens:
            # An all-stopword query ranks nothing; fall back to no-query order.
            query_tokens = None

    # Flat budget across types so paragraph-length project memories
    # aren't starved by an even slice. Types are still walked in order
    # (identity/preference first when they're the input), so earlier
    # types still get first pick when the budget is tight.
    for mtype in memory_types:
        # Raise the fetch limit above the budget slice so query-relevance
        # ordering has a real pool to rerank. Without a query, the extras
        # just fall off the end harmlessly.
        candidates = get_memories(
            memory_type=mtype,
            project=project,
            min_confidence=0.5,
            limit=30,
        )
        if query_tokens is not None:
            candidates = _rank_memories_for_query(candidates, query_tokens)
        for mem in candidates:
            entry = f"[{mem.memory_type}] {mem.content}"
            entry_len = len(entry) + 1  # +1 for the joining newline
            if entry_len > available - used:
                # Skip (not break): a shorter later entry may still fit.
                continue
            selected_entries.append(entry)
            used += entry_len

    if not selected_entries:
        return "", 0

    lines = [header, *selected_entries, footer]
    text = "\n".join(lines)

    log.info("memories_for_context", count=len(selected_entries), chars=len(text))
    return text, len(text)
|
||
|
||
|
||
def _rank_memories_for_query(
    memories: list["Memory"],
    query_tokens: set[str],
) -> list["Memory"]:
    """Rerank a memory list by lexical overlap with a pre-tokenized query.

    Ordering key: (overlap_count DESC, confidence DESC). When a query
    shares no tokens with a memory, overlap is zero and confidence
    acts as the sole tiebreaker — which matches the pre-query
    behaviour and keeps no-query calls stable. (list.sort is stable
    even with reverse=True, so full ties retain the input order.)
    """
    # Function-scope import avoids a module-level cycle with reinforcement.
    from atocore.memory.reinforcement import _normalize, _tokenize

    scored: list[tuple[int, float, Memory]] = []
    for mem in memories:
        mem_tokens = _tokenize(_normalize(mem.content))
        overlap = len(mem_tokens & query_tokens) if mem_tokens else 0
        scored.append((overlap, mem.confidence, mem))
    scored.sort(key=lambda t: (t[0], t[1]), reverse=True)
    return [mem for _, _, mem in scored]
|
||
|
||
|
||
def _row_to_memory(row) -> Memory:
    """Convert a DB row to Memory dataclass.

    Tolerates schemas that predate the reinforcement columns by
    defaulting last_referenced_at/reference_count when absent.
    """
    keys = row.keys() if hasattr(row, "keys") else []
    last_ref = row["last_referenced_at"] if "last_referenced_at" in keys else None
    ref_count = row["reference_count"] if "reference_count" in keys else 0
    return Memory(
        id=row["id"],
        memory_type=row["memory_type"],
        content=row["content"],
        # NULLs from the DB normalize to "" so the dataclass stays str-typed.
        project=row["project"] or "",
        source_chunk_id=row["source_chunk_id"] or "",
        confidence=row["confidence"],
        status=row["status"],
        created_at=row["created_at"],
        updated_at=row["updated_at"],
        last_referenced_at=last_ref or "",
        reference_count=int(ref_count or 0),
    )
|
||
|
||
|
||
def _validate_confidence(confidence: float) -> None:
|
||
if not 0.0 <= confidence <= 1.0:
|
||
raise ValueError("Confidence must be between 0.0 and 1.0")
|