# Changelog note (from the commit that introduced global ranking):
#
# Per-type ranking was still starving later types: when a p05 query
# matched a 'knowledge' memory best but 'project' came first in the
# type order, the project-type candidates filled the budget before
# the knowledge-type pool was even ranked.
#
# Fix: collect all candidates into a single pool, dedupe by id, then
# rank the whole pool once against the query before walking the flat
# budget. Python's stable sort preserves insertion order (which still
# reflects the caller's memory_types order) as a natural tiebreaker
# when scores are equal.
#
# Regression surfaced by the retrieval eval harness: p05-vendor-signal
# still missing 'Zygo' after 5aeeb1c — the vendor memory was
# type=knowledge but never reached the ranker because type=project
# consumed the budget first.
#
# Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
"""Memory Core — structured memory management.
|
||
|
||
Memory types (per Master Plan):
|
||
- identity: who the user is, role, background
|
||
- preference: how they like to work, style, tools
|
||
- project: project-specific knowledge and context
|
||
- episodic: what happened, conversations, events
|
||
- knowledge: verified facts, technical knowledge
|
||
- adaptation: learned corrections, behavioral adjustments
|
||
|
||
Memories have:
|
||
- confidence (0.0–1.0): how certain we are
|
||
- status: lifecycle state, one of MEMORY_STATUSES
|
||
* candidate: extracted from an interaction, awaiting human review
|
||
(Phase 9 Commit C). Candidates are NEVER included in
|
||
context packs.
|
||
* active: promoted/curated, visible to retrieval and context
|
||
* superseded: replaced by a newer entry
|
||
* invalid: rejected / error-corrected
|
||
- last_referenced_at / reference_count: reinforcement signal
|
||
(Phase 9 Commit B). Bumped whenever a captured interaction's
|
||
response content echoes this memory.
|
||
- optional link to source chunk: traceability
|
||
"""
|
||
|
||
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone

from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
from atocore.projects.registry import resolve_project_name
log = get_logger("memory")

# Valid values for Memory.memory_type. List order matters to callers
# that pass a type list to get_memories_for_context: insertion order
# survives the stable ranking sort as a tiebreaker for equal scores.
MEMORY_TYPES = [
    "identity",
    "preference",
    "project",
    "episodic",
    "knowledge",
    "adaptation",
]

# Lifecycle states for Memory.status (see the module docstring for
# what each state means and how it affects retrieval).
MEMORY_STATUSES = [
    "candidate",
    "active",
    "superseded",
    "invalid",
]
@dataclass
class Memory:
    """A single structured memory row, mirroring the `memories` table."""

    id: str  # UUID primary key (string form)
    memory_type: str  # one of MEMORY_TYPES
    content: str  # the remembered text itself
    project: str  # canonical project name; "" when unscoped
    source_chunk_id: str  # traceability link to the source chunk; "" if none
    confidence: float  # certainty in [0.0, 1.0]
    status: str  # one of MEMORY_STATUSES
    created_at: str  # timestamp string as stored by the DB
    updated_at: str  # timestamp string as stored by the DB
    last_referenced_at: str = ""  # reinforcement: last echo time; "" if never
    reference_count: int = 0  # reinforcement: number of observed echoes
def create_memory(
    memory_type: str,
    content: str,
    project: str = "",
    source_chunk_id: str = "",
    confidence: float = 1.0,
    status: str = "active",
) -> Memory:
    """Persist a new memory row and return it as a Memory dataclass.

    ``status`` defaults to ``active`` for backward compatibility. Pass
    ``candidate`` when the memory is proposed by the Phase 9 Commit C
    extractor and still needs human review before influencing context.

    Raises ValueError for an unknown type/status or out-of-range
    confidence. If an identical (type, content, project, status) row
    already exists, nothing is inserted and the existing row is
    returned instead.
    """
    if memory_type not in MEMORY_TYPES:
        raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}")
    if status not in MEMORY_STATUSES:
        raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
    _validate_confidence(confidence)

    # Canonicalize through the project registry so an alias and the
    # canonical id store under the same bucket — keeps writes consistent
    # with the reinforcement and context-retrieval read paths.
    project = resolve_project_name(project)

    new_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()

    with get_connection() as conn:
        # Dedupe within the same type+project at the same status.
        # Status scoping keeps active curation separate from the
        # candidate review queue: a candidate and an active memory with
        # identical text may legitimately coexist.
        dupe = conn.execute(
            "SELECT id FROM memories "
            "WHERE memory_type = ? AND content = ? AND project = ? AND status = ?",
            (memory_type, content, project, status),
        ).fetchone()
        if dupe is not None:
            log.info(
                "memory_duplicate_skipped",
                memory_type=memory_type,
                status=status,
                content_preview=content[:80],
            )
            row = conn.execute(
                "SELECT * FROM memories WHERE id = ?", (dupe["id"],)
            ).fetchone()
            return _row_to_memory(row)

        conn.execute(
            "INSERT INTO memories (id, memory_type, content, project, source_chunk_id, confidence, status) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (new_id, memory_type, content, project, source_chunk_id or None, confidence, status),
        )

    log.info(
        "memory_created",
        memory_type=memory_type,
        status=status,
        content_preview=content[:80],
    )

    return Memory(
        id=new_id,
        memory_type=memory_type,
        content=content,
        project=project,
        source_chunk_id=source_chunk_id,
        confidence=confidence,
        status=status,
        created_at=now,
        updated_at=now,
        last_referenced_at="",
        reference_count=0,
    )
def get_memories(
    memory_type: str | None = None,
    project: str | None = None,
    active_only: bool = True,
    min_confidence: float = 0.0,
    limit: int = 50,
    status: str | None = None,
) -> list[Memory]:
    """Fetch memories matching the given filters.

    An explicit ``status`` takes precedence over ``active_only`` so
    callers can list the candidate review queue via
    ``get_memories(status='candidate')``; when ``status`` is omitted
    the legacy ``active_only`` behaviour applies. Results are ordered
    by confidence desc, then updated_at desc.
    """
    if status is not None and status not in MEMORY_STATUSES:
        raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")

    clauses: list[str] = []
    params: list = []

    if memory_type:
        clauses.append(" AND memory_type = ?")
        params.append(memory_type)
    if project is not None:
        # Canonicalize on the read side too, so a caller passing an
        # alias finds rows stored under the canonical id (and vice
        # versa). resolve_project_name passes unregistered names —
        # including "" for "no project scope" — through unchanged.
        clauses.append(" AND project = ?")
        params.append(resolve_project_name(project))
    if status is not None:
        clauses.append(" AND status = ?")
        params.append(status)
    elif active_only:
        clauses.append(" AND status = 'active'")
    if min_confidence > 0:
        clauses.append(" AND confidence >= ?")
        params.append(min_confidence)

    sql = (
        "SELECT * FROM memories WHERE 1=1"
        + "".join(clauses)
        + " ORDER BY confidence DESC, updated_at DESC LIMIT ?"
    )
    params.append(limit)

    with get_connection() as conn:
        rows = conn.execute(sql, params).fetchall()

    return [_row_to_memory(r) for r in rows]
def update_memory(
    memory_id: str,
    content: str | None = None,
    confidence: float | None = None,
    status: str | None = None,
) -> bool:
    """Apply partial updates to a memory; return True when a row changed.

    Returns False when the id is unknown or no fields were provided.
    Raises ValueError for an invalid status, an out-of-range
    confidence, or when the resulting active memory would duplicate
    another active memory with the same type/content/project.
    """
    with get_connection() as conn:
        current = conn.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
        if current is None:
            return False

        effective_content = current["content"] if content is None else content
        effective_status = current["status"] if status is None else status
        if confidence is not None:
            _validate_confidence(confidence)

        # Guard the active set against silent duplicates: no two active
        # memories may share type, content, and project.
        if effective_status == "active":
            clash = conn.execute(
                "SELECT id FROM memories "
                "WHERE memory_type = ? AND content = ? AND project = ? AND status = 'active' AND id != ?",
                (current["memory_type"], effective_content, current["project"] or "", memory_id),
            ).fetchone()
            if clash:
                raise ValueError("Update would create a duplicate active memory")

        assignments: list[str] = []
        values: list = []

        if content is not None:
            assignments.append("content = ?")
            values.append(content)
        if confidence is not None:
            assignments.append("confidence = ?")
            values.append(confidence)
        if status is not None:
            if status not in MEMORY_STATUSES:
                raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
            assignments.append("status = ?")
            values.append(status)

        if not assignments:
            return False

        # Any real change also bumps updated_at.
        assignments.append("updated_at = CURRENT_TIMESTAMP")
        values.append(memory_id)

        result = conn.execute(
            f"UPDATE memories SET {', '.join(assignments)} WHERE id = ?",
            values,
        )

        if result.rowcount > 0:
            log.info("memory_updated", memory_id=memory_id)
            return True
        return False
def invalidate_memory(memory_id: str) -> bool:
    """Error-correct a memory by flipping its status to ``invalid``.

    Thin wrapper over update_memory; returns its result.
    """
    return update_memory(memory_id, status="invalid")
def supersede_memory(memory_id: str) -> bool:
    """Retire a memory that newer information has replaced.

    Thin wrapper over update_memory; returns its result.
    """
    return update_memory(memory_id, status="superseded")
def promote_memory(memory_id: str) -> bool:
    """Promote a candidate memory to active (Phase 9 Commit C review queue).

    Returns False when the id is unknown or the memory is not currently
    a candidate. Raises ValueError only if the promotion would create a
    duplicate active memory (delegated to update_memory's check).
    """
    with get_connection() as conn:
        snapshot = conn.execute(
            "SELECT status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()
    if snapshot is None or snapshot["status"] != "candidate":
        return False
    return update_memory(memory_id, status="active")
def reject_candidate_memory(memory_id: str) -> bool:
    """Drop a candidate from the review queue (Phase 9 Commit C).

    Flips the candidate's status to ``invalid`` so it leaves the review
    queue without ever touching the active set. Returns False when the
    id is unknown or the memory is not currently a candidate.
    """
    with get_connection() as conn:
        snapshot = conn.execute(
            "SELECT status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()
    if snapshot is None or snapshot["status"] != "candidate":
        return False
    return update_memory(memory_id, status="invalid")
def reinforce_memory(
    memory_id: str,
    confidence_delta: float = 0.02,
) -> tuple[bool, float, float]:
    """Bump confidence and reference bookkeeping for an active memory.

    Returns ``(applied, old_confidence, new_confidence)``. ``applied``
    is False when the memory is missing or not ``active`` — only live
    memories are reinforced, so the candidate queue and invalidated
    history are never silently revived.

    Confidence is capped at 1.0; reference_count increases by exactly
    one per call (not per delta amount); last_referenced_at is stamped
    with the current UTC time in SQLite-comparable format.

    Raises ValueError for a negative ``confidence_delta``.
    """
    if confidence_delta < 0:
        raise ValueError("confidence_delta must be non-negative for reinforcement")
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    with get_connection() as conn:
        state = conn.execute(
            "SELECT confidence, status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()
        if state is None or state["status"] != "active":
            return False, 0.0, 0.0
        before = float(state["confidence"])
        after = min(1.0, before + confidence_delta)
        conn.execute(
            "UPDATE memories SET confidence = ?, last_referenced_at = ?, "
            "reference_count = COALESCE(reference_count, 0) + 1 "
            "WHERE id = ?",
            (after, stamp, memory_id),
        )
    log.info(
        "memory_reinforced",
        memory_id=memory_id,
        old_confidence=round(before, 4),
        new_confidence=round(after, 4),
    )
    return True, before, after
def get_memories_for_context(
    memory_types: list[str] | None = None,
    project: str | None = None,
    budget: int = 500,
    header: str = "--- AtoCore Memory ---",
    footer: str = "--- End Memory ---",
    query: str | None = None,
) -> tuple[str, int]:
    """Get formatted memories for context injection.

    Returns (formatted_text, char_count).

    Budget allocation per Master Plan section 9:
        identity: 5%, preference: 5%, rest from retrieval budget

    The caller can override ``header`` / ``footer`` to distinguish
    multiple memory blocks in the same pack (e.g. identity/preference
    vs project/knowledge memories).

    When ``query`` is provided, candidates from ALL requested types are
    pooled, deduped by id, and ranked ONCE against the query (stemmed
    token intersection; ties broken by confidence, then by pool order,
    which reflects the caller's ``memory_types`` order). Without a
    query, candidates fall through in pool order — effectively "by
    confidence desc within each type, types in caller order".
    """
    if memory_types is None:
        memory_types = ["identity", "preference"]

    # Nothing fits if the budget cannot even cover the wrapper lines
    # (header + footer + their joining newlines).
    if budget <= 0:
        return "", 0
    wrapper_chars = len(header) + len(footer) + 2
    if budget <= wrapper_chars:
        return "", 0

    available = budget - wrapper_chars
    selected_entries: list[str] = []
    used = 0

    # Pre-tokenize the query once. ``_rank_memories_for_query`` below
    # reuses the reinforcement tokenizer so lexical scoring here matches
    # the reinforcement matcher. An empty token set degrades to the
    # no-query path.
    query_tokens: set[str] | None = None
    if query:
        from atocore.memory.reinforcement import _normalize, _tokenize

        query_tokens = _tokenize(_normalize(query))
        if not query_tokens:
            query_tokens = None

    # Collect ALL candidates across the requested types into one pool,
    # then rank globally before the budget walk. Ranking per type and
    # walking types in order would starve later types when the first
    # type's candidates filled the budget — even if a later-type
    # candidate matched the query perfectly. Type order is preserved as
    # a stable tiebreaker inside ``_rank_memories_for_query`` via
    # Python's stable sort.
    pool: list[Memory] = []
    seen_ids: set[str] = set()
    for mtype in memory_types:
        for mem in get_memories(
            memory_type=mtype,
            project=project,
            min_confidence=0.5,
            limit=30,
        ):
            if mem.id in seen_ids:
                continue
            seen_ids.add(mem.id)
            pool.append(mem)

    if query_tokens is not None:
        pool = _rank_memories_for_query(pool, query_tokens)

    # Flat budget walk: skip (not stop at) entries that don't fit, so a
    # smaller later entry can still use the remaining space.
    for mem in pool:
        entry = f"[{mem.memory_type}] {mem.content}"
        entry_len = len(entry) + 1  # +1 for the joining newline
        if entry_len > available - used:
            continue
        selected_entries.append(entry)
        used += entry_len

    if not selected_entries:
        return "", 0

    lines = [header, *selected_entries, footer]
    text = "\n".join(lines)

    log.info("memories_for_context", count=len(selected_entries), chars=len(text))
    return text, len(text)
def _rank_memories_for_query(
    memories: list["Memory"],
    query_tokens: set[str],
) -> list["Memory"]:
    """Order memories by lexical overlap with a pre-tokenized query.

    Sort key: overlap count desc, then confidence desc. When the query
    shares no tokens with any memory, every overlap is zero and
    confidence becomes the sole tiebreaker — matching the pre-query
    behaviour. Python's stable sort keeps the caller's input order for
    full ties.
    """
    from atocore.memory.reinforcement import _normalize, _tokenize

    def sort_key(mem: "Memory") -> tuple[int, float]:
        # Negate both components so an ascending stable sort yields
        # (overlap DESC, confidence DESC) with ties in input order.
        tokens = _tokenize(_normalize(mem.content))
        return (-len(tokens & query_tokens), -mem.confidence)

    return sorted(memories, key=sort_key)
def _row_to_memory(row) -> Memory:
    """Build a Memory dataclass from a DB row (sqlite3.Row or mapping).

    The reinforcement columns (last_referenced_at, reference_count) may
    be absent on older schemas; they default to "" and 0 respectively.
    """
    columns = row.keys() if hasattr(row, "keys") else []
    last_ref = row["last_referenced_at"] if "last_referenced_at" in columns else None
    refs = row["reference_count"] if "reference_count" in columns else 0
    return Memory(
        id=row["id"],
        memory_type=row["memory_type"],
        content=row["content"],
        # NULL-able text columns normalize to "" on the dataclass.
        project=row["project"] or "",
        source_chunk_id=row["source_chunk_id"] or "",
        confidence=row["confidence"],
        status=row["status"],
        created_at=row["created_at"],
        updated_at=row["updated_at"],
        last_referenced_at=last_ref or "",
        reference_count=int(refs or 0),
    )
def _validate_confidence(confidence: float) -> None:
|
||
if not 0.0 <= confidence <= 1.0:
|
||
raise ValueError("Confidence must be between 0.0 and 1.0")
|