')
for m in matching_mems:
proj = f' {m.project}' if m.project else ''
- lines.append(f'
[{m.memory_type}]{proj} {m.content[:200]}
')
+ tags_html = ""
+ if m.domain_tags:
+ tag_links = " ".join(
+ f'{t}'
+ for t in m.domain_tags[:5]
+ )
+ tags_html = f' {tag_links}'
+ expiry_html = ""
+ if m.valid_until:
+ expiry_html = f' valid until {m.valid_until[:10]}'
+ lines.append(
+ f'
')
if not entities and not matching_mems:
@@ -302,6 +319,11 @@ _TEMPLATE = """
.triage-notice { background: var(--card); border-left: 4px solid var(--accent); padding: 0.6rem 1rem; border-radius: 4px; margin: 0.8rem 0; }
.triage-warning { background: #fef3c7; color: #78350f; border-left: 4px solid #d97706; padding: 0.6rem 1rem; border-radius: 4px; margin: 0.8rem 0; }
@media (prefers-color-scheme: dark) { .triage-warning { background: #451a03; color: #fde68a; } }
+ .mem-tags { display: inline-flex; gap: 0.25rem; flex-wrap: wrap; vertical-align: middle; }
+ .tag-badge { background: var(--accent); color: white; padding: 0.1rem 0.5rem; border-radius: 10px; font-size: 0.7rem; font-family: monospace; text-decoration: none; font-weight: 500; }
+ .tag-badge:hover { opacity: 0.85; text-decoration: none; }
+ .mem-expiry { font-size: 0.75rem; color: #d97706; font-style: italic; margin-left: 0.4rem; }
+ @media (prefers-color-scheme: dark) { .mem-expiry { color: #fbbf24; } }
diff --git a/src/atocore/memory/_llm_prompt.py b/src/atocore/memory/_llm_prompt.py
index 2d0c1b0..d144838 100644
--- a/src/atocore/memory/_llm_prompt.py
+++ b/src/atocore/memory/_llm_prompt.py
@@ -21,7 +21,7 @@ from __future__ import annotations
import json
from typing import Any
-LLM_EXTRACTOR_VERSION = "llm-0.4.0"
+LLM_EXTRACTOR_VERSION = "llm-0.5.0"
MAX_RESPONSE_CHARS = 8000
MAX_PROMPT_CHARS = 2000
MEMORY_TYPES = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"}
@@ -84,6 +84,36 @@ DOMAINS for knowledge candidates (required when type=knowledge and project is em
physics, materials, optics, mechanics, manufacturing, metrology,
controls, software, math, finance, business
+DOMAIN TAGS (Phase 3):
+Every candidate gets domain_tags — a lowercase list of topical keywords
+that describe the SUBJECT matter regardless of project. This is how
+cross-project retrieval works: a query about "optics" surfaces matches
+from p04 + p05 + p06 without naming each project.
+
+Good tags: single lowercase words or hyphenated terms.
+Examples:
+ - "ABB quote received for P04" → ["abb", "p04", "procurement", "optics"]
+ - "USB SSD mandatory on polisher" → ["p06", "firmware", "storage"]
+ - "CTE dominates WFE at F/1.2" → ["optics", "materials", "thermal"]
+ - "Antoine prefers OAuth over API keys" → ["security", "auth", "preference"]
+
+Tag 2-5 items. Use domain keywords (optics, thermal, firmware), project
+tokens when relevant (p04, abb), and lifecycle words (procurement, design,
+validation) as appropriate.
+
+VALID_UNTIL (Phase 3):
+A memory can have an expiry date if it describes time-bounded truth.
+Use valid_until for:
+ - Status snapshots: "current blocker is X" → valid_until = ~2 weeks out
+ - Scheduled events: "meeting with vendor Friday" → valid_until = meeting date
+ - Quotes with expiry: "quote valid until May 31"
+ - Interim decisions pending ratification
+Leave empty (null) for:
+ - Durable design decisions ("Option B selected")
+ - Engineering insights ("CTE dominates at F/1.2")
+ - Ratified requirements, architectural commitments
+Default = null (permanent). Format: ISO date "YYYY-MM-DD" or empty.
+
TRUST HIERARCHY:
- project-specific: set project to the project id, leave domain empty
@@ -99,7 +129,7 @@ OUTPUT RULES:
- Empty array [] is fine when the conversation has no durable signal
Each element:
-{"type": "project|knowledge|preference|adaptation|episodic", "content": "...", "project": "...", "domain": "", "confidence": 0.5}"""
+{"type": "project|knowledge|preference|adaptation|episodic", "content": "...", "project": "...", "domain": "", "confidence": 0.5, "domain_tags": ["tag1","tag2"], "valid_until": null}"""
def build_user_message(prompt: str, response: str, project_hint: str) -> str:
@@ -174,10 +204,36 @@ def normalize_candidate_item(item: dict[str, Any]) -> dict[str, Any] | None:
if domain and not model_project:
content = f"[{domain}] {content}"
+ # Phase 3: domain_tags + valid_until
+ raw_tags = item.get("domain_tags") or []
+ if isinstance(raw_tags, str):
+ # Tolerate comma-separated string fallback
+ raw_tags = [t.strip() for t in raw_tags.split(",") if t.strip()]
+ if not isinstance(raw_tags, list):
+ raw_tags = []
+ domain_tags = []
+ for t in raw_tags[:10]: # cap at 10
+ if not isinstance(t, str):
+ continue
+ tag = t.strip().lower()
+ if tag and tag not in domain_tags:
+ domain_tags.append(tag)
+
+ valid_until = item.get("valid_until")
+ if valid_until is not None:
+ valid_until = str(valid_until).strip()
+ # Accept ISO date "YYYY-MM-DD" or full timestamp; ""/"null"/"none"/"permanent" normalize to ""
+ if valid_until.lower() in ("", "null", "none", "permanent"):
+ valid_until = ""
+ else:
+ valid_until = ""
+
return {
"type": mem_type,
"content": content[:1000],
"project": model_project,
"domain": domain,
"confidence": confidence,
+ "domain_tags": domain_tags,
+ "valid_until": valid_until,
}
diff --git a/src/atocore/memory/service.py b/src/atocore/memory/service.py
index 2bce862..07cb2a2 100644
--- a/src/atocore/memory/service.py
+++ b/src/atocore/memory/service.py
@@ -63,6 +63,30 @@ class Memory:
updated_at: str
last_referenced_at: str = ""
reference_count: int = 0
+ domain_tags: list[str] | None = None
+ valid_until: str = ""  # ISO date or timestamp (UTC); empty = permanent
+
+
+def _normalize_tags(tags) -> list[str]:
+ """Coerce a tags value (list, JSON string, None) to a clean lowercase list."""
+ import json as _json
+ if tags is None:
+ return []
+ if isinstance(tags, str):
+ try:
+ tags = _json.loads(tags) if tags.strip().startswith("[") else []
+ except Exception:
+ tags = []
+ if not isinstance(tags, list):
+ return []
+ out = []
+ for t in tags:
+ if not isinstance(t, str):
+ continue
+ t = t.strip().lower()
+ if t and t not in out:
+ out.append(t)
+ return out
def create_memory(
@@ -72,34 +96,36 @@ def create_memory(
source_chunk_id: str = "",
confidence: float = 1.0,
status: str = "active",
+ domain_tags: list[str] | None = None,
+ valid_until: str = "",
) -> Memory:
"""Create a new memory entry.
``status`` defaults to ``active`` for backward compatibility. Pass
``candidate`` when the memory is being proposed by the Phase 9 Commit C
extractor and still needs human review before it can influence context.
+
+ Phase 3: ``domain_tags`` is a list of lowercase domain strings
+ (optics, mechanics, firmware, ...) for cross-project retrieval.
+ ``valid_until`` is an ISO UTC timestamp; memories with valid_until
+ in the past are excluded from context packs (but remain queryable).
"""
+ import json as _json
+
if memory_type not in MEMORY_TYPES:
raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}")
if status not in MEMORY_STATUSES:
raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
_validate_confidence(confidence)
- # Canonicalize the project through the registry so an alias and
- # the canonical id store under the same bucket. This keeps
- # reinforcement queries (which use the interaction's project) and
- # context retrieval (which uses the registry-canonicalized hint)
- # consistent with how memories are created.
project = resolve_project_name(project)
+ tags = _normalize_tags(domain_tags)
+ tags_json = _json.dumps(tags)
+ valid_until = (valid_until or "").strip() or None
memory_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()
- # Check for duplicate content within the same type+project at the same status.
- # Scoping by status keeps active curation separate from the candidate
- # review queue: a candidate and an active memory with identical text can
- # legitimately coexist if the candidate is a fresh extraction of something
- # already curated.
with get_connection() as conn:
existing = conn.execute(
"SELECT id FROM memories "
@@ -118,9 +144,11 @@ def create_memory(
)
conn.execute(
- "INSERT INTO memories (id, memory_type, content, project, source_chunk_id, confidence, status) "
- "VALUES (?, ?, ?, ?, ?, ?, ?)",
- (memory_id, memory_type, content, project, source_chunk_id or None, confidence, status),
+ "INSERT INTO memories (id, memory_type, content, project, source_chunk_id, "
+ "confidence, status, domain_tags, valid_until) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
+ (memory_id, memory_type, content, project, source_chunk_id or None,
+ confidence, status, tags_json, valid_until),
)
log.info(
@@ -128,6 +156,8 @@ def create_memory(
memory_type=memory_type,
status=status,
content_preview=content[:80],
+ tags=tags,
+ valid_until=valid_until or "",
)
return Memory(
@@ -142,6 +172,8 @@ def create_memory(
updated_at=now,
last_referenced_at="",
reference_count=0,
+ domain_tags=tags,
+ valid_until=valid_until or "",
)
@@ -200,8 +232,13 @@ def update_memory(
content: str | None = None,
confidence: float | None = None,
status: str | None = None,
+ memory_type: str | None = None,
+ domain_tags: list[str] | None = None,
+ valid_until: str | None = None,
) -> bool:
"""Update an existing memory."""
+ import json as _json
+
with get_connection() as conn:
existing = conn.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
if existing is None:
@@ -235,6 +272,17 @@ def update_memory(
raise ValueError(f"Invalid status '{status}'. Must be one of: {MEMORY_STATUSES}")
updates.append("status = ?")
params.append(status)
+ if memory_type is not None:
+ if memory_type not in MEMORY_TYPES:
+ raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}")
+ updates.append("memory_type = ?")
+ params.append(memory_type)
+ if domain_tags is not None:
+ updates.append("domain_tags = ?")
+ params.append(_json.dumps(_normalize_tags(domain_tags)))
+ if valid_until is not None:
+ updates.append("valid_until = ?")
+ params.append(valid_until.strip() or None)
if not updates:
return False
@@ -488,8 +536,14 @@ def get_memories_for_context(
seen_ids.add(mem.id)
pool.append(mem)
+ # Phase 3: filter out expired memories (valid_until in the past).
+ # Raw API queries still return them (for audit/history) but context
+ # packs must not surface stale facts.
+ if pool:
+ pool = _filter_expired(pool)
+
if query_tokens is not None:
- pool = _rank_memories_for_query(pool, query_tokens)
+ pool = _rank_memories_for_query(pool, query_tokens, query=query)
# Per-entry cap prevents a single long memory from monopolizing
# the band. With 16 p06 memories competing for ~700 chars, an
@@ -518,40 +572,78 @@ def get_memories_for_context(
return text, len(text)
+def _filter_expired(memories: list["Memory"]) -> list["Memory"]:
+ """Drop memories whose valid_until is in the past (UTC comparison)."""
+ now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+ out = []
+ for m in memories:
+ vu = (m.valid_until or "").strip()
+ if not vu:
+ out.append(m)
+ continue
+ # Compare as string (ISO dates/timestamps sort correctly lexicographically
+ # when they have the same format; date-only vs full ts both start YYYY-MM-DD).
+ if vu[:10] >= now_iso:
+ out.append(m)
+ # else: expired, drop silently
+ return out
+
+
def _rank_memories_for_query(
memories: list["Memory"],
query_tokens: set[str],
+ query: str | None = None,
) -> list["Memory"]:
"""Rerank a memory list by lexical overlap with a pre-tokenized query.
Primary key: overlap_density (overlap_count / memory_token_count),
which rewards short focused memories that match the query precisely
over long overview memories that incidentally share a few tokens.
- Secondary: absolute overlap count. Tertiary: confidence.
+ Secondary: absolute overlap count. Tertiary: domain-tag match.
+ Quaternary: confidence.
- R7 fix: previously overlap_count alone was the primary key, so a
- 40-token overview memory with 3 overlapping tokens tied a 5-token
- memory with 3 overlapping tokens, and the overview won on
- confidence. Now the short memory's density (0.6) beats the
- overview's density (0.075).
+ Phase 3: domain_tags contribute a boost when they appear in the
+ query text. A memory tagged [optics, thermal] for a query about
+ "optics coating" gets promoted above a memory without those tags.
+ Tag boost fires AFTER content-overlap density so it only breaks
+ ties among content-similar candidates.
"""
from atocore.memory.reinforcement import _normalize, _tokenize
- scored: list[tuple[float, int, float, Memory]] = []
+ query_lower = (query or "").lower()
+
+ scored: list[tuple[float, int, int, float, Memory]] = []
for mem in memories:
mem_tokens = _tokenize(_normalize(mem.content))
overlap = len(mem_tokens & query_tokens) if mem_tokens else 0
density = overlap / len(mem_tokens) if mem_tokens else 0.0
- scored.append((density, overlap, mem.confidence, mem))
- scored.sort(key=lambda t: (t[0], t[1], t[2]), reverse=True)
- return [mem for _, _, _, mem in scored]
+
+ # Tag boost: count how many of the memory's domain_tags appear
+ # as substrings in the raw query. Strong signal for topical match.
+ tag_hits = 0
+ for tag in (mem.domain_tags or []):
+ if tag and tag in query_lower:
+ tag_hits += 1
+
+ scored.append((density, overlap, tag_hits, mem.confidence, mem))
+ scored.sort(key=lambda t: (t[0], t[1], t[2], t[3]), reverse=True)
+ return [mem for _, _, _, _, mem in scored]
def _row_to_memory(row) -> Memory:
"""Convert a DB row to Memory dataclass."""
+ import json as _json
keys = row.keys() if hasattr(row, "keys") else []
last_ref = row["last_referenced_at"] if "last_referenced_at" in keys else None
ref_count = row["reference_count"] if "reference_count" in keys else 0
+ tags_raw = row["domain_tags"] if "domain_tags" in keys else None
+ try:
+ tags = _json.loads(tags_raw) if tags_raw else []
+ if not isinstance(tags, list):
+ tags = []
+ except Exception:
+ tags = []
+ valid_until = row["valid_until"] if "valid_until" in keys else None
return Memory(
id=row["id"],
memory_type=row["memory_type"],
@@ -564,6 +656,8 @@ def _row_to_memory(row) -> Memory:
updated_at=row["updated_at"],
last_referenced_at=last_ref or "",
reference_count=int(ref_count or 0),
+ domain_tags=tags,
+ valid_until=valid_until or "",
)
diff --git a/src/atocore/models/database.py b/src/atocore/models/database.py
index 77f50ba..293d6a9 100644
--- a/src/atocore/models/database.py
+++ b/src/atocore/models/database.py
@@ -119,6 +119,23 @@ def _apply_migrations(conn: sqlite3.Connection) -> None:
"CREATE INDEX IF NOT EXISTS idx_memories_last_referenced ON memories(last_referenced_at)"
)
+ # Phase 3 (Auto-Organization V1): domain tags + expiry.
+ # domain_tags is a JSON array of lowercase strings (optics, mechanics,
+ # firmware, business, etc.) inferred by the LLM during triage. Used for
+ # cross-project retrieval: a query about "optics" can surface matches from
+ # p04 + p05 + p06 without knowing all the project names.
+ # valid_until is an ISO UTC timestamp beyond which the memory is
+ # considered stale. get_memories_for_context filters these out of context
+ # packs automatically so ephemeral facts (status snapshots, weekly counts)
+ # don't pollute grounding once they've aged out.
+ if not _column_exists(conn, "memories", "domain_tags"):
+ conn.execute("ALTER TABLE memories ADD COLUMN domain_tags TEXT DEFAULT '[]'")
+ if not _column_exists(conn, "memories", "valid_until"):
+ conn.execute("ALTER TABLE memories ADD COLUMN valid_until DATETIME")
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_memories_valid_until ON memories(valid_until)"
+ )
+
# Phase 9 Commit A: capture loop columns on the interactions table.
# The original schema only carried prompt + project_id + a context_pack
# JSON blob. To make interactions a real audit trail of what AtoCore fed
diff --git a/tests/test_memory.py b/tests/test_memory.py
index f5f6238..5ae0673 100644
--- a/tests/test_memory.py
+++ b/tests/test_memory.py
@@ -264,6 +264,94 @@ def test_expire_stale_candidates(isolated_db):
assert mem["status"] == "invalid"
+# --- Phase 3: domain_tags + valid_until ---
+
+
+def test_create_memory_with_tags_and_valid_until(isolated_db):
+ from atocore.memory.service import create_memory
+
+ mem = create_memory(
+ "knowledge",
+ "CTE gradient dominates WFE at F/1.2",
+ domain_tags=["optics", "thermal", "materials"],
+ valid_until="2027-01-01",
+ )
+ assert mem.domain_tags == ["optics", "thermal", "materials"]
+ assert mem.valid_until == "2027-01-01"
+
+
+def test_create_memory_normalizes_tags(isolated_db):
+ from atocore.memory.service import create_memory
+
+ mem = create_memory(
+ "knowledge",
+ "some content here",
+ domain_tags=[" Optics ", "OPTICS", "Thermal", ""],
+ )
+ # Duplicates and empty removed; lowercased; stripped
+ assert mem.domain_tags == ["optics", "thermal"]
+
+
+def test_update_memory_sets_tags_and_valid_until(isolated_db):
+ from atocore.memory.service import create_memory, update_memory
+ from atocore.models.database import get_connection
+
+ mem = create_memory("knowledge", "some content for update test")
+ assert update_memory(
+ mem.id,
+ domain_tags=["controls", "firmware"],
+ valid_until="2026-12-31",
+ )
+ with get_connection() as conn:
+ row = conn.execute("SELECT domain_tags, valid_until FROM memories WHERE id = ?", (mem.id,)).fetchone()
+ import json as _json
+ assert _json.loads(row["domain_tags"]) == ["controls", "firmware"]
+ assert row["valid_until"] == "2026-12-31"
+
+
+def test_get_memories_for_context_excludes_expired(isolated_db):
+ """Expired active memories must not land in context packs."""
+ from atocore.memory.service import create_memory, get_memories_for_context
+
+ # Active but expired
+ create_memory(
+ "knowledge",
+ "stale snapshot from long ago period",
+ valid_until="2020-01-01",
+ confidence=1.0,
+ )
+ # Active and valid
+ create_memory(
+ "knowledge",
+ "durable engineering insight stays valid forever",
+ confidence=1.0,
+ )
+
+ text, _ = get_memories_for_context(memory_types=["knowledge"], budget=600)
+ assert "durable engineering" in text
+ assert "stale snapshot" not in text
+
+
+def test_context_builder_tag_boost_orders_results(isolated_db):
+ """Memories with tags matching query should rank higher."""
+ from atocore.memory.service import create_memory, get_memories_for_context
+
+ create_memory("knowledge", "generic content has no obvious overlap with topic", confidence=0.8, domain_tags=[])
+ create_memory("knowledge", "generic content has no obvious overlap topic here", confidence=0.8, domain_tags=["optics"])
+
+ text, _ = get_memories_for_context(
+ memory_types=["knowledge"],
+ budget=2000,
+ query="tell me about optics",
+ )
+ # Tagged memory should appear before the untagged one
+ idx_tagged = text.find("overlap topic here")
+ idx_untagged = text.find("overlap with topic")
+ assert idx_tagged != -1
+ assert idx_untagged != -1
+ assert idx_tagged < idx_untagged
+
+
def test_expire_stale_candidates_keeps_reinforced(isolated_db):
from atocore.memory.service import create_memory, expire_stale_candidates
from atocore.models.database import get_connection