149 lines
5.3 KiB
Python
149 lines
5.3 KiB
Python
|
|
"""Phase 6 tests — Living Taxonomy: detector + transient-to-durable extension."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from datetime import datetime, timedelta, timezone
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from atocore.memory.service import (
|
||
|
|
create_memory,
|
||
|
|
extend_reinforced_valid_until,
|
||
|
|
)
|
||
|
|
from atocore.models.database import get_connection, init_db
|
||
|
|
|
||
|
|
|
||
|
|
def _set_memory_fields(mem_id, reference_count=None, valid_until=None):
|
||
|
|
"""Helper to force memory state for tests."""
|
||
|
|
with get_connection() as conn:
|
||
|
|
fields, params = [], []
|
||
|
|
if reference_count is not None:
|
||
|
|
fields.append("reference_count = ?")
|
||
|
|
params.append(reference_count)
|
||
|
|
if valid_until is not None:
|
||
|
|
fields.append("valid_until = ?")
|
||
|
|
params.append(valid_until)
|
||
|
|
params.append(mem_id)
|
||
|
|
conn.execute(
|
||
|
|
f"UPDATE memories SET {', '.join(fields)} WHERE id = ?",
|
||
|
|
params,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
# --- Transient-to-durable extension (C.3) ---
|
||
|
|
|
||
|
|
|
||
|
|
def test_extend_extends_imminent_valid_until(tmp_data_dir):
|
||
|
|
init_db()
|
||
|
|
mem = create_memory("knowledge", "Reinforced content for extension")
|
||
|
|
soon = (datetime.now(timezone.utc) + timedelta(days=7)).strftime("%Y-%m-%d")
|
||
|
|
_set_memory_fields(mem.id, reference_count=6, valid_until=soon)
|
||
|
|
|
||
|
|
result = extend_reinforced_valid_until()
|
||
|
|
assert len(result) == 1
|
||
|
|
assert result[0]["memory_id"] == mem.id
|
||
|
|
assert result[0]["action"] == "extended"
|
||
|
|
# New expiry should be ~90 days out
|
||
|
|
new_date = datetime.strptime(result[0]["new_valid_until"], "%Y-%m-%d")
|
||
|
|
days_out = (new_date - datetime.now(timezone.utc).replace(tzinfo=None)).days
|
||
|
|
assert 85 <= days_out <= 92 # ~90 days, some slop for test timing
|
||
|
|
|
||
|
|
|
||
|
|
def test_extend_makes_permanent_at_high_reference_count(tmp_data_dir):
|
||
|
|
init_db()
|
||
|
|
mem = create_memory("knowledge", "Heavy-referenced content")
|
||
|
|
soon = (datetime.now(timezone.utc) + timedelta(days=7)).strftime("%Y-%m-%d")
|
||
|
|
_set_memory_fields(mem.id, reference_count=15, valid_until=soon)
|
||
|
|
|
||
|
|
result = extend_reinforced_valid_until()
|
||
|
|
assert len(result) == 1
|
||
|
|
assert result[0]["action"] == "made_permanent"
|
||
|
|
assert result[0]["new_valid_until"] is None
|
||
|
|
|
||
|
|
# Verify the DB reflects the cleared expiry
|
||
|
|
with get_connection() as conn:
|
||
|
|
row = conn.execute(
|
||
|
|
"SELECT valid_until FROM memories WHERE id = ?", (mem.id,)
|
||
|
|
).fetchone()
|
||
|
|
assert row["valid_until"] is None
|
||
|
|
|
||
|
|
|
||
|
|
def test_extend_skips_not_expiring_soon(tmp_data_dir):
|
||
|
|
init_db()
|
||
|
|
mem = create_memory("knowledge", "Far-future expiry")
|
||
|
|
far = (datetime.now(timezone.utc) + timedelta(days=365)).strftime("%Y-%m-%d")
|
||
|
|
_set_memory_fields(mem.id, reference_count=6, valid_until=far)
|
||
|
|
|
||
|
|
result = extend_reinforced_valid_until(imminent_expiry_days=30)
|
||
|
|
assert result == []
|
||
|
|
|
||
|
|
|
||
|
|
def test_extend_skips_low_reference_count(tmp_data_dir):
|
||
|
|
init_db()
|
||
|
|
mem = create_memory("knowledge", "Not reinforced enough")
|
||
|
|
soon = (datetime.now(timezone.utc) + timedelta(days=7)).strftime("%Y-%m-%d")
|
||
|
|
_set_memory_fields(mem.id, reference_count=2, valid_until=soon)
|
||
|
|
|
||
|
|
result = extend_reinforced_valid_until(min_reference_count=5)
|
||
|
|
assert result == []
|
||
|
|
|
||
|
|
|
||
|
|
def test_extend_skips_permanent_memory(tmp_data_dir):
|
||
|
|
"""Memory with no valid_until is already permanent — shouldn't touch."""
|
||
|
|
init_db()
|
||
|
|
mem = create_memory("knowledge", "Already permanent")
|
||
|
|
_set_memory_fields(mem.id, reference_count=20)
|
||
|
|
# no valid_until
|
||
|
|
|
||
|
|
result = extend_reinforced_valid_until()
|
||
|
|
assert result == []
|
||
|
|
|
||
|
|
|
||
|
|
def test_extend_writes_audit_row(tmp_data_dir):
|
||
|
|
init_db()
|
||
|
|
mem = create_memory("knowledge", "Audited extension")
|
||
|
|
soon = (datetime.now(timezone.utc) + timedelta(days=7)).strftime("%Y-%m-%d")
|
||
|
|
_set_memory_fields(mem.id, reference_count=6, valid_until=soon)
|
||
|
|
|
||
|
|
extend_reinforced_valid_until()
|
||
|
|
|
||
|
|
from atocore.memory.service import get_memory_audit
|
||
|
|
audit = get_memory_audit(mem.id)
|
||
|
|
actions = [a["action"] for a in audit]
|
||
|
|
assert "valid_until_extended" in actions
|
||
|
|
entry = next(a for a in audit if a["action"] == "valid_until_extended")
|
||
|
|
assert entry["actor"] == "transient-to-durable"
|
||
|
|
|
||
|
|
|
||
|
|
# --- Emerging detector (smoke tests — detector runs against live DB state
|
||
|
|
# so we test the shape of results rather than full integration here) ---
|
||
|
|
|
||
|
|
|
||
|
|
def test_detector_imports_cleanly():
|
||
|
|
"""Detector module must import without errors (it's called from nightly cron)."""
|
||
|
|
import importlib.util
|
||
|
|
import sys
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
# Load the detector script as a module
|
||
|
|
script = Path(__file__).resolve().parent.parent / "scripts" / "detect_emerging.py"
|
||
|
|
assert script.exists()
|
||
|
|
spec = importlib.util.spec_from_file_location("detect_emerging", script)
|
||
|
|
mod = importlib.util.module_from_spec(spec)
|
||
|
|
# Don't actually run main() — just verify it parses and defines expected names
|
||
|
|
spec.loader.exec_module(mod)
|
||
|
|
assert hasattr(mod, "main")
|
||
|
|
assert hasattr(mod, "PROJECT_MIN_MEMORIES")
|
||
|
|
assert hasattr(mod, "PROJECT_ALERT_THRESHOLD")
|
||
|
|
|
||
|
|
|
||
|
|
def test_detector_handles_empty_db(tmp_data_dir):
|
||
|
|
"""Detector should handle zero memories without crashing."""
|
||
|
|
init_db()
|
||
|
|
# Don't create any memories. Just verify the queries work via the service layer.
|
||
|
|
from atocore.memory.service import get_memories
|
||
|
|
active = get_memories(active_only=True, limit=500)
|
||
|
|
candidates = get_memories(status="candidate", limit=500)
|
||
|
|
assert active == []
|
||
|
|
assert candidates == []
|