"""Tests for Phase 9 Commit C rule-based candidate extractor.""" from fastapi.testclient import TestClient from atocore.interactions.service import record_interaction from atocore.main import app from atocore.memory.extractor import ( MemoryCandidate, extract_candidates_from_interaction, ) from atocore.memory.service import ( create_memory, get_memories, promote_memory, reject_candidate_memory, ) from atocore.models.database import init_db def _capture(**fields): return record_interaction( prompt=fields.get("prompt", "unused"), response=fields.get("response", ""), response_summary=fields.get("response_summary", ""), project=fields.get("project", ""), reinforce=False, ) # --- extractor: heading patterns ------------------------------------------ def test_extractor_finds_decision_heading(tmp_data_dir): init_db() interaction = _capture( response=( "We talked about the frame.\n\n" "## Decision: switch the lateral supports to GF-PTFE pads\n\n" "Rationale: thermal stability." ), ) results = extract_candidates_from_interaction(interaction) assert len(results) == 1 assert results[0].memory_type == "adaptation" assert "GF-PTFE" in results[0].content assert results[0].rule == "decision_heading" def test_extractor_finds_constraint_and_requirement_headings(tmp_data_dir): init_db() interaction = _capture( response=( "### Constraint: total mass must stay under 4.8 kg\n" "## Requirement: survives 12g shock in any axis\n" ), ) results = extract_candidates_from_interaction(interaction) rules = {r.rule for r in results} assert "constraint_heading" in rules assert "requirement_heading" in rules constraint = next(r for r in results if r.rule == "constraint_heading") requirement = next(r for r in results if r.rule == "requirement_heading") assert constraint.memory_type == "project" assert requirement.memory_type == "project" assert "4.8 kg" in constraint.content assert "12g" in requirement.content def test_extractor_finds_fact_heading(tmp_data_dir): init_db() interaction = _capture( response="## Fact: the polisher sim uses floating-point deltas in microns\n", ) results = extract_candidates_from_interaction(interaction) assert len(results) == 1 assert results[0].memory_type == "knowledge" assert results[0].rule == "fact_heading" def test_extractor_heading_separator_variants(tmp_data_dir): """Decision headings should match with `:`, `-`, or em-dash.""" init_db() for sep in (":", "-", "\u2014"): interaction = _capture( response=f"## Decision {sep} adopt option B for the mount interface\n", ) results = extract_candidates_from_interaction(interaction) assert len(results) == 1, f"sep={sep!r}" assert "option B" in results[0].content # --- extractor: sentence patterns ----------------------------------------- def test_extractor_finds_preference_sentence(tmp_data_dir): init_db() interaction = _capture( response=( "I prefer rebase-based workflows because history stays linear " "and reviewers have an easier time." ), ) results = extract_candidates_from_interaction(interaction) pref_matches = [r for r in results if r.rule == "preference_sentence"] assert len(pref_matches) == 1 assert pref_matches[0].memory_type == "preference" assert "rebase" in pref_matches[0].content.lower() def test_extractor_finds_decided_to_sentence(tmp_data_dir): init_db() interaction = _capture( response=( "After going through the options we decided to keep the legacy " "calibration routine for the July milestone." ), ) results = extract_candidates_from_interaction(interaction) decision_matches = [r for r in results if r.rule == "decided_to_sentence"] assert len(decision_matches) == 1 assert decision_matches[0].memory_type == "adaptation" assert "legacy calibration" in decision_matches[0].content.lower() def test_extractor_finds_requirement_sentence(tmp_data_dir): init_db() interaction = _capture( response=( "One of the findings: the requirement is that the interferometer " "must resolve 50 picometer displacements at 1 kHz bandwidth." ), ) results = extract_candidates_from_interaction(interaction) req_matches = [r for r in results if r.rule == "requirement_sentence"] assert len(req_matches) == 1 assert req_matches[0].memory_type == "project" assert "picometer" in req_matches[0].content.lower() # --- extractor: content rules --------------------------------------------- def test_extractor_rejects_too_short_matches(tmp_data_dir): init_db() interaction = _capture(response="## Decision: yes\n") # too short after clean results = extract_candidates_from_interaction(interaction) assert results == [] def test_extractor_deduplicates_identical_matches(tmp_data_dir): init_db() interaction = _capture( response=( "## Decision: use the modular frame variant for prototyping\n" "## Decision: use the modular frame variant for prototyping\n" ), ) results = extract_candidates_from_interaction(interaction) assert len(results) == 1 def test_extractor_strips_trailing_punctuation(tmp_data_dir): init_db() interaction = _capture( response="## Decision: defer the laser redesign to Q3.\n", ) results = extract_candidates_from_interaction(interaction) assert len(results) == 1 assert results[0].content.endswith("Q3") def test_extractor_includes_project_and_source_interaction_id(tmp_data_dir): init_db() interaction = _capture( project="p05-interferometer", response="## Decision: freeze the optical path for the prototype run\n", ) results = extract_candidates_from_interaction(interaction) assert len(results) == 1 assert results[0].project == "p05-interferometer" assert results[0].source_interaction_id == interaction.id def test_extractor_drops_candidates_matching_existing_active(tmp_data_dir): init_db() # Seed an active memory that the extractor would otherwise re-propose create_memory( memory_type="preference", content="prefers small reviewable diffs", ) interaction = _capture( response="Remember that I prefer small reviewable diffs because they merge faster.", ) results = extract_candidates_from_interaction(interaction) # The only candidate would have been the preference, now dropped assert not any(r.content.lower() == "small reviewable diffs" for r in results) def test_extractor_returns_empty_for_no_patterns(tmp_data_dir): init_db() interaction = _capture(response="Nothing structural here, just prose.") results = extract_candidates_from_interaction(interaction) assert results == [] # --- service: candidate lifecycle ----------------------------------------- def test_candidate_and_active_can_coexist(tmp_data_dir): init_db() active = create_memory( memory_type="preference", content="logs every config change to the change log", status="active", ) candidate = create_memory( memory_type="preference", content="logs every config change to the change log", status="candidate", ) # The two are distinct rows because status is part of the dedup key assert active.id != candidate.id def test_promote_memory_moves_candidate_to_active(tmp_data_dir): init_db() candidate = create_memory( memory_type="adaptation", content="moved the staging scripts into deploy/staging", status="candidate", ) ok = promote_memory(candidate.id) assert ok is True active_list = get_memories(memory_type="adaptation", status="active") assert any(m.id == candidate.id for m in active_list) def test_promote_memory_on_non_candidate_returns_false(tmp_data_dir): init_db() active = create_memory( memory_type="adaptation", content="already active adaptation entry", status="active", ) assert promote_memory(active.id) is False def test_reject_candidate_moves_it_to_invalid(tmp_data_dir): init_db() candidate = create_memory( memory_type="knowledge", content="the calibration uses barometric pressure compensation", status="candidate", ) ok = reject_candidate_memory(candidate.id) assert ok is True invalid_list = get_memories(memory_type="knowledge", status="invalid") assert any(m.id == candidate.id for m in invalid_list) def test_reject_on_non_candidate_returns_false(tmp_data_dir): init_db() active = create_memory(memory_type="preference", content="always uses structured logging") assert reject_candidate_memory(active.id) is False def test_get_memories_filters_by_candidate_status(tmp_data_dir): init_db() create_memory(memory_type="preference", content="active one", status="active") create_memory(memory_type="preference", content="candidate one", status="candidate") create_memory(memory_type="preference", content="another candidate", status="candidate") candidates = get_memories(status="candidate", memory_type="preference") assert len(candidates) == 2 assert all(c.status == "candidate" for c in candidates) # --- API: extract / promote / reject / list ------------------------------- def test_api_extract_interaction_without_persist(tmp_data_dir): init_db() interaction = record_interaction( prompt="review", response="## Decision: flip the default budget to 4000 for p05\n", reinforce=False, ) client = TestClient(app) response = client.post(f"/interactions/{interaction.id}/extract", json={}) assert response.status_code == 200 body = response.json() assert body["candidate_count"] == 1 assert body["persisted"] is False assert body["persisted_ids"] == [] # The candidate should NOT have been written to the memory table queue = get_memories(status="candidate") assert queue == [] def test_api_extract_interaction_with_persist(tmp_data_dir): init_db() interaction = record_interaction( prompt="review", response=( "## Decision: pin the embedding model to v2.3 for Wave 2\n" "## Constraint: context budget must stay under 4000 chars\n" ), reinforce=False, ) client = TestClient(app) response = client.post( f"/interactions/{interaction.id}/extract", json={"persist": True} ) assert response.status_code == 200 body = response.json() assert body["candidate_count"] == 2 assert body["persisted"] is True assert len(body["persisted_ids"]) == 2 queue = get_memories(status="candidate", limit=50) assert len(queue) == 2 def test_api_extract_returns_404_for_missing_interaction(tmp_data_dir): init_db() client = TestClient(app) response = client.post("/interactions/nope/extract", json={}) assert response.status_code == 404 def test_api_promote_and_reject_endpoints(tmp_data_dir): init_db() candidate = create_memory( memory_type="adaptation", content="restructured the ingestion pipeline into layered stages", status="candidate", ) client = TestClient(app) promote_response = client.post(f"/memory/{candidate.id}/promote") assert promote_response.status_code == 200 assert promote_response.json()["status"] == "promoted" # Promoting it again should 404 because it's no longer a candidate second_promote = client.post(f"/memory/{candidate.id}/promote") assert second_promote.status_code == 404 reject_response = client.post("/memory/does-not-exist/reject") assert reject_response.status_code == 404 def test_api_get_memory_candidate_status_filter(tmp_data_dir): init_db() create_memory(memory_type="preference", content="prefers explicit types", status="active") create_memory( memory_type="preference", content="prefers pull requests sized by diff lines not files", status="candidate", ) client = TestClient(app) response = client.get("/memory", params={"status": "candidate"}) assert response.status_code == 200 body = response.json() assert "candidate" in body["statuses"] assert len(body["memories"]) == 1 assert body["memories"][0]["status"] == "candidate" def test_api_get_memory_invalid_status_returns_400(tmp_data_dir): init_db() client = TestClient(app) response = client.get("/memory", params={"status": "not-a-status"}) assert response.status_code == 400