fix(memory): Wave 1 — SQL-aggregate dashboard counts + memory write-path fixes
Closes three live-affecting bugs surfaced by the 2026-04-29 Codex review,
all in the memory write/read path. Pre-deploy on Dalidou the live
discrepancy was dashboard.memories.active=315 vs integrity active=1091.
1. /admin/dashboard counts now SQL-aggregate (no sampling).
New get_memory_count_summary() helper. Dashboard memories.{active,
candidates,by_type,by_project,reinforced,by_status,total} all derive
from full-table SQL, not a confidence-sorted limit=500 sample. Post
deploy the dashboard active count must match the integrity panel.
2. PUT /memory/{id} accepts project; auto-triage now applies it.
Added project to MemoryUpdateRequest and update_memory() with
resolve_project_name canonicalization, before/after audit, and
duplicate-active check scoped to the new project. scripts/auto_triage.py
suggested-project correction now PUTs {"project": suggested} so
misattribution flags actually retarget the memory.
3. POST /memory/{id}/invalidate uses direct id lookup.
New get_memory(id) helper. Replaces the old
_get_memories(status="active", limit=1) lookup, which only saw the
highest-confidence active row. Active memories outside slot 0 no
longer 404. Same status-guard structure applied to
POST /memory/{id}/supersede so candidates can't silently flip to
superseded.
14 regression tests added (572 -> 586 locally). Reviewed by Codex twice:
verdict GO on tip 9604c3e.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -303,6 +303,7 @@ class MemoryUpdateRequest(BaseModel):
|
||||
memory_type: str | None = None
|
||||
domain_tags: list[str] | None = None
|
||||
valid_until: str | None = None
|
||||
project: str | None = None
|
||||
|
||||
|
||||
class ProjectStateSetRequest(BaseModel):
|
||||
@@ -636,6 +637,7 @@ def api_update_memory(memory_id: str, req: MemoryUpdateRequest) -> dict:
|
||||
memory_type=req.memory_type,
|
||||
domain_tags=req.domain_tags,
|
||||
valid_until=req.valid_until,
|
||||
project=req.project,
|
||||
)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
@@ -794,33 +796,25 @@ def api_invalidate_memory(
|
||||
req: MemoryInvalidateRequest | None = None,
|
||||
) -> dict:
|
||||
"""Retract an active memory (Issue E — active → invalid)."""
|
||||
from atocore.memory.service import get_memories as _get_memories, invalidate_memory
|
||||
from atocore.memory.service import get_memory, invalidate_memory
|
||||
|
||||
reason = req.reason if req else ""
|
||||
# Quick existence/status check for a clean 404 vs 409.
|
||||
existing = [
|
||||
m for m in _get_memories(status="active", limit=1)
|
||||
if m.id == memory_id
|
||||
]
|
||||
if not existing:
|
||||
# Fall through to generic not-active if the id exists in another status.
|
||||
all_match = [
|
||||
m for m in _get_memories(status="candidate", limit=5000)
|
||||
+ _get_memories(status="invalid", limit=5000)
|
||||
+ _get_memories(status="superseded", limit=5000)
|
||||
if m.id == memory_id
|
||||
]
|
||||
if all_match:
|
||||
if all_match[0].status == "invalid":
|
||||
return {"status": "already_invalid", "id": memory_id}
|
||||
raise HTTPException(
|
||||
status_code=409,
|
||||
detail=(
|
||||
f"Memory {memory_id} is {all_match[0].status}; "
|
||||
"use /reject for candidates"
|
||||
),
|
||||
)
|
||||
# Direct id lookup — earlier code used get_memories(status='active', limit=1)
|
||||
# which only saw the highest-confidence active row, so any other active
|
||||
# memory would 404 here even though it existed.
|
||||
target = get_memory(memory_id)
|
||||
if target is None:
|
||||
raise HTTPException(status_code=404, detail=f"Memory not found: {memory_id}")
|
||||
if target.status == "invalid":
|
||||
return {"status": "already_invalid", "id": memory_id}
|
||||
if target.status != "active":
|
||||
raise HTTPException(
|
||||
status_code=409,
|
||||
detail=(
|
||||
f"Memory {memory_id} is {target.status}; "
|
||||
"use /reject for candidates"
|
||||
),
|
||||
)
|
||||
|
||||
success = invalidate_memory(memory_id, actor="api-http", reason=reason)
|
||||
if not success:
|
||||
@@ -833,15 +827,33 @@ def api_supersede_memory(
|
||||
memory_id: str,
|
||||
req: MemorySupersedeRequest | None = None,
|
||||
) -> dict:
|
||||
"""Supersede an active memory (Issue E — active → superseded)."""
|
||||
from atocore.memory.service import supersede_memory
|
||||
"""Supersede an active memory (Issue E — active → superseded).
|
||||
|
||||
Mirrors the invalidate route's status guard: candidates and other
|
||||
non-active rows must not silently flip to superseded.
|
||||
"""
|
||||
from atocore.memory.service import get_memory, supersede_memory
|
||||
|
||||
reason = req.reason if req else ""
|
||||
target = get_memory(memory_id)
|
||||
if target is None:
|
||||
raise HTTPException(status_code=404, detail=f"Memory not found: {memory_id}")
|
||||
if target.status == "superseded":
|
||||
return {"status": "already_superseded", "id": memory_id}
|
||||
if target.status != "active":
|
||||
raise HTTPException(
|
||||
status_code=409,
|
||||
detail=(
|
||||
f"Memory {memory_id} is {target.status}; "
|
||||
"only active memories can be superseded"
|
||||
),
|
||||
)
|
||||
|
||||
success = supersede_memory(memory_id, actor="api-http", reason=reason)
|
||||
if not success:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"Memory not found or not active: {memory_id}",
|
||||
status_code=409,
|
||||
detail=f"Memory {memory_id} could not be superseded",
|
||||
)
|
||||
return {"status": "superseded", "id": memory_id}
|
||||
|
||||
@@ -1280,16 +1292,20 @@ def api_dashboard() -> dict:
|
||||
health beyond the basic /health endpoint.
|
||||
"""
|
||||
import json as _json
|
||||
from collections import Counter
|
||||
from datetime import datetime as _dt, timezone as _tz
|
||||
|
||||
all_memories = get_memories(active_only=False, limit=500)
|
||||
active = [m for m in all_memories if m.status == "active"]
|
||||
candidates = [m for m in all_memories if m.status == "candidate"]
|
||||
from atocore.memory.service import get_memory_count_summary
|
||||
|
||||
type_counts = dict(Counter(m.memory_type for m in active))
|
||||
project_counts = dict(Counter(m.project or "(none)" for m in active))
|
||||
reinforced = [m for m in active if m.reference_count > 0]
|
||||
# SQL-backed counts. Earlier code derived these by sampling the top
|
||||
# 500 rows of get_memories() ordered by confidence — anything past
|
||||
# the cap was invisible, so /admin/dashboard silently undercounted
|
||||
# active memories once the corpus crossed ~500 active rows.
|
||||
counts = get_memory_count_summary()
|
||||
active_total = counts["active"]["total"]
|
||||
candidate_total = counts["by_status"].get("candidate", 0)
|
||||
type_counts = counts["active"]["by_type"]
|
||||
project_counts = counts["active"]["by_project"]
|
||||
reinforced_total = counts["active"]["reinforced"]
|
||||
|
||||
# Interaction stats — total + by_client from DB directly
|
||||
interaction_stats: dict = {"most_recent": None, "total": 0, "by_client": {}}
|
||||
@@ -1402,13 +1418,13 @@ def api_dashboard() -> dict:
|
||||
|
||||
# Triage queue health
|
||||
triage: dict = {
|
||||
"pending": len(candidates),
|
||||
"pending": candidate_total,
|
||||
"review_url": "/admin/triage",
|
||||
}
|
||||
if len(candidates) > 50:
|
||||
triage["warning"] = f"High queue: {len(candidates)} candidates pending review."
|
||||
elif len(candidates) > 20:
|
||||
triage["notice"] = f"{len(candidates)} candidates awaiting triage."
|
||||
if candidate_total > 50:
|
||||
triage["warning"] = f"High queue: {candidate_total} candidates pending review."
|
||||
elif candidate_total > 20:
|
||||
triage["notice"] = f"{candidate_total} candidates awaiting triage."
|
||||
|
||||
# Recent audit activity (Phase 4 V1) — last 10 mutations for operator
|
||||
recent_audit: list[dict] = []
|
||||
@@ -1420,11 +1436,13 @@ def api_dashboard() -> dict:
|
||||
|
||||
return {
|
||||
"memories": {
|
||||
"active": len(active),
|
||||
"candidates": len(candidates),
|
||||
"active": active_total,
|
||||
"candidates": candidate_total,
|
||||
"by_type": type_counts,
|
||||
"by_project": project_counts,
|
||||
"reinforced": len(reinforced),
|
||||
"reinforced": reinforced_total,
|
||||
"by_status": counts["by_status"],
|
||||
"total": counts["total"],
|
||||
},
|
||||
"project_state": {
|
||||
"counts": ps_counts,
|
||||
|
||||
@@ -347,6 +347,83 @@ def get_memories(
|
||||
return [_row_to_memory(r) for r in rows]
|
||||
|
||||
|
||||
def get_memory(memory_id: str) -> Memory | None:
|
||||
"""Return a single memory by id, or None if missing.
|
||||
|
||||
Direct id lookup (no LIMIT, no confidence ordering) — the right
|
||||
primitive for routes that need to check a specific memory's status
|
||||
before acting. Avoids the sampling pitfall where ``get_memories``
|
||||
with a small ``limit`` could hide a target row sorted past the cap.
|
||||
"""
|
||||
with get_connection() as conn:
|
||||
row = conn.execute(
|
||||
"SELECT * FROM memories WHERE id = ?", (memory_id,)
|
||||
).fetchone()
|
||||
return _row_to_memory(row) if row else None
|
||||
|
||||
|
||||
def get_memory_count_summary() -> dict:
|
||||
"""Aggregate memory counts straight from SQL (no sampling).
|
||||
|
||||
Returned shape:
|
||||
{
|
||||
"total": int, # all rows
|
||||
"by_status": {status: int, ...}, # full table
|
||||
"active": {
|
||||
"total": int,
|
||||
"reinforced": int, # active with reference_count > 0
|
||||
"by_type": {memory_type: int, ...},
|
||||
"by_project": {project_or_none: int, ...},
|
||||
},
|
||||
}
|
||||
|
||||
Distinct from ``get_memories(...)``, which is a row-fetcher with a
|
||||
confidence-sorted LIMIT and is therefore not safe for counting.
|
||||
"""
|
||||
summary: dict = {
|
||||
"total": 0,
|
||||
"by_status": {},
|
||||
"active": {
|
||||
"total": 0,
|
||||
"reinforced": 0,
|
||||
"by_type": {},
|
||||
"by_project": {},
|
||||
},
|
||||
}
|
||||
|
||||
with get_connection() as conn:
|
||||
row = conn.execute("SELECT count(*) FROM memories").fetchone()
|
||||
summary["total"] = row[0] if row else 0
|
||||
|
||||
rows = conn.execute(
|
||||
"SELECT status, count(*) FROM memories GROUP BY status"
|
||||
).fetchall()
|
||||
summary["by_status"] = {r[0]: r[1] for r in rows}
|
||||
|
||||
active_total = summary["by_status"].get("active", 0)
|
||||
summary["active"]["total"] = active_total
|
||||
|
||||
rows = conn.execute(
|
||||
"SELECT memory_type, count(*) FROM memories "
|
||||
"WHERE status = 'active' GROUP BY memory_type"
|
||||
).fetchall()
|
||||
summary["active"]["by_type"] = {r[0]: r[1] for r in rows}
|
||||
|
||||
rows = conn.execute(
|
||||
"SELECT COALESCE(NULLIF(project, ''), '(none)') AS project, count(*) "
|
||||
"FROM memories WHERE status = 'active' GROUP BY project"
|
||||
).fetchall()
|
||||
summary["active"]["by_project"] = {r[0]: r[1] for r in rows}
|
||||
|
||||
row = conn.execute(
|
||||
"SELECT count(*) FROM memories "
|
||||
"WHERE status = 'active' AND reference_count > 0"
|
||||
).fetchone()
|
||||
summary["active"]["reinforced"] = row[0] if row else 0
|
||||
|
||||
return summary
|
||||
|
||||
|
||||
def update_memory(
|
||||
memory_id: str,
|
||||
content: str | None = None,
|
||||
@@ -355,6 +432,7 @@ def update_memory(
|
||||
memory_type: str | None = None,
|
||||
domain_tags: list[str] | None = None,
|
||||
valid_until: str | None = None,
|
||||
project: str | None = None,
|
||||
actor: str = "api",
|
||||
note: str = "",
|
||||
) -> bool:
|
||||
@@ -368,6 +446,10 @@ def update_memory(
|
||||
|
||||
next_content = content if content is not None else existing["content"]
|
||||
next_status = status if status is not None else existing["status"]
|
||||
next_project = (
|
||||
resolve_project_name(project) if project is not None
|
||||
else (existing["project"] or "")
|
||||
)
|
||||
if confidence is not None:
|
||||
_validate_confidence(confidence)
|
||||
|
||||
@@ -375,7 +457,7 @@ def update_memory(
|
||||
duplicate = conn.execute(
|
||||
"SELECT id FROM memories "
|
||||
"WHERE memory_type = ? AND content = ? AND project = ? AND status = 'active' AND id != ?",
|
||||
(existing["memory_type"], next_content, existing["project"] or "", memory_id),
|
||||
(existing["memory_type"], next_content, next_project, memory_id),
|
||||
).fetchone()
|
||||
if duplicate:
|
||||
raise ValueError("Update would create a duplicate active memory")
|
||||
@@ -386,6 +468,7 @@ def update_memory(
|
||||
"status": existing["status"],
|
||||
"confidence": existing["confidence"],
|
||||
"memory_type": existing["memory_type"],
|
||||
"project": existing["project"] or "",
|
||||
}
|
||||
after_snapshot = dict(before_snapshot)
|
||||
|
||||
@@ -422,6 +505,10 @@ def update_memory(
|
||||
updates.append("valid_until = ?")
|
||||
params.append(vu)
|
||||
after_snapshot["valid_until"] = vu or ""
|
||||
if project is not None:
|
||||
updates.append("project = ?")
|
||||
params.append(next_project)
|
||||
after_snapshot["project"] = next_project
|
||||
|
||||
if not updates:
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user