Files
ATOCore/src/atocore/memory/service.py
Anto01 8951c624fe fix(R7/R9): overlap-density ranking + project trust-preservation
R7: ranking scorer now uses overlap-density (overlap_count /
memory_token_count) as primary key instead of raw overlap count.
A 5-token memory with 3 overlapping tokens (density 0.6) now beats
a 40-token overview memory with 3 overlapping tokens (density 0.075)
at the same absolute count. Secondary: absolute overlap. Tertiary:
confidence. Targeting p06-firmware-interface harness fixture.

R9: when the LLM extractor returns a project that differs from the
interaction's known project, it now checks the project registry.
If the model's project is a registered canonical ID, trust it. If
not (hallucinated name), fall back to the interaction's project.
Uses load_project_registry() for the check. The host-side script
mirrors this via an API call to GET /projects at startup.

Two new tests: test_parser_keeps_registered_model_project and
test_parser_rejects_hallucinated_project.

Test count: 280 -> 281.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 14:34:33 -04:00

495 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Memory Core — structured memory management.
Memory types (per Master Plan):
- identity: who the user is, role, background
- preference: how they like to work, style, tools
- project: project-specific knowledge and context
- episodic: what happened, conversations, events
- knowledge: verified facts, technical knowledge
- adaptation: learned corrections, behavioral adjustments
Memories have:
- confidence (0.0–1.0): how certain we are
- status: lifecycle state, one of MEMORY_STATUSES
* candidate: extracted from an interaction, awaiting human review
(Phase 9 Commit C). Candidates are NEVER included in
context packs.
* active: promoted/curated, visible to retrieval and context
* superseded: replaced by a newer entry
* invalid: rejected / error-corrected
- last_referenced_at / reference_count: reinforcement signal
(Phase 9 Commit B). Bumped whenever a captured interaction's
response content echoes this memory.
- optional link to source chunk: traceability
"""
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
from atocore.projects.registry import resolve_project_name
log = get_logger("memory")
# Closed set of memory categories; ``create_memory`` rejects anything else.
MEMORY_TYPES = ["identity", "preference", "project", "episodic", "knowledge", "adaptation"]

# Lifecycle states a memory can be in; checked wherever a status is accepted.
MEMORY_STATUSES = ["candidate", "active", "superseded", "invalid"]
@dataclass
class Memory:
    """In-memory representation of one row of the ``memories`` table."""

    id: str  # uuid4 string generated by create_memory
    memory_type: str  # one of MEMORY_TYPES (enforced by create_memory)
    content: str
    project: str  # "" when the stored value is empty/NULL (see _row_to_memory)
    source_chunk_id: str  # "" when no source chunk is linked
    confidence: float  # validated to lie within 0.0-1.0 on create/update
    status: str  # one of MEMORY_STATUSES
    created_at: str
    updated_at: str
    last_referenced_at: str = ""  # "" until reinforce_memory stamps it
    reference_count: int = 0  # incremented once per reinforce_memory call
def create_memory(
    memory_type: str,
    content: str,
    project: str = "",
    source_chunk_id: str = "",
    confidence: float = 1.0,
    status: str = "active",
) -> Memory:
    """Create a new memory entry, or return the identical one already stored.

    ``status`` defaults to ``active`` for backward compatibility. Pass
    ``candidate`` when the memory is being proposed by the Phase 9 Commit C
    extractor and still needs human review before it can influence context.

    Returns:
        The newly inserted ``Memory``. If a row with the same type, content,
        project and status already exists, that stored row is returned
        instead and nothing is inserted.

    Raises:
        ValueError: if ``memory_type`` is not in ``MEMORY_TYPES``, ``status``
            is not in ``MEMORY_STATUSES``, or ``confidence`` is outside
            0.0-1.0.
    """
    if memory_type not in MEMORY_TYPES:
        raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}")
    if status not in MEMORY_STATUSES:
        raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
    _validate_confidence(confidence)

    # Canonicalize the project through the registry so an alias and
    # the canonical id store under the same bucket. This keeps
    # reinforcement queries (which use the interaction's project) and
    # context retrieval (which uses the registry-canonicalized hint)
    # consistent with how memories are created.
    project = resolve_project_name(project)

    memory_id = str(uuid.uuid4())
    # NOTE(review): these ISO-8601 strings only populate the returned object;
    # the INSERT below does not write them, so the stored created_at /
    # updated_at come from whatever the schema defaults to - confirm the two
    # representations are acceptable to callers.
    now = datetime.now(timezone.utc).isoformat()

    with get_connection() as conn:
        # Check for duplicate content within the same type+project at the
        # same status. Scoping by status keeps active curation separate from
        # the candidate review queue: a candidate and an active memory with
        # identical text can legitimately coexist if the candidate is a fresh
        # extraction of something already curated.
        #
        # The full row is fetched here so the duplicate can be returned
        # directly, without a second lookup by id.
        existing = conn.execute(
            "SELECT * FROM memories "
            "WHERE memory_type = ? AND content = ? AND project = ? AND status = ?",
            (memory_type, content, project, status),
        ).fetchone()
        if existing is not None:
            log.info(
                "memory_duplicate_skipped",
                memory_type=memory_type,
                status=status,
                content_preview=content[:80],
            )
            return _row_to_memory(existing)

        conn.execute(
            "INSERT INTO memories (id, memory_type, content, project, source_chunk_id, confidence, status) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            # An empty source_chunk_id is stored as NULL rather than "".
            (memory_id, memory_type, content, project, source_chunk_id or None, confidence, status),
        )

    log.info(
        "memory_created",
        memory_type=memory_type,
        status=status,
        content_preview=content[:80],
    )

    return Memory(
        id=memory_id,
        memory_type=memory_type,
        content=content,
        project=project,
        source_chunk_id=source_chunk_id,
        confidence=confidence,
        status=status,
        created_at=now,
        updated_at=now,
        last_referenced_at="",
        reference_count=0,
    )
def get_memories(
    memory_type: str | None = None,
    project: str | None = None,
    active_only: bool = True,
    min_confidence: float = 0.0,
    limit: int = 50,
    status: str | None = None,
) -> list[Memory]:
    """Fetch memories matching the given optional filters.

    An explicit ``status`` wins over ``active_only``, which is how callers
    list the candidate review queue (``get_memories(status='candidate')``).
    With ``status`` omitted, the legacy ``active_only`` switch decides
    whether results are restricted to active rows.

    Results are ordered by confidence, then by most recent update, and
    capped at ``limit`` rows.

    Raises:
        ValueError: if ``status`` is given but is not in ``MEMORY_STATUSES``.
    """
    if not (status is None or status in MEMORY_STATUSES):
        raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")

    # SQL fragments and their bound values are accumulated side by side and
    # joined once all filters are known.
    sql_parts: list[str] = ["SELECT * FROM memories WHERE 1=1"]
    bind_values: list = []

    if memory_type:
        sql_parts.append(" AND memory_type = ?")
        bind_values.append(memory_type)

    if project is not None:
        # Canonicalize on the read side too, so an alias finds rows stored
        # under the canonical id (and vice versa). Unregistered names come
        # back unchanged, which keeps the empty-string "no project scope"
        # query working.
        sql_parts.append(" AND project = ?")
        bind_values.append(resolve_project_name(project))

    if status is not None:
        sql_parts.append(" AND status = ?")
        bind_values.append(status)
    elif active_only:
        sql_parts.append(" AND status = 'active'")

    if min_confidence > 0:
        sql_parts.append(" AND confidence >= ?")
        bind_values.append(min_confidence)

    sql_parts.append(" ORDER BY confidence DESC, updated_at DESC LIMIT ?")
    bind_values.append(limit)

    with get_connection() as conn:
        rows = conn.execute("".join(sql_parts), bind_values).fetchall()

    return list(map(_row_to_memory, rows))
def update_memory(
    memory_id: str,
    content: str | None = None,
    confidence: float | None = None,
    status: str | None = None,
) -> bool:
    """Update the content, confidence and/or status of an existing memory.

    Only the fields passed as non-``None`` are changed; ``updated_at`` is
    refreshed whenever at least one field is written.

    Returns:
        True if a row was updated. False if the memory does not exist, if no
        field was supplied, or if the UPDATE affected no rows.

    Raises:
        ValueError: if ``status`` is not in ``MEMORY_STATUSES``, if
            ``confidence`` is outside 0.0-1.0, or if the update would leave
            two active memories with the same type, content and project.
    """
    # Validate caller input before touching the database, so a bad argument
    # is always reported - even when the id does not exist - and no query
    # runs on behalf of a request that is going to be rejected anyway.
    if status is not None and status not in MEMORY_STATUSES:
        raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
    if confidence is not None:
        _validate_confidence(confidence)

    with get_connection() as conn:
        existing = conn.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
        if existing is None:
            return False

        # The values the row will hold after the update; used only for the
        # duplicate check below.
        next_content = content if content is not None else existing["content"]
        next_status = status if status is not None else existing["status"]

        if next_status == "active":
            # Reject the update if another active row (id != this one) would
            # end up with the same type, content and project.
            duplicate = conn.execute(
                "SELECT id FROM memories "
                "WHERE memory_type = ? AND content = ? AND project = ? AND status = 'active' AND id != ?",
                (existing["memory_type"], next_content, existing["project"] or "", memory_id),
            ).fetchone()
            if duplicate:
                raise ValueError("Update would create a duplicate active memory")

        updates = []
        params: list = []

        if content is not None:
            updates.append("content = ?")
            params.append(content)
        if confidence is not None:
            updates.append("confidence = ?")
            params.append(confidence)
        if status is not None:
            updates.append("status = ?")
            params.append(status)

        if not updates:
            return False

        updates.append("updated_at = CURRENT_TIMESTAMP")
        params.append(memory_id)

        # Only fixed column fragments are interpolated into the statement;
        # every value goes through a bound parameter.
        result = conn.execute(
            f"UPDATE memories SET {', '.join(updates)} WHERE id = ?",
            params,
        )

    if result.rowcount > 0:
        log.info("memory_updated", memory_id=memory_id)
        return True
    return False
def invalidate_memory(memory_id: str) -> bool:
    """Flag a memory as ``invalid`` (error correction).

    Delegates to ``update_memory`` and returns its result.
    """
    was_updated = update_memory(memory_id, status="invalid")
    return was_updated
def supersede_memory(memory_id: str) -> bool:
    """Flag a memory as ``superseded`` (replaced by newer info).

    Delegates to ``update_memory`` and returns its result.
    """
    was_updated = update_memory(memory_id, status="superseded")
    return was_updated
def promote_memory(memory_id: str) -> bool:
    """Move a candidate memory to ``active`` (Phase 9 Commit C review queue).

    Returns False when the memory is missing or is not currently a
    candidate. The status change itself is delegated to ``update_memory``,
    so a ValueError is raised only when promoting would create a duplicate
    active memory.
    """
    with get_connection() as conn:
        current = conn.execute(
            "SELECT status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()

    # Only rows that exist and are still in the review queue are promoted.
    if current is None or current["status"] != "candidate":
        return False

    return update_memory(memory_id, status="active")
def reject_candidate_memory(memory_id: str) -> bool:
    """Reject a candidate memory (Phase 9 Commit C).

    The candidate is marked ``invalid`` so it leaves the review queue
    without polluting the active set. Returns False when the memory is
    missing or is not currently a candidate.
    """
    with get_connection() as conn:
        current = conn.execute(
            "SELECT status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()

    # Anything that is not an existing candidate is left untouched.
    if current is None or current["status"] != "candidate":
        return False

    return update_memory(memory_id, status="invalid")
def reinforce_memory(
    memory_id: str,
    confidence_delta: float = 0.02,
) -> tuple[bool, float, float]:
    """Raise a memory's confidence and reference count (Phase 9 Commit B).

    Returns ``(applied, old_confidence, new_confidence)``. ``applied`` is
    False (with both confidences reported as 0.0) when the memory does not
    exist or is not ``active`` - reinforcement only touches live memories,
    so the candidate queue and invalidated history are never silently
    revived.

    The new confidence is capped at 1.0, ``last_referenced_at`` is set to
    the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``, and
    ``reference_count`` goes up by exactly one per call regardless of the
    delta.

    Raises:
        ValueError: if ``confidence_delta`` is negative.
    """
    if confidence_delta < 0:
        raise ValueError("confidence_delta must be non-negative for reinforcement")

    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    with get_connection() as conn:
        current = conn.execute(
            "SELECT confidence, status FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()

        is_live = current is not None and current["status"] == "active"
        if not is_live:
            return False, 0.0, 0.0

        old_confidence = float(current["confidence"])
        new_confidence = min(1.0, old_confidence + confidence_delta)

        # COALESCE treats a NULL reference_count as 0 before incrementing.
        conn.execute(
            "UPDATE memories SET confidence = ?, last_referenced_at = ?, "
            "reference_count = COALESCE(reference_count, 0) + 1 "
            "WHERE id = ?",
            (new_confidence, stamp, memory_id),
        )

    log.info(
        "memory_reinforced",
        memory_id=memory_id,
        old_confidence=round(old_confidence, 4),
        new_confidence=round(new_confidence, 4),
    )
    return True, old_confidence, new_confidence
def get_memories_for_context(
memory_types: list[str] | None = None,
project: str | None = None,
budget: int = 500,
header: str = "--- AtoCore Memory ---",
footer: str = "--- End Memory ---",
query: str | None = None,
) -> tuple[str, int]:
"""Get formatted memories for context injection.
Returns (formatted_text, char_count).
Budget allocation per Master Plan section 9:
identity: 5%, preference: 5%, rest from retrieval budget
The caller can override ``header`` / ``footer`` to distinguish
multiple memory blocks in the same pack (e.g. identity/preference
vs project/knowledge memories).
When ``query`` is provided, candidates within each memory type
are ranked by lexical overlap against the query (stemmed token
intersection, ties broken by confidence). Without a query,
candidates fall through in the order ``get_memories`` returns
them — which is effectively "by confidence desc".
"""
if memory_types is None:
memory_types = ["identity", "preference"]
if budget <= 0:
return "", 0
wrapper_chars = len(header) + len(footer) + 2
if budget <= wrapper_chars:
return "", 0
available = budget - wrapper_chars
selected_entries: list[str] = []
used = 0
# Pre-tokenize the query once. ``_score_memory_for_query`` is a
# free function below that reuses the reinforcement tokenizer so
# lexical scoring here matches the reinforcement matcher.
query_tokens: set[str] | None = None
if query:
from atocore.memory.reinforcement import _normalize, _tokenize
query_tokens = _tokenize(_normalize(query))
if not query_tokens:
query_tokens = None
# Collect ALL candidates across the requested types into one
# pool, then rank globally before the budget walk. Ranking per
# type and walking types in order would starve later types when
# the first type's candidates filled the budget — even if a
# later-type candidate matched the query perfectly. Type order
# is preserved as a stable tiebreaker inside
# ``_rank_memories_for_query`` via Python's stable sort.
pool: list[Memory] = []
seen_ids: set[str] = set()
for mtype in memory_types:
for mem in get_memories(
memory_type=mtype,
project=project,
min_confidence=0.5,
limit=30,
):
if mem.id in seen_ids:
continue
seen_ids.add(mem.id)
pool.append(mem)
if query_tokens is not None:
pool = _rank_memories_for_query(pool, query_tokens)
# Per-entry cap prevents a single long memory from monopolizing
# the band. With 16 p06 memories competing for ~700 chars, an
# uncapped 530-char overview memory fills the entire budget before
# a query-relevant 150-char memory gets a slot. The cap ensures at
# least 2-3 entries fit regardless of individual memory length.
max_entry_chars = 250
for mem in pool:
content = mem.content
if len(content) > max_entry_chars:
content = content[:max_entry_chars - 3].rstrip() + "..."
entry = f"[{mem.memory_type}] {content}"
entry_len = len(entry) + 1
if entry_len > available - used:
continue
selected_entries.append(entry)
used += entry_len
if not selected_entries:
return "", 0
lines = [header, *selected_entries, footer]
text = "\n".join(lines)
log.info("memories_for_context", count=len(selected_entries), chars=len(text))
return text, len(text)
def _rank_memories_for_query(
    memories: list["Memory"],
    query_tokens: set[str],
) -> list["Memory"]:
    """Order memories by lexical overlap with a pre-tokenized query.

    Sort keys, all descending:
      1. overlap density - shared tokens divided by the memory's own token
         count, which favours short focused memories over long overview
         memories that merely share a few tokens;
      2. absolute number of shared tokens;
      3. confidence.

    R7 fix: with raw overlap count as the primary key, a 40-token overview
    memory with 3 shared tokens tied a 5-token memory with 3 shared tokens
    and then won on confidence. With density first, the short memory (0.6)
    beats the overview (0.075).

    A memory that yields no tokens scores 0.0 density and 0 overlap. The
    sort is stable, so memories with equal keys keep their input order.
    """
    from atocore.memory.reinforcement import _normalize, _tokenize

    def _sort_key(mem: "Memory") -> tuple[float, int, float]:
        tokens = _tokenize(_normalize(mem.content))
        if not tokens:
            return (0.0, 0, mem.confidence)
        shared = len(tokens & query_tokens)
        return (shared / len(tokens), shared, mem.confidence)

    return sorted(memories, key=_sort_key, reverse=True)
def _row_to_memory(row) -> Memory:
    """Build a ``Memory`` from a database row.

    ``last_referenced_at`` and ``reference_count`` are read only when the
    row exposes those columns; otherwise they fall back to "" and 0. Empty
    or NULL ``project`` / ``source_chunk_id`` values become "".
    """
    available = row.keys() if hasattr(row, "keys") else []

    def _optional(column: str, fallback):
        # Columns that may be absent from the row are looked up defensively.
        return row[column] if column in available else fallback

    last_referenced = _optional("last_referenced_at", None)
    times_referenced = _optional("reference_count", 0)

    return Memory(
        id=row["id"],
        memory_type=row["memory_type"],
        content=row["content"],
        project=row["project"] or "",
        source_chunk_id=row["source_chunk_id"] or "",
        confidence=row["confidence"],
        status=row["status"],
        created_at=row["created_at"],
        updated_at=row["updated_at"],
        last_referenced_at=last_referenced or "",
        reference_count=int(times_referenced or 0),
    )
def _validate_confidence(confidence: float) -> None:
if not 0.0 <= confidence <= 1.0:
raise ValueError("Confidence must be between 0.0 and 1.0")