fix(memory): SQL-aggregate dashboard counts, project on update, id-based invalidate

Three bugs surfaced by the 2026-04-29 Codex review of the state-of-the-service
plan, all in the memory write/read path:

1. /admin/dashboard memory counts were derived from a confidence-sorted
   get_memories(limit=500) sample. With prod at 1091 active memories the
   dashboard reported 315 ("active in the top 500"), while integrity
   reported the SQL aggregate 1091. Replaced the sampling block with a
   new get_memory_count_summary() helper that does straight SQL aggregates
   over status/type/project. Dashboard memories.{active,candidates,...}
   now match integrity. Adds memories.{by_status,total} for completeness.

2. PUT /memory/{id} silently dropped project changes because
   MemoryUpdateRequest had no project field and update_memory() didn't
   accept one. auto_triage.py:407 detects suggested_project drift and
   issues a PUT to fix it; the fix never landed. Added project to the
   request schema and the service signature, with resolve_project_name
   canonicalization, before/after audit snapshot, and the existing
   duplicate-active check now scoped to the new project.

3. POST /memory/{id}/invalidate did _get_memories(status="active",
   limit=1) and looked for the target inside that single
   highest-confidence row. Any other active memory 404'd. Replaced with
   a direct id lookup via the new get_memory(id) helper; status branching
   stays the same (404 unknown / 200 already-invalid / 409 wrong-status /
   200 invalidated).

Tests added (9):
- test_get_memory_count_summary_returns_full_table_aggregates
- test_get_memory_returns_single_row_or_none
- test_update_memory_can_change_project_with_canonicalization
- test_update_memory_project_unchanged_when_not_passed
- test_api_invalidate_finds_active_memory_outside_top_one
- test_api_invalidate_already_invalid_is_idempotent
- test_api_invalidate_candidate_returns_409
- test_api_invalidate_unknown_id_is_404
- test_admin_dashboard_active_count_matches_full_table

Test count: 572 -> 581. Full suite green locally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-28 21:40:10 -04:00
parent 7042eaea46
commit fb4d55cbcd
4 changed files with 301 additions and 40 deletions

View File

@@ -303,6 +303,7 @@ class MemoryUpdateRequest(BaseModel):
memory_type: str | None = None
domain_tags: list[str] | None = None
valid_until: str | None = None
project: str | None = None
class ProjectStateSetRequest(BaseModel):
@@ -636,6 +637,7 @@ def api_update_memory(memory_id: str, req: MemoryUpdateRequest) -> dict:
memory_type=req.memory_type,
domain_tags=req.domain_tags,
valid_until=req.valid_until,
project=req.project,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@@ -794,33 +796,25 @@ def api_invalidate_memory(
req: MemoryInvalidateRequest | None = None,
) -> dict:
"""Retract an active memory (Issue E — active → invalid)."""
from atocore.memory.service import get_memories as _get_memories, invalidate_memory
from atocore.memory.service import get_memory, invalidate_memory
reason = req.reason if req else ""
# Quick existence/status check for a clean 404 vs 409.
existing = [
m for m in _get_memories(status="active", limit=1)
if m.id == memory_id
]
if not existing:
# Fall through to generic not-active if the id exists in another status.
all_match = [
m for m in _get_memories(status="candidate", limit=5000)
+ _get_memories(status="invalid", limit=5000)
+ _get_memories(status="superseded", limit=5000)
if m.id == memory_id
]
if all_match:
if all_match[0].status == "invalid":
# Direct id lookup — earlier code used get_memories(status='active', limit=1)
# which only saw the highest-confidence active row, so any other active
# memory would 404 here even though it existed.
target = get_memory(memory_id)
if target is None:
raise HTTPException(status_code=404, detail=f"Memory not found: {memory_id}")
if target.status == "invalid":
return {"status": "already_invalid", "id": memory_id}
if target.status != "active":
raise HTTPException(
status_code=409,
detail=(
f"Memory {memory_id} is {all_match[0].status}; "
f"Memory {memory_id} is {target.status}; "
"use /reject for candidates"
),
)
raise HTTPException(status_code=404, detail=f"Memory not found: {memory_id}")
success = invalidate_memory(memory_id, actor="api-http", reason=reason)
if not success:
@@ -1280,16 +1274,20 @@ def api_dashboard() -> dict:
health beyond the basic /health endpoint.
"""
import json as _json
from collections import Counter
from datetime import datetime as _dt, timezone as _tz
all_memories = get_memories(active_only=False, limit=500)
active = [m for m in all_memories if m.status == "active"]
candidates = [m for m in all_memories if m.status == "candidate"]
from atocore.memory.service import get_memory_count_summary
type_counts = dict(Counter(m.memory_type for m in active))
project_counts = dict(Counter(m.project or "(none)" for m in active))
reinforced = [m for m in active if m.reference_count > 0]
# SQL-backed counts. Earlier code derived these by sampling the top
# 500 rows of get_memories() ordered by confidence — anything past
# the cap was invisible, so /admin/dashboard silently undercounted
# active memories once the corpus crossed ~500 active rows.
counts = get_memory_count_summary()
active_total = counts["active"]["total"]
candidate_total = counts["by_status"].get("candidate", 0)
type_counts = counts["active"]["by_type"]
project_counts = counts["active"]["by_project"]
reinforced_total = counts["active"]["reinforced"]
# Interaction stats — total + by_client from DB directly
interaction_stats: dict = {"most_recent": None, "total": 0, "by_client": {}}
@@ -1402,13 +1400,13 @@ def api_dashboard() -> dict:
# Triage queue health
triage: dict = {
"pending": len(candidates),
"pending": candidate_total,
"review_url": "/admin/triage",
}
if len(candidates) > 50:
triage["warning"] = f"High queue: {len(candidates)} candidates pending review."
elif len(candidates) > 20:
triage["notice"] = f"{len(candidates)} candidates awaiting triage."
if candidate_total > 50:
triage["warning"] = f"High queue: {candidate_total} candidates pending review."
elif candidate_total > 20:
triage["notice"] = f"{candidate_total} candidates awaiting triage."
# Recent audit activity (Phase 4 V1) — last 10 mutations for operator
recent_audit: list[dict] = []
@@ -1420,11 +1418,13 @@ def api_dashboard() -> dict:
return {
"memories": {
"active": len(active),
"candidates": len(candidates),
"active": active_total,
"candidates": candidate_total,
"by_type": type_counts,
"by_project": project_counts,
"reinforced": len(reinforced),
"reinforced": reinforced_total,
"by_status": counts["by_status"],
"total": counts["total"],
},
"project_state": {
"counts": ps_counts,

View File

@@ -347,6 +347,83 @@ def get_memories(
return [_row_to_memory(r) for r in rows]
def get_memory(memory_id: str) -> Memory | None:
    """Fetch exactly one memory row by primary key.

    Unlike ``get_memories`` — which applies confidence ordering and a
    LIMIT, and can therefore sort a specific row past the cap — this is
    a direct id lookup, the right primitive for routes that must check
    a known memory's status before acting on it.

    Returns None when no row carries the given id.
    """
    with get_connection() as conn:
        result = conn.execute(
            "SELECT * FROM memories WHERE id = ?", (memory_id,)
        ).fetchone()
        if result is None:
            return None
        return _row_to_memory(result)
def get_memory_count_summary() -> dict:
    """Aggregate memory counts straight from SQL (no sampling).

    Returned shape:

        {
            "total": int,                     # every row in the table
            "by_status": {status: int, ...},  # full-table breakdown
            "active": {
                "total": int,
                "reinforced": int,            # active with reference_count > 0
                "by_type": {memory_type: int, ...},
                "by_project": {project_or_none: int, ...},
            },
        }

    ``get_memories(...)`` is a row-fetcher with a confidence-sorted LIMIT
    and is therefore not safe for counting; this helper exists so every
    counting consumer sees the same SQL aggregates.
    """
    with get_connection() as conn:
        total_row = conn.execute("SELECT count(*) FROM memories").fetchone()

        by_status = {
            status: n
            for status, n in conn.execute(
                "SELECT status, count(*) FROM memories GROUP BY status"
            ).fetchall()
        }

        by_type = {
            mtype: n
            for mtype, n in conn.execute(
                "SELECT memory_type, count(*) FROM memories "
                "WHERE status = 'active' GROUP BY memory_type"
            ).fetchall()
        }

        # COALESCE/NULLIF folds NULL and '' into the "(none)" bucket.
        by_project = {
            proj: n
            for proj, n in conn.execute(
                "SELECT COALESCE(NULLIF(project, ''), '(none)') AS project, count(*) "
                "FROM memories WHERE status = 'active' GROUP BY project"
            ).fetchall()
        }

        reinforced_row = conn.execute(
            "SELECT count(*) FROM memories "
            "WHERE status = 'active' AND reference_count > 0"
        ).fetchone()

    return {
        "total": total_row[0] if total_row else 0,
        "by_status": by_status,
        "active": {
            "total": by_status.get("active", 0),
            "reinforced": reinforced_row[0] if reinforced_row else 0,
            "by_type": by_type,
            "by_project": by_project,
        },
    }
def update_memory(
memory_id: str,
content: str | None = None,
@@ -355,6 +432,7 @@ def update_memory(
memory_type: str | None = None,
domain_tags: list[str] | None = None,
valid_until: str | None = None,
project: str | None = None,
actor: str = "api",
note: str = "",
) -> bool:
@@ -368,6 +446,10 @@ def update_memory(
next_content = content if content is not None else existing["content"]
next_status = status if status is not None else existing["status"]
next_project = (
resolve_project_name(project) if project is not None
else (existing["project"] or "")
)
if confidence is not None:
_validate_confidence(confidence)
@@ -375,7 +457,7 @@ def update_memory(
duplicate = conn.execute(
"SELECT id FROM memories "
"WHERE memory_type = ? AND content = ? AND project = ? AND status = 'active' AND id != ?",
(existing["memory_type"], next_content, existing["project"] or "", memory_id),
(existing["memory_type"], next_content, next_project, memory_id),
).fetchone()
if duplicate:
raise ValueError("Update would create a duplicate active memory")
@@ -386,6 +468,7 @@ def update_memory(
"status": existing["status"],
"confidence": existing["confidence"],
"memory_type": existing["memory_type"],
"project": existing["project"] or "",
}
after_snapshot = dict(before_snapshot)
@@ -422,6 +505,10 @@ def update_memory(
updates.append("valid_until = ?")
params.append(vu)
after_snapshot["valid_until"] = vu or ""
if project is not None:
updates.append("project = ?")
params.append(next_project)
after_snapshot["project"] = next_project
if not updates:
return False

View File

@@ -192,3 +192,91 @@ def test_v1_aliases_present(env):
"/v1/memory/{memory_id}/supersede",
):
assert p in paths, f"{p} missing"
# ---------------------------------------------------------------------------
# Wave 1 (2026-04-29) — invalidation route used to do
# `_get_memories(status='active', limit=1)` and look for the target id
# inside that single highest-confidence row, so any active memory
# outside slot 0 fell through as 404. Direct id lookup fixes it.
# ---------------------------------------------------------------------------
def test_api_invalidate_finds_active_memory_outside_top_one(env):
    """An active memory not at the top of the confidence sort must still
    be invalidatable via POST /memory/{id}/invalidate."""
    top = create_memory(
        memory_type="knowledge",
        content="high-confidence top row",
        confidence=0.99,
    )
    target = create_memory(
        memory_type="knowledge",
        content="lower-confidence target",
        confidence=0.55,
    )

    resp = TestClient(app).post(
        f"/memory/{target.id}/invalidate", json={"reason": "wave1 regression"}
    )

    assert resp.status_code == 200, resp.text
    assert resp.json()["status"] == "invalidated"
    # The high-confidence row must be untouched; only the target flips.
    assert _get_memory(top.id).status == "active"
    assert _get_memory(target.id).status == "invalid"
def test_api_invalidate_already_invalid_is_idempotent(env):
    """A second invalidate of the same id reports already_invalid with 200."""
    mem = create_memory(memory_type="knowledge", content="already invalid")
    client = TestClient(app)

    first = client.post(f"/memory/{mem.id}/invalidate", json={"reason": "first"})
    assert first.status_code == 200

    second = client.post(f"/memory/{mem.id}/invalidate", json={"reason": "again"})
    assert second.status_code == 200
    assert second.json()["status"] == "already_invalid"
def test_api_invalidate_candidate_returns_409(env):
    """Invalidate on a candidate is the wrong route and must answer 409."""
    candidate = create_memory(
        memory_type="knowledge", content="candidate route", status="candidate"
    )
    response = TestClient(app).post(
        f"/memory/{candidate.id}/invalidate", json={"reason": "wrong route"}
    )
    assert response.status_code == 409
def test_api_invalidate_unknown_id_is_404(env):
    """An id matching no row at all gets a clean 404."""
    response = TestClient(app).post(
        "/memory/no-such-id/invalidate", json={"reason": "ghost"}
    )
    assert response.status_code == 404
def test_admin_dashboard_active_count_matches_full_table(env):
    """/admin/dashboard memories.active must match the SQL aggregate even
    when there are more active memories than the legacy sample limit (500).

    This guards the Codex finding that the dashboard was deriving counts
    from a confidence-sorted limit=500 fetch, hiding rows past the cap.
    We don't need 500 rows in the test — a small corpus that exercises
    the SQL-aggregate path is enough; the integrity-vs-dashboard equality
    is the invariant being asserted.
    """
    # Mix of statuses to exercise the by_status aggregate:
    # two active (one with a project, one without), two candidates.
    create_memory(memory_type="knowledge", content="a")
    create_memory(memory_type="knowledge", content="b", project="p06-polisher")
    create_memory(memory_type="project", content="c-cand", status="candidate")
    cand = create_memory(memory_type="project", content="d-cand", status="candidate")
    # Invalidate one to seed an "invalid" bucket
    from atocore.memory.service import invalidate_memory

    target_id = cand.id
    # NOTE(review): despite the earlier comment claiming a promote-first step,
    # no promotion happens here — invalidate_memory() is applied directly to a
    # *candidate* row. This presumes the service layer permits candidate ->
    # invalid (the HTTP route would answer 409); confirm against
    # invalidate_memory()'s status checks if this assertion ever flakes.
    invalidate_memory(target_id)
    client = TestClient(app)
    dash = client.get("/admin/dashboard").json()
    # 2 active ("a", "b"), 1 remaining candidate ("c-cand"), 1 invalid, 4 total.
    assert dash["memories"]["active"] == 2
    assert dash["memories"]["candidates"] == 1
    assert dash["memories"]["by_status"]["invalid"] == 1
    assert dash["memories"]["total"] == 4
    assert dash["memories"]["by_project"].get("p06-polisher") == 1
    # "(none)" bucket is the COALESCE label for empty/null project
    assert "(none)" in dash["memories"]["by_project"]

View File

@@ -575,3 +575,89 @@ def test_expire_stale_candidates_keeps_reinforced(isolated_db):
assert mid not in expired
mem = _get_memory_by_id(mid)
assert mem["status"] == "candidate"
# ---------------------------------------------------------------------------
# Wave 1 (2026-04-29) — counts come from SQL, not from the top-N sample.
# Exposed by Codex audit when prod /admin/dashboard reported 315 active
# while /admin/integrity-check reported 1091. The dashboard was building
# its counts from a confidence-sorted limit=500 fetch.
# ---------------------------------------------------------------------------
def test_get_memory_count_summary_returns_full_table_aggregates(isolated_db):
    """Counts come from SQL aggregates, not a sampled fetch."""
    from atocore.memory.service import (
        create_memory,
        get_memory_count_summary,
        invalidate_memory,
    )

    # Seed more rows than any reasonable sampling LIMIT so a LIMIT-based
    # counter would visibly disagree with reality.
    for i in range(120):
        create_memory(
            "knowledge",
            f"fact-{i}",
            project="p04-gigabit",
            confidence=0.9,
            status="active",
        )
    for i in range(7):
        create_memory("knowledge", f"cand-{i}", status="candidate")
    doomed = create_memory("knowledge", "to-invalidate", status="active")
    invalidate_memory(doomed.id)

    summary = get_memory_count_summary()

    assert summary["total"] == 128  # 120 active + 7 candidates + 1 invalid
    assert summary["by_status"]["active"] == 120
    assert summary["by_status"]["candidate"] == 7
    assert summary["by_status"]["invalid"] == 1
    assert summary["active"]["total"] == 120
    assert summary["active"]["by_type"] == {"knowledge": 120}
    assert summary["active"]["by_project"] == {"p04-gigabit": 120}
def test_get_memory_returns_single_row_or_none(isolated_db):
    """get_memory() resolves an existing id; a missing id yields None."""
    from atocore.memory.service import create_memory, get_memory

    created = create_memory("knowledge", "single-row test")

    found = get_memory(created.id)
    assert found is not None
    assert found.id == created.id

    assert get_memory("non-existent-id") is None
def test_update_memory_can_change_project_with_canonicalization(
    isolated_db, project_registry
):
    """update_memory(project=...) canonicalizes aliases and writes audit."""
    project_registry(("p04-gigabit", ("p04", "gigabit")))
    from atocore.memory.service import (
        create_memory,
        get_memory,
        get_memory_audit,
        update_memory,
    )

    mem = create_memory("knowledge", "retargetable fact", project="atocore")

    # Retarget through the "p04" alias; the write must succeed.
    assert update_memory(mem.id, project="p04") is True

    # The stored project is the canonical name, not the alias.
    assert get_memory(mem.id).project == "p04-gigabit"

    audit_rows = get_memory_audit(mem.id, limit=10)
    update_rows = [row for row in audit_rows if row.get("action") == "updated"]
    assert update_rows, f"expected an updated audit row, got {audit_rows}"
    newest = update_rows[0]
    assert newest["before"]["project"] == "atocore"
    assert newest["after"]["project"] == "p04-gigabit"
def test_update_memory_project_unchanged_when_not_passed(isolated_db):
    """Omitting project in update_memory() leaves the stored project intact."""
    from atocore.memory.service import create_memory, get_memory, update_memory

    memory = create_memory("knowledge", "untouched project", project="p06-polisher")
    update_memory(memory.id, content="edited content")

    assert get_memory(memory.id).project == "p06-polisher"