Files
ATOCore/src/atocore/memory/service.py
Anto01 fb6298a9a1 fix(P1+P2): canonicalize project names at every trust boundary
Three findings from codex's review of the previous P1+P2 fix. The
earlier commit (f2372ef) only fixed alias resolution at the context
builder. Codex correctly pointed out that the same fragmentation
applies at every other place a project name crosses a boundary —
project_state writes/reads, interaction capture/listing/filtering,
memory create/queries, and reinforcement's downstream queries. Plus
a real bug in the interaction `since` filter where the storage
format and the documented ISO format don't compare cleanly.

The fix is one helper used at every boundary instead of duplicating
the resolution inline.

New helper: src/atocore/projects/registry.py::resolve_project_name
---------------------------------------------------------------
- Single canonicalization boundary for project names
- Returns the canonical project_id when the input matches any
  registered id or alias
- Returns the input unchanged for empty/None and for unregistered
  names (preserves backwards compat with hand-curated state that
  predates the registry)
- Documented as the contract that every read/write at the trust
  boundary should pass through

P1 — Trusted Project State endpoints
------------------------------------
src/atocore/context/project_state.py: set_state, get_state, and
invalidate_state now all canonicalize project_name through
resolve_project_name BEFORE looking up or creating the project row.

Before this fix:
- POST /project/state with project="p05" called ensure_project("p05")
  which created a separate row in the projects table
- The state row was attached to that alias project_id
- Later context builds canonicalized "p05" -> "p05-interferometer"
  via the builder fix from f2372ef and never found the state
- Result: trusted state silently fragmented across alias rows

After this fix:
- The alias is resolved to the canonical id at every entry point
- Two captures (one via "p05", one via "p05-interferometer") write
  to the same row
- get_state via either alias or the canonical id finds the same row

Fixes the highest-priority gap codex flagged because Trusted Project
State is supposed to be the most dependable layer in the AtoCore
trust hierarchy.

P2.a — Interaction capture project canonicalization
----------------------------------------------------
src/atocore/interactions/service.py: record_interaction now
canonicalizes project before storing, so interaction.project is
always the canonical id regardless of what the client passed.

Downstream effects:
- reinforce_from_interaction queries memories by interaction.project
  -> previously missed memories stored under canonical id
  -> now consistent because interaction.project IS the canonical id
- the extractor stamps candidates with interaction.project
  -> previously created candidates in alias buckets
  -> now creates candidates in the canonical bucket
- list_interactions(project=alias) was already broken, now fixed by
  canonicalizing the filter input on the read side too

Memory service applied the same fix:
- src/atocore/memory/service.py: create_memory and get_memories
  both canonicalize project through resolve_project_name
- This keeps stored memory.project consistent with the
  reinforcement query path

P2.b — Interaction `since` filter format normalization
------------------------------------------------------
src/atocore/interactions/service.py: new _normalize_since helper.

The bug:
- created_at is stored as 'YYYY-MM-DD HH:MM:SS' (no timezone, UTC by
  convention) so it sorts lexically and compares cleanly with the
  SQLite CURRENT_TIMESTAMP default
- The `since` parameter was documented as ISO 8601 but compared as
  a raw string against the storage format
- The lexically-greater 'T' separator means an ISO timestamp like
  '2026-04-07T12:00:00Z' is GREATER than the storage form
  '2026-04-07 12:00:00' for the same instant
- Result: a client passing ISO `since` got an empty result for any
  row from the same day, even though those rows existed and were
  technically "after" the cutoff in real-world time

The fix:
- _normalize_since accepts ISO 8601 with T, optional Z suffix,
  optional fractional seconds, optional +HH:MM offsets
- Uses datetime.fromisoformat for parsing (Python 3.11+)
- Converts to UTC and reformats as the storage format before the
  SQL comparison
- The bare storage format still works (backwards compat path is a
  regex match that returns the input unchanged)
- Unparseable input is returned as-is so the comparison degrades
  gracefully (rows just don't match) instead of raising and
  breaking the listing endpoint

builder.py refactor
-------------------
The previous P1 fix had inline canonicalization. Now it uses the
shared helper for consistency:
- import changed from get_registered_project to resolve_project_name
- the inline lookup is replaced with a single helper call
- the comment block now points at representation-authority.md for
  the canonicalization contract

New shared test fixture: tests/conftest.py::project_registry
------------------------------------------------------------
- Standardizes the registry-setup pattern that was duplicated
  across test_context_builder.py, test_project_state.py,
  test_interactions.py, and test_reinforcement.py
- Returns a callable that takes (project_id, [aliases]) tuples
  and writes them into a temp registry file with the env var
  pointed at it and config.settings reloaded
- Used by all 12 new regression tests in this commit

Tests (12 new, all green on first run)
--------------------------------------
test_project_state.py:
- test_set_state_canonicalizes_alias: write via alias, read via
  every alias and the canonical id, verify same row id
- test_get_state_canonicalizes_alias_after_canonical_write
- test_invalidate_state_canonicalizes_alias
- test_unregistered_project_state_still_works (backwards compat)

test_interactions.py:
- test_record_interaction_canonicalizes_project
- test_list_interactions_canonicalizes_project_filter
- test_list_interactions_since_accepts_iso_with_t_separator
- test_list_interactions_since_accepts_z_suffix
- test_list_interactions_since_accepts_offset
- test_list_interactions_since_storage_format_still_works

test_reinforcement.py:
- test_reinforcement_works_when_capture_uses_alias (end-to-end:
  capture under alias, seed memory under canonical, verify
  reinforcement matches)
- test_get_memories_filter_by_alias

Full suite: 174 passing (was 162), 1 warning. The +12 is the
new regression tests, no existing tests regressed.

What's still NOT canonicalized (and why)
----------------------------------------
- _rank_chunks's secondary substring boost in builder.py — the
  retriever already does the right thing via its own
  _project_match_boost which calls get_registered_project. The
  redundant secondary boost still uses the raw hint but it's a
  multiplicative factor on top of correct retrieval, not a
  filter, so it can't drop relevant chunks. Tracked as a future
  cleanup but not a P1.
- update_memory's project field (you can't change a memory's
  project after creation in the API anyway).
- The retriever's project_hint parameter on direct /query calls
  — same reasoning as the builder boost, plus the retriever's
  own get_registered_project call already handles aliases there.
2026-04-07 08:29:33 -04:00

420 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Memory Core — structured memory management.
Memory types (per Master Plan):
- identity: who the user is, role, background
- preference: how they like to work, style, tools
- project: project-specific knowledge and context
- episodic: what happened, conversations, events
- knowledge: verified facts, technical knowledge
- adaptation: learned corrections, behavioral adjustments
Memories have:
- confidence (0.01.0): how certain we are
- status: lifecycle state, one of MEMORY_STATUSES
* candidate: extracted from an interaction, awaiting human review
(Phase 9 Commit C). Candidates are NEVER included in
context packs.
* active: promoted/curated, visible to retrieval and context
* superseded: replaced by a newer entry
* invalid: rejected / error-corrected
- last_referenced_at / reference_count: reinforcement signal
(Phase 9 Commit B). Bumped whenever a captured interaction's
response content echoes this memory.
- optional link to source chunk: traceability
"""
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
from atocore.projects.registry import resolve_project_name
log = get_logger("memory")
MEMORY_TYPES = [
"identity",
"preference",
"project",
"episodic",
"knowledge",
"adaptation",
]
MEMORY_STATUSES = [
"candidate",
"active",
"superseded",
"invalid",
]
@dataclass
class Memory:
id: str
memory_type: str
content: str
project: str
source_chunk_id: str
confidence: float
status: str
created_at: str
updated_at: str
last_referenced_at: str = ""
reference_count: int = 0
def create_memory(
memory_type: str,
content: str,
project: str = "",
source_chunk_id: str = "",
confidence: float = 1.0,
status: str = "active",
) -> Memory:
"""Create a new memory entry.
``status`` defaults to ``active`` for backward compatibility. Pass
``candidate`` when the memory is being proposed by the Phase 9 Commit C
extractor and still needs human review before it can influence context.
"""
if memory_type not in MEMORY_TYPES:
raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}")
if status not in MEMORY_STATUSES:
raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
_validate_confidence(confidence)
# Canonicalize the project through the registry so an alias and
# the canonical id store under the same bucket. This keeps
# reinforcement queries (which use the interaction's project) and
# context retrieval (which uses the registry-canonicalized hint)
# consistent with how memories are created.
project = resolve_project_name(project)
memory_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()
# Check for duplicate content within the same type+project at the same status.
# Scoping by status keeps active curation separate from the candidate
# review queue: a candidate and an active memory with identical text can
# legitimately coexist if the candidate is a fresh extraction of something
# already curated.
with get_connection() as conn:
existing = conn.execute(
"SELECT id FROM memories "
"WHERE memory_type = ? AND content = ? AND project = ? AND status = ?",
(memory_type, content, project, status),
).fetchone()
if existing:
log.info(
"memory_duplicate_skipped",
memory_type=memory_type,
status=status,
content_preview=content[:80],
)
return _row_to_memory(
conn.execute("SELECT * FROM memories WHERE id = ?", (existing["id"],)).fetchone()
)
conn.execute(
"INSERT INTO memories (id, memory_type, content, project, source_chunk_id, confidence, status) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(memory_id, memory_type, content, project, source_chunk_id or None, confidence, status),
)
log.info(
"memory_created",
memory_type=memory_type,
status=status,
content_preview=content[:80],
)
return Memory(
id=memory_id,
memory_type=memory_type,
content=content,
project=project,
source_chunk_id=source_chunk_id,
confidence=confidence,
status=status,
created_at=now,
updated_at=now,
last_referenced_at="",
reference_count=0,
)
def get_memories(
memory_type: str | None = None,
project: str | None = None,
active_only: bool = True,
min_confidence: float = 0.0,
limit: int = 50,
status: str | None = None,
) -> list[Memory]:
"""Retrieve memories, optionally filtered.
When ``status`` is provided explicitly, it takes precedence over
``active_only`` so callers can list the candidate review queue via
``get_memories(status='candidate')``. When ``status`` is omitted the
legacy ``active_only`` behaviour still applies.
"""
if status is not None and status not in MEMORY_STATUSES:
raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
query = "SELECT * FROM memories WHERE 1=1"
params: list = []
if memory_type:
query += " AND memory_type = ?"
params.append(memory_type)
if project is not None:
# Canonicalize on the read side so a caller passing an alias
# finds rows that were stored under the canonical id (and
# vice versa). resolve_project_name returns the input
# unchanged for unregistered names so empty-string queries
# for "no project scope" still work.
query += " AND project = ?"
params.append(resolve_project_name(project))
if status is not None:
query += " AND status = ?"
params.append(status)
elif active_only:
query += " AND status = 'active'"
if min_confidence > 0:
query += " AND confidence >= ?"
params.append(min_confidence)
query += " ORDER BY confidence DESC, updated_at DESC LIMIT ?"
params.append(limit)
with get_connection() as conn:
rows = conn.execute(query, params).fetchall()
return [_row_to_memory(r) for r in rows]
def update_memory(
memory_id: str,
content: str | None = None,
confidence: float | None = None,
status: str | None = None,
) -> bool:
"""Update an existing memory."""
with get_connection() as conn:
existing = conn.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
if existing is None:
return False
next_content = content if content is not None else existing["content"]
next_status = status if status is not None else existing["status"]
if confidence is not None:
_validate_confidence(confidence)
if next_status == "active":
duplicate = conn.execute(
"SELECT id FROM memories "
"WHERE memory_type = ? AND content = ? AND project = ? AND status = 'active' AND id != ?",
(existing["memory_type"], next_content, existing["project"] or "", memory_id),
).fetchone()
if duplicate:
raise ValueError("Update would create a duplicate active memory")
updates = []
params: list = []
if content is not None:
updates.append("content = ?")
params.append(content)
if confidence is not None:
updates.append("confidence = ?")
params.append(confidence)
if status is not None:
if status not in MEMORY_STATUSES:
raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
updates.append("status = ?")
params.append(status)
if not updates:
return False
updates.append("updated_at = CURRENT_TIMESTAMP")
params.append(memory_id)
result = conn.execute(
f"UPDATE memories SET {', '.join(updates)} WHERE id = ?",
params,
)
if result.rowcount > 0:
log.info("memory_updated", memory_id=memory_id)
return True
return False
def invalidate_memory(memory_id: str) -> bool:
"""Mark a memory as invalid (error correction)."""
return update_memory(memory_id, status="invalid")
def supersede_memory(memory_id: str) -> bool:
"""Mark a memory as superseded (replaced by newer info)."""
return update_memory(memory_id, status="superseded")
def promote_memory(memory_id: str) -> bool:
"""Promote a candidate memory to active (Phase 9 Commit C review queue).
Returns False if the memory does not exist or is not currently a
candidate. Raises ValueError only if the promotion would create a
duplicate active memory (delegates to update_memory's existing check).
"""
with get_connection() as conn:
row = conn.execute(
"SELECT status FROM memories WHERE id = ?", (memory_id,)
).fetchone()
if row is None:
return False
if row["status"] != "candidate":
return False
return update_memory(memory_id, status="active")
def reject_candidate_memory(memory_id: str) -> bool:
"""Reject a candidate memory (Phase 9 Commit C).
Sets the candidate's status to ``invalid`` so it drops out of the
review queue without polluting the active set. Returns False if the
memory does not exist or is not currently a candidate.
"""
with get_connection() as conn:
row = conn.execute(
"SELECT status FROM memories WHERE id = ?", (memory_id,)
).fetchone()
if row is None:
return False
if row["status"] != "candidate":
return False
return update_memory(memory_id, status="invalid")
def reinforce_memory(
memory_id: str,
confidence_delta: float = 0.02,
) -> tuple[bool, float, float]:
"""Bump a memory's confidence and reference count (Phase 9 Commit B).
Returns a 3-tuple ``(applied, old_confidence, new_confidence)``.
``applied`` is False if the memory does not exist or is not in the
``active`` state — reinforcement only touches live memories so the
candidate queue and invalidated history are never silently revived.
Confidence is capped at 1.0. last_referenced_at is set to the current
UTC time in SQLite-comparable format. reference_count is incremented
by one per call (not per delta amount).
"""
if confidence_delta < 0:
raise ValueError("confidence_delta must be non-negative for reinforcement")
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
with get_connection() as conn:
row = conn.execute(
"SELECT confidence, status FROM memories WHERE id = ?", (memory_id,)
).fetchone()
if row is None or row["status"] != "active":
return False, 0.0, 0.0
old_confidence = float(row["confidence"])
new_confidence = min(1.0, old_confidence + confidence_delta)
conn.execute(
"UPDATE memories SET confidence = ?, last_referenced_at = ?, "
"reference_count = COALESCE(reference_count, 0) + 1 "
"WHERE id = ?",
(new_confidence, now, memory_id),
)
log.info(
"memory_reinforced",
memory_id=memory_id,
old_confidence=round(old_confidence, 4),
new_confidence=round(new_confidence, 4),
)
return True, old_confidence, new_confidence
def get_memories_for_context(
memory_types: list[str] | None = None,
project: str | None = None,
budget: int = 500,
) -> tuple[str, int]:
"""Get formatted memories for context injection.
Returns (formatted_text, char_count).
Budget allocation per Master Plan section 9:
identity: 5%, preference: 5%, rest from retrieval budget
"""
if memory_types is None:
memory_types = ["identity", "preference"]
if budget <= 0:
return "", 0
header = "--- AtoCore Memory ---"
footer = "--- End Memory ---"
wrapper_chars = len(header) + len(footer) + 2
if budget <= wrapper_chars:
return "", 0
available = budget - wrapper_chars
selected_entries: list[str] = []
for index, mtype in enumerate(memory_types):
type_budget = available if index == len(memory_types) - 1 else max(0, available // (len(memory_types) - index))
type_used = 0
for mem in get_memories(
memory_type=mtype,
project=project,
min_confidence=0.5,
limit=10,
):
entry = f"[{mem.memory_type}] {mem.content}"
entry_len = len(entry) + 1
if entry_len > type_budget - type_used:
continue
selected_entries.append(entry)
type_used += entry_len
available -= type_used
if not selected_entries:
return "", 0
lines = [header, *selected_entries, footer]
text = "\n".join(lines)
log.info("memories_for_context", count=len(selected_entries), chars=len(text))
return text, len(text)
def _row_to_memory(row) -> Memory:
"""Convert a DB row to Memory dataclass."""
keys = row.keys() if hasattr(row, "keys") else []
last_ref = row["last_referenced_at"] if "last_referenced_at" in keys else None
ref_count = row["reference_count"] if "reference_count" in keys else 0
return Memory(
id=row["id"],
memory_type=row["memory_type"],
content=row["content"],
project=row["project"] or "",
source_chunk_id=row["source_chunk_id"] or "",
confidence=row["confidence"],
status=row["status"],
created_at=row["created_at"],
updated_at=row["updated_at"],
last_referenced_at=last_ref or "",
reference_count=int(ref_count or 0),
)
def _validate_confidence(confidence: float) -> None:
if not 0.0 <= confidence <= 1.0:
raise ValueError("Confidence must be between 0.0 and 1.0")