fix(memory): SQL-aggregate dashboard counts, project on update, id-based invalidate

Three bugs surfaced by the 2026-04-29 Codex review of the state-of-the-service
plan, all in the memory write/read path:

1. /admin/dashboard memory counts were derived from a confidence-sorted
   get_memories(limit=500) sample. With prod at 1091 active memories the
   dashboard reported 315 ("active in the top 500"), while integrity
   reported the SQL aggregate 1091. Replaced the sampling block with a
   new get_memory_count_summary() helper that does straight SQL aggregates
   over status/type/project. Dashboard memories.{active,candidates,...}
   now match integrity. Adds memories.{by_status,total} for completeness.

2. PUT /memory/{id} silently dropped project changes because
   MemoryUpdateRequest had no project field and update_memory() didn't
   accept one. auto_triage.py:407 detects suggested_project drift and
   issues a PUT to fix it; the fix never landed. Added project to the
   request schema and the service signature, with resolve_project_name
   canonicalization, before/after audit snapshot, and the existing
   duplicate-active check now scoped to the new project.

3. POST /memory/{id}/invalidate did _get_memories(status="active",
   limit=1) and looked for the target inside that single
   highest-confidence row. Any other active memory 404'd. Replaced with
   a direct id lookup via the new get_memory(id) helper; status branching
   stays the same (404 unknown / 200 already-invalid / 409 wrong-status /
   200 invalidated).

Tests added (9):
- test_get_memory_count_summary_returns_full_table_aggregates
- test_get_memory_returns_single_row_or_none
- test_update_memory_can_change_project_with_canonicalization
- test_update_memory_project_unchanged_when_not_passed
- test_api_invalidate_finds_active_memory_outside_top_one
- test_api_invalidate_already_invalid_is_idempotent
- test_api_invalidate_candidate_returns_409
- test_api_invalidate_unknown_id_is_404
- test_admin_dashboard_active_count_matches_full_table

Test count: 572 -> 581. Full suite green locally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-28 21:40:10 -04:00
parent 7042eaea46
commit fb4d55cbcd
4 changed files with 301 additions and 40 deletions

View File

@@ -303,6 +303,7 @@ class MemoryUpdateRequest(BaseModel):
memory_type: str | None = None
domain_tags: list[str] | None = None
valid_until: str | None = None
project: str | None = None
class ProjectStateSetRequest(BaseModel):
@@ -636,6 +637,7 @@ def api_update_memory(memory_id: str, req: MemoryUpdateRequest) -> dict:
memory_type=req.memory_type,
domain_tags=req.domain_tags,
valid_until=req.valid_until,
project=req.project,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@@ -794,33 +796,25 @@ def api_invalidate_memory(
req: MemoryInvalidateRequest | None = None,
) -> dict:
"""Retract an active memory (Issue E — active → invalid)."""
from atocore.memory.service import get_memories as _get_memories, invalidate_memory
from atocore.memory.service import get_memory, invalidate_memory
reason = req.reason if req else ""
# Quick existence/status check for a clean 404 vs 409.
existing = [
m for m in _get_memories(status="active", limit=1)
if m.id == memory_id
]
if not existing:
# Fall through to generic not-active if the id exists in another status.
all_match = [
m for m in _get_memories(status="candidate", limit=5000)
+ _get_memories(status="invalid", limit=5000)
+ _get_memories(status="superseded", limit=5000)
if m.id == memory_id
]
if all_match:
if all_match[0].status == "invalid":
return {"status": "already_invalid", "id": memory_id}
raise HTTPException(
status_code=409,
detail=(
f"Memory {memory_id} is {all_match[0].status}; "
"use /reject for candidates"
),
)
# Direct id lookup — earlier code used get_memories(status='active', limit=1)
# which only saw the highest-confidence active row, so any other active
# memory would 404 here even though it existed.
target = get_memory(memory_id)
if target is None:
raise HTTPException(status_code=404, detail=f"Memory not found: {memory_id}")
if target.status == "invalid":
return {"status": "already_invalid", "id": memory_id}
if target.status != "active":
raise HTTPException(
status_code=409,
detail=(
f"Memory {memory_id} is {target.status}; "
"use /reject for candidates"
),
)
success = invalidate_memory(memory_id, actor="api-http", reason=reason)
if not success:
@@ -1280,16 +1274,20 @@ def api_dashboard() -> dict:
health beyond the basic /health endpoint.
"""
import json as _json
from collections import Counter
from datetime import datetime as _dt, timezone as _tz
all_memories = get_memories(active_only=False, limit=500)
active = [m for m in all_memories if m.status == "active"]
candidates = [m for m in all_memories if m.status == "candidate"]
from atocore.memory.service import get_memory_count_summary
type_counts = dict(Counter(m.memory_type for m in active))
project_counts = dict(Counter(m.project or "(none)" for m in active))
reinforced = [m for m in active if m.reference_count > 0]
# SQL-backed counts. Earlier code derived these by sampling the top
# 500 rows of get_memories() ordered by confidence — anything past
# the cap was invisible, so /admin/dashboard silently undercounted
# active memories once the corpus crossed ~500 active rows.
counts = get_memory_count_summary()
active_total = counts["active"]["total"]
candidate_total = counts["by_status"].get("candidate", 0)
type_counts = counts["active"]["by_type"]
project_counts = counts["active"]["by_project"]
reinforced_total = counts["active"]["reinforced"]
# Interaction stats — total + by_client from DB directly
interaction_stats: dict = {"most_recent": None, "total": 0, "by_client": {}}
@@ -1402,13 +1400,13 @@ def api_dashboard() -> dict:
# Triage queue health
triage: dict = {
"pending": len(candidates),
"pending": candidate_total,
"review_url": "/admin/triage",
}
if len(candidates) > 50:
triage["warning"] = f"High queue: {len(candidates)} candidates pending review."
elif len(candidates) > 20:
triage["notice"] = f"{len(candidates)} candidates awaiting triage."
if candidate_total > 50:
triage["warning"] = f"High queue: {candidate_total} candidates pending review."
elif candidate_total > 20:
triage["notice"] = f"{candidate_total} candidates awaiting triage."
# Recent audit activity (Phase 4 V1) — last 10 mutations for operator
recent_audit: list[dict] = []
@@ -1420,11 +1418,13 @@ def api_dashboard() -> dict:
return {
"memories": {
"active": len(active),
"candidates": len(candidates),
"active": active_total,
"candidates": candidate_total,
"by_type": type_counts,
"by_project": project_counts,
"reinforced": len(reinforced),
"reinforced": reinforced_total,
"by_status": counts["by_status"],
"total": counts["total"],
},
"project_state": {
"counts": ps_counts,

View File

@@ -347,6 +347,83 @@ def get_memories(
return [_row_to_memory(r) for r in rows]
def get_memory(memory_id: str) -> Memory | None:
    """Fetch exactly one memory row by primary key, or ``None`` if absent.

    Unlike ``get_memories`` — which orders by confidence and applies a
    LIMIT, so a specific row can sort past the cap and effectively
    vanish — this is a plain keyed lookup: if the row exists it is
    found, full stop. Routes that must inspect a particular memory's
    status before acting should use this primitive.
    """
    query = "SELECT * FROM memories WHERE id = ?"
    with get_connection() as conn:
        found = conn.execute(query, (memory_id,)).fetchone()
    if found is None:
        return None
    return _row_to_memory(found)
def get_memory_count_summary() -> dict:
    """Aggregate memory counts with straight SQL over the full table.

    Shape of the result:

        {
            "total": int,                       # every row, any status
            "by_status": {status: int, ...},    # full table
            "active": {
                "total": int,
                "reinforced": int,              # active with reference_count > 0
                "by_type": {memory_type: int, ...},
                "by_project": {project_or_none: int, ...},
            },
        }

    ``get_memories`` is a confidence-sorted, LIMITed row fetcher and is
    therefore unsafe for counting; this helper exists so dashboards and
    integrity checks agree on the same SQL aggregates.
    """
    with get_connection() as conn:
        # Grand total across every status.
        total_row = conn.execute("SELECT count(*) FROM memories").fetchone()
        grand_total = total_row[0] if total_row else 0

        # Full-table status histogram; also yields the active total below.
        by_status = {
            status: n
            for status, n in conn.execute(
                "SELECT status, count(*) FROM memories GROUP BY status"
            ).fetchall()
        }

        # Active-only breakdowns.
        by_type = {
            mtype: n
            for mtype, n in conn.execute(
                "SELECT memory_type, count(*) FROM memories "
                "WHERE status = 'active' GROUP BY memory_type"
            ).fetchall()
        }

        # Empty-string and NULL projects both collapse to "(none)".
        by_project = {
            proj: n
            for proj, n in conn.execute(
                "SELECT COALESCE(NULLIF(project, ''), '(none)') AS project, count(*) "
                "FROM memories WHERE status = 'active' GROUP BY project"
            ).fetchall()
        }

        reinforced_row = conn.execute(
            "SELECT count(*) FROM memories "
            "WHERE status = 'active' AND reference_count > 0"
        ).fetchone()
        reinforced = reinforced_row[0] if reinforced_row else 0

    return {
        "total": grand_total,
        "by_status": by_status,
        "active": {
            "total": by_status.get("active", 0),
            "reinforced": reinforced,
            "by_type": by_type,
            "by_project": by_project,
        },
    }
def update_memory(
memory_id: str,
content: str | None = None,
@@ -355,6 +432,7 @@ def update_memory(
memory_type: str | None = None,
domain_tags: list[str] | None = None,
valid_until: str | None = None,
project: str | None = None,
actor: str = "api",
note: str = "",
) -> bool:
@@ -368,6 +446,10 @@ def update_memory(
next_content = content if content is not None else existing["content"]
next_status = status if status is not None else existing["status"]
next_project = (
resolve_project_name(project) if project is not None
else (existing["project"] or "")
)
if confidence is not None:
_validate_confidence(confidence)
@@ -375,7 +457,7 @@ def update_memory(
duplicate = conn.execute(
"SELECT id FROM memories "
"WHERE memory_type = ? AND content = ? AND project = ? AND status = 'active' AND id != ?",
(existing["memory_type"], next_content, existing["project"] or "", memory_id),
(existing["memory_type"], next_content, next_project, memory_id),
).fetchone()
if duplicate:
raise ValueError("Update would create a duplicate active memory")
@@ -386,6 +468,7 @@ def update_memory(
"status": existing["status"],
"confidence": existing["confidence"],
"memory_type": existing["memory_type"],
"project": existing["project"] or "",
}
after_snapshot = dict(before_snapshot)
@@ -422,6 +505,10 @@ def update_memory(
updates.append("valid_until = ?")
params.append(vu)
after_snapshot["valid_until"] = vu or ""
if project is not None:
updates.append("project = ?")
params.append(next_project)
after_snapshot["project"] = next_project
if not updates:
return False