feat(phase9-C): rule-based candidate extractor and review queue

Phase 9 Commit C. Closes the capture loop: Commit A records what
AtoCore fed the LLM and what came back, Commit B bumps confidence on
active memories the response actually references, and this commit
turns structured cues in the response into candidate memories for a
human review queue.

Nothing extracted here is ever automatically promoted into trusted
state. Every candidate sits at status="candidate" until a human (or
later, a confident automatic policy) calls /memory/{id}/promote or
/memory/{id}/reject. This keeps the "bad memory is worse than no
memory" invariant from the operating model intact.

New module: src/atocore/memory/extractor.py
- MemoryCandidate dataclass (type, content, rule, source_span,
  project, confidence, source_interaction_id)
- extract_candidates_from_interaction(interaction): runs a fixed set
  of regex rules over the response + response_summary and returns
  a list of candidates

V0 rule set (deliberately narrow to keep false positives low):
- decision_heading     ## Decision: / ## Decision - / ## Decision —
                       -> adaptation candidate
- constraint_heading   ## Constraint: ...      -> project candidate
- requirement_heading  ## Requirement: ...     -> project candidate
- fact_heading         ## Fact: ...            -> knowledge candidate
- preference_sentence  "I prefer X" / "the user prefers X"
                       -> preference candidate
- decided_to_sentence  "decided to X"          -> adaptation candidate
- requirement_sentence "the requirement is X"  -> project candidate

Extractor post-processing:
- clean_value: collapse whitespace, strip trailing punctuation
- min content length 8 chars, max 280 (keeps candidates reviewable)
- dedupe by (memory_type, normalized value, rule)
- drop candidates whose content already matches an active memory of
  the same type+project so the queue doesn't ask humans to re-curate
  things they already promoted

Memory service (extends Commit B candidate-status foundation):
- promote_memory(id): candidate -> active (404 if not a candidate)
- reject_candidate_memory(id): candidate -> invalid
- both are no-ops if the target isn't currently a candidate so the
  API can surface 404 without the caller needing to pre-check

API endpoints (new):
- POST /interactions/{id}/extract             run extractor, preview-only
  body: {"persist": false}                    (default) returns candidates
        {"persist": true}                     creates candidate memories
- POST /memory/{id}/promote                   candidate -> active
- POST /memory/{id}/reject                    candidate -> invalid
- GET  /memory?status=candidate               list review queue explicitly
      (existing endpoint now accepts status= override)
- GET  /memory now also returns reference_count and last_referenced_at
  per memory so the Commit B reinforcement signal is visible to clients

Trust model unchanged:
- candidates NEVER appear in context packs (get_memories_for_context
  still filters to active via the active_only default)
- candidates NEVER get reinforced by the Commit B loop (reinforcement
  refuses non-active memories)
- trusted project state is untouched end-to-end

Tests (25 new, all green):
- heading pattern: decision, constraint, requirement, fact
- separator variants :, -, em-dash
- sentence patterns: preference, decided_to, requirement
- rejects too-short matches
- dedupes identical matches
- strips trailing punctuation
- carries project and source_interaction_id onto candidates
- drops candidates that duplicate an existing active memory
- returns empty for prose without structural cues
- candidate and active coexist in the memory table
- promote_memory moves candidate -> active
- promote on non-candidate returns False
- reject_candidate_memory moves candidate -> invalid
- reject on non-candidate returns False
- get_memories(status="candidate") returns just the queue
- POST /interactions/{id}/extract preview-only path
- POST /interactions/{id}/extract persist=true path
- POST /interactions/{id}/extract 404 for missing interaction
- POST /memory/{id}/promote success + 404 on non-candidate
- POST /memory/{id}/reject 404 on missing
- GET /memory?status=candidate surfaces the queue
- GET /memory?status=<invalid> returns 400

Full suite: 160 passing (was 135).

What Phase 9 looks like end to end after this commit
----------------------------------------------------
prompt
  -> context pack assembled
    -> LLM response
      -> POST /interactions (capture)
         -> automatic Commit B reinforcement (active memories only)
         -> [optional] POST /interactions/{id}/extract
            -> Commit C extractor proposes candidates
               -> human reviews via GET /memory?status=candidate
                  -> POST /memory/{id}/promote  (candidate -> active)
                  OR POST /memory/{id}/reject   (candidate -> invalid)

Not in this commit (deferred on purpose):
- Decay of unused memories (we keep reference_count and
  last_referenced_at so a later decay job has the signal it needs)
- LLM-based extractor as an alternative to the regex rules
- Automatic promotion of high-confidence candidates
- Candidate-to-entity upgrade path (needs the engineering layer
  memory-vs-entities decision, planned in a coming architecture doc)
This commit is contained in:
2026-04-06 21:24:17 -04:00
parent 2704997256
commit 53147d326c
3 changed files with 719 additions and 8 deletions

View File

@@ -30,6 +30,10 @@ from atocore.interactions.service import (
list_interactions,
record_interaction,
)
from atocore.memory.extractor import (
MemoryCandidate,
extract_candidates_from_interaction,
)
from atocore.memory.reinforcement import reinforce_from_interaction
from atocore.memory.service import (
MEMORY_STATUSES,
@@ -351,15 +355,25 @@ def api_get_memories(
active_only: bool = True,
min_confidence: float = 0.0,
limit: int = 50,
status: str | None = None,
) -> dict:
"""List memories, optionally filtered."""
memories = get_memories(
memory_type=memory_type,
project=project,
active_only=active_only,
min_confidence=min_confidence,
limit=limit,
)
"""List memories, optionally filtered.
When ``status`` is given explicitly it overrides ``active_only`` so
the Phase 9 Commit C review queue can be listed via
``GET /memory?status=candidate``.
"""
try:
memories = get_memories(
memory_type=memory_type,
project=project,
active_only=active_only,
min_confidence=min_confidence,
limit=limit,
status=status,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {
"memories": [
{
@@ -369,11 +383,14 @@ def api_get_memories(
"project": m.project,
"confidence": m.confidence,
"status": m.status,
"reference_count": m.reference_count,
"last_referenced_at": m.last_referenced_at,
"updated_at": m.updated_at,
}
for m in memories
],
"types": MEMORY_TYPES,
"statuses": MEMORY_STATUSES,
}
@@ -403,6 +420,33 @@ def api_invalidate_memory(memory_id: str) -> dict:
return {"status": "invalidated", "id": memory_id}
@router.post("/memory/{memory_id}/promote")
def api_promote_memory(memory_id: str) -> dict:
"""Promote a candidate memory to active (Phase 9 Commit C)."""
try:
success = promote_memory(memory_id)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not success:
raise HTTPException(
status_code=404,
detail=f"Memory not found or not a candidate: {memory_id}",
)
return {"status": "promoted", "id": memory_id}
@router.post("/memory/{memory_id}/reject")
def api_reject_candidate_memory(memory_id: str) -> dict:
"""Reject a candidate memory (Phase 9 Commit C review queue)."""
success = reject_candidate_memory(memory_id)
if not success:
raise HTTPException(
status_code=404,
detail=f"Memory not found or not a candidate: {memory_id}",
)
return {"status": "rejected", "id": memory_id}
@router.post("/project/state")
def api_set_project_state(req: ProjectStateSetRequest) -> dict:
"""Set or update a trusted project state entry."""
@@ -528,6 +572,70 @@ def api_reinforce_interaction(interaction_id: str) -> dict:
}
class InteractionExtractRequest(BaseModel):
persist: bool = False
@router.post("/interactions/{interaction_id}/extract")
def api_extract_from_interaction(
interaction_id: str,
req: InteractionExtractRequest | None = None,
) -> dict:
"""Extract candidate memories from a captured interaction.
Phase 9 Commit C. The extractor is rule-based and deliberately
conservative — it only surfaces candidates that matched an explicit
structural cue (decision heading, preference sentence, etc.). By
default the candidates are returned *without* being persisted so a
caller can preview them before committing to a review queue. Pass
``persist: true`` to immediately create candidate memories for
each extraction result.
"""
interaction = get_interaction(interaction_id)
if interaction is None:
raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
payload = req or InteractionExtractRequest()
candidates: list[MemoryCandidate] = extract_candidates_from_interaction(interaction)
persisted_ids: list[str] = []
if payload.persist:
for candidate in candidates:
try:
mem = create_memory(
memory_type=candidate.memory_type,
content=candidate.content,
project=candidate.project,
confidence=candidate.confidence,
status="candidate",
)
persisted_ids.append(mem.id)
except ValueError as e:
log.error(
"extract_persist_failed",
interaction_id=interaction_id,
rule=candidate.rule,
error=str(e),
)
return {
"interaction_id": interaction_id,
"candidate_count": len(candidates),
"persisted": payload.persist,
"persisted_ids": persisted_ids,
"candidates": [
{
"memory_type": c.memory_type,
"content": c.content,
"project": c.project,
"confidence": c.confidence,
"rule": c.rule,
"source_span": c.source_span,
}
for c in candidates
],
}
@router.get("/interactions")
def api_list_interactions(
project: str | None = None,

View File

@@ -0,0 +1,229 @@
"""Rule-based candidate-memory extraction from captured interactions.
Phase 9 Commit C. This module reads an interaction's response text and
produces a list of *candidate* memories that a human can later review
and either promote to active or reject. Nothing extracted here is ever
automatically promoted into trusted state — the AtoCore trust rule is
that bad memory is worse than no memory, so the extractor is
conservative on purpose.
Design rules for V0
-------------------
1. Rule-based only. No LLM calls. The extractor should be fast, cheap,
fully explainable, and produce the same output for the same input
across runs.
2. Patterns match obvious, high-signal structures and are intentionally
narrow. False positives are more harmful than false negatives because
every candidate means review work for a human.
3. Every extracted candidate records which pattern fired and which text
span it came from, so a reviewer can audit the extractor's reasoning.
4. Patterns should feel like idioms the user already writes in their
PKM and interaction notes:
* ``## Decision: ...`` and variants
* ``## Constraint: ...`` and variants
* ``I prefer <X>`` / ``the user prefers <X>``
* ``decided to <X>``
* ``<X> is a requirement`` / ``requirement: <X>``
5. Candidates are de-duplicated against already-active memories of the
same type+project so review queues don't fill up with things the
user has already curated.
The extractor produces ``MemoryCandidate`` objects. The caller decides
whether to persist them via ``create_memory(..., status="candidate")``.
Persistence is kept out of the extractor itself so it can be tested
without touching the database and so future extractors (LLM-based,
structural, ontology-driven) can be swapped in cleanly.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from atocore.interactions.service import Interaction
from atocore.memory.service import MEMORY_TYPES, get_memories
from atocore.observability.logger import get_logger
log = get_logger("extractor")
# Every candidate is attributed to the rule that fired so reviewers can
# audit why it was proposed.
@dataclass
class MemoryCandidate:
memory_type: str
content: str
rule: str
source_span: str
project: str = ""
confidence: float = 0.5 # default review-queue confidence
source_interaction_id: str = ""
# ---------------------------------------------------------------------------
# Pattern definitions
# ---------------------------------------------------------------------------
#
# Each pattern maps to:
# - the memory type the candidate should land in
# - a compiled regex over the response text
# - a short human-readable rule id
#
# Regexes are intentionally anchored to obvious structural cues so random
# prose doesn't light them up. All are case-insensitive and DOTALL so
# they can span a line break inside a single logical phrase.
_RULES: list[tuple[str, str, re.Pattern]] = [
(
"decision_heading",
"adaptation",
re.compile(
r"^[ \t]*#{1,6}[ \t]*decision[ \t]*[:\-\u2014][ \t]*(?P<value>.+?)$",
re.IGNORECASE | re.MULTILINE,
),
),
(
"constraint_heading",
"project",
re.compile(
r"^[ \t]*#{1,6}[ \t]*constraint[ \t]*[:\-\u2014][ \t]*(?P<value>.+?)$",
re.IGNORECASE | re.MULTILINE,
),
),
(
"requirement_heading",
"project",
re.compile(
r"^[ \t]*#{1,6}[ \t]*requirement[ \t]*[:\-\u2014][ \t]*(?P<value>.+?)$",
re.IGNORECASE | re.MULTILINE,
),
),
(
"fact_heading",
"knowledge",
re.compile(
r"^[ \t]*#{1,6}[ \t]*fact[ \t]*[:\-\u2014][ \t]*(?P<value>.+?)$",
re.IGNORECASE | re.MULTILINE,
),
),
(
"preference_sentence",
"preference",
re.compile(
r"(?:^|[\s\.])(?:I|the user)\s+prefer(?:s)?\s+(?P<value>[^\n\.\!]{6,200})",
re.IGNORECASE,
),
),
(
"decided_to_sentence",
"adaptation",
re.compile(
r"(?:^|[\s\.])(?:I|we|the user)\s+decided\s+to\s+(?P<value>[^\n\.\!]{6,200})",
re.IGNORECASE,
),
),
(
"requirement_sentence",
"project",
re.compile(
r"(?:^|[\s\.])(?:the[ \t]+)?requirement\s+(?:is|was)\s+(?P<value>[^\n\.\!]{6,200})",
re.IGNORECASE,
),
),
]
# A minimum content length after trimming stops silly one-word candidates.
_MIN_CANDIDATE_LENGTH = 8
# A maximum content length keeps candidates reviewable at a glance.
_MAX_CANDIDATE_LENGTH = 280
def extract_candidates_from_interaction(
interaction: Interaction,
) -> list[MemoryCandidate]:
"""Return a list of candidate memories for human review.
The returned candidates are not persisted. The caller can iterate
over the result and call ``create_memory(..., status="candidate")``
for each one it wants to land.
"""
text = _combined_response_text(interaction)
if not text:
return []
raw_candidates: list[MemoryCandidate] = []
seen_spans: set[tuple[str, str, str]] = set() # (type, normalized_value, rule)
for rule_id, memory_type, pattern in _RULES:
for match in pattern.finditer(text):
value = _clean_value(match.group("value"))
if len(value) < _MIN_CANDIDATE_LENGTH or len(value) > _MAX_CANDIDATE_LENGTH:
continue
normalized = value.lower()
dedup_key = (memory_type, normalized, rule_id)
if dedup_key in seen_spans:
continue
seen_spans.add(dedup_key)
raw_candidates.append(
MemoryCandidate(
memory_type=memory_type,
content=value,
rule=rule_id,
source_span=match.group(0).strip(),
project=interaction.project or "",
confidence=0.5,
source_interaction_id=interaction.id,
)
)
# Drop anything that duplicates an already-active memory of the
# same type and project so reviewers aren't asked to re-curate
# things they already promoted.
filtered = [c for c in raw_candidates if not _matches_existing_active(c)]
if filtered:
log.info(
"extraction_produced_candidates",
interaction_id=interaction.id,
candidate_count=len(filtered),
dropped_as_duplicate=len(raw_candidates) - len(filtered),
)
return filtered
def _combined_response_text(interaction: Interaction) -> str:
parts: list[str] = []
if interaction.response:
parts.append(interaction.response)
if interaction.response_summary:
parts.append(interaction.response_summary)
return "\n".join(parts).strip()
def _clean_value(raw: str) -> str:
"""Trim whitespace, strip trailing punctuation, collapse inner spaces."""
cleaned = re.sub(r"\s+", " ", raw).strip()
# Trim trailing punctuation that commonly trails sentences but is not
# part of the fact itself.
cleaned = cleaned.rstrip(".;,!?\u2014-")
return cleaned.strip()
def _matches_existing_active(candidate: MemoryCandidate) -> bool:
"""Return True if an identical active memory already exists."""
if candidate.memory_type not in MEMORY_TYPES:
return False
try:
existing = get_memories(
memory_type=candidate.memory_type,
project=candidate.project or None,
active_only=True,
limit=200,
)
except Exception as exc: # pragma: no cover - defensive
log.error("extractor_existing_lookup_failed", error=str(exc))
return False
needle = candidate.content.lower()
for mem in existing:
if mem.content.lower() == needle:
return True
return False