# Commit context: complete implementation of the Agentic Context Engineering
# (ACE) framework. Core modules (optimization_engine/context/): playbook.py
# (AtomizerPlaybook with helpful/harmful scoring), reflector.py
# (AtomizerReflector insight extraction), session_state.py (exposed/isolated
# context isolation), feedback_loop.py (automated learning from trial
# results), compaction.py (long-session context management), cache_monitor.py
# (KV-cache optimization tracking), runner_integration.py (OptimizationRunner
# integration). Dashboard: context.py with 12 REST API endpoints. Tests:
# test_context_engineering.py (44 unit tests), test_context_integration.py
# (16 integration tests). Docs: CONTEXT_ENGINEERING_REPORT.md,
# CONTEXT_ENGINEERING_API.md, SYS_17_CONTEXT_ENGINEERING.md, cheatsheet and
# bootstrap (00_BOOTSTRAP_V2.md) updates.
"""
|
|
Test suite for context engineering components.
|
|
|
|
Tests the ACE (Agentic Context Engineering) implementation:
|
|
- Playbook: Knowledge store with helpful/harmful tracking
|
|
- Reflector: Outcome analysis and insight extraction
|
|
- SessionState: Context isolation
|
|
- Compaction: Long-running session management
|
|
- FeedbackLoop: Automated learning
|
|
"""
|
|
|
|
import pytest
|
|
from pathlib import Path
|
|
import tempfile
|
|
import json
|
|
from datetime import datetime
|
|
|
|
from optimization_engine.context.playbook import (
|
|
AtomizerPlaybook,
|
|
PlaybookItem,
|
|
InsightCategory
|
|
)
|
|
from optimization_engine.context.reflector import (
|
|
AtomizerReflector,
|
|
OptimizationOutcome
|
|
)
|
|
from optimization_engine.context.session_state import (
|
|
AtomizerSessionState,
|
|
TaskType,
|
|
ExposedState,
|
|
IsolatedState
|
|
)
|
|
from optimization_engine.context.compaction import (
|
|
CompactionManager,
|
|
ContextEvent,
|
|
EventType,
|
|
ContextBudgetManager
|
|
)
|
|
from optimization_engine.context.cache_monitor import (
|
|
ContextCacheOptimizer,
|
|
CacheStats,
|
|
StablePrefixBuilder
|
|
)
|
|
from optimization_engine.context.feedback_loop import (
|
|
FeedbackLoop
|
|
)
|
|
|
|
|
|
class TestAtomizerPlaybook:
    """Unit tests covering the playbook knowledge store."""

    def test_create_empty_playbook(self):
        """A freshly constructed playbook has no items and starts at version 1."""
        pb = AtomizerPlaybook()
        assert pb.version == 1
        assert len(pb.items) == 0

    def test_add_insight(self):
        """Adding an insight registers it with zeroed counters and an id."""
        pb = AtomizerPlaybook()
        entry = pb.add_insight(
            category=InsightCategory.STRATEGY,
            content="Use shell elements for thin walls",
            source_trial=1,
        )

        assert entry.id == "str-00001"
        assert entry.category == InsightCategory.STRATEGY
        # Fresh insights start with no recorded outcomes.
        assert (entry.helpful_count, entry.harmful_count) == (0, 0)
        assert 1 in entry.source_trials
        assert len(pb.items) == 1

    def test_add_multiple_categories(self):
        """Insight ids are numbered independently per category."""
        pb = AtomizerPlaybook()
        for cat, text in [
            (InsightCategory.STRATEGY, "Strategy 1"),
            (InsightCategory.MISTAKE, "Mistake 1"),
            (InsightCategory.TOOL, "Tool tip 1"),
            (InsightCategory.STRATEGY, "Strategy 2"),
        ]:
            pb.add_insight(cat, text)

        assert len(pb.items) == 4
        for key in ("str-00001", "str-00002", "mis-00001", "tool-00001"):
            assert key in pb.items

    def test_deduplication(self):
        """Re-adding identical content merges into the existing item."""
        pb = AtomizerPlaybook()

        first = pb.add_insight(InsightCategory.STRATEGY, "Use shell elements")
        second = pb.add_insight(InsightCategory.STRATEGY, "Use shell elements")

        assert first is second  # merged: both calls yield the same object
        assert second.helpful_count == 1  # the duplicate counts as a helpful vote
        assert len(pb.items) == 1

    def test_outcome_tracking(self):
        """Helpful/harmful counters and derived scores update per outcome."""
        pb = AtomizerPlaybook()
        entry = pb.add_insight(InsightCategory.STRATEGY, "Test insight")

        for verdict in (True, True, False):
            pb.record_outcome(entry.id, helpful=verdict)

        assert entry.helpful_count == 2
        assert entry.harmful_count == 1
        assert entry.net_score == 1
        assert entry.confidence == 2 / 3

    def test_confidence_calculation(self):
        """Confidence moves from a neutral 0.5 as feedback accumulates."""
        pb = AtomizerPlaybook()
        entry = pb.add_insight(InsightCategory.STRATEGY, "Test")

        # No feedback yet -> neutral confidence.
        assert entry.confidence == 0.5

        # One positive outcome -> fully confident.
        pb.record_outcome(entry.id, helpful=True)
        assert entry.confidence == 1.0

        # One positive + one negative -> back to neutral.
        pb.record_outcome(entry.id, helpful=False)
        assert entry.confidence == 0.5

    def test_persistence(self, tmp_path):
        """Items, counters, and tags survive a save/load round trip."""
        pb = AtomizerPlaybook()
        pb.add_insight(InsightCategory.MISTAKE, "Don't do this", tags=["test"])
        pb.add_insight(InsightCategory.STRATEGY, "Do this instead")
        pb.record_outcome("mis-00001", helpful=False)
        pb.record_outcome("str-00001", helpful=True)

        target = tmp_path / "playbook.json"
        pb.save(target)

        restored = AtomizerPlaybook.load(target)
        assert len(restored.items) == 2
        assert "mis-00001" in restored.items
        assert restored.items["mis-00001"].harmful_count == 1
        assert "test" in restored.items["mis-00001"].tags
        assert restored.items["str-00001"].helpful_count == 1

    def test_pruning(self):
        """Items whose net score falls below the threshold are removed."""
        pb = AtomizerPlaybook()
        entry = pb.add_insight(InsightCategory.STRATEGY, "Bad advice")

        # Pile on harmful outcomes to drive the score well negative.
        for _ in range(5):
            pb.record_outcome(entry.id, helpful=False)
        assert entry.net_score == -5

        removed = pb.prune_harmful(threshold=-3)
        assert removed == 1
        assert len(pb.items) == 0

    def test_search_by_content(self):
        """Content search returns matching items first."""
        pb = AtomizerPlaybook()
        pb.add_insight(InsightCategory.STRATEGY, "Use shell elements for thin walls")
        pb.add_insight(InsightCategory.STRATEGY, "Solid elements for thick parts")
        pb.add_insight(InsightCategory.MISTAKE, "Don't use coarse mesh")

        hits = pb.search_by_content("shell elements")
        assert len(hits) >= 1
        assert "shell" in hits[0].content.lower()

    def test_get_context_for_task(self):
        """The generated context string surfaces items and their scores."""
        pb = AtomizerPlaybook()
        pb.add_insight(InsightCategory.STRATEGY, "Strategy 1")
        pb.add_insight(InsightCategory.MISTAKE, "Mistake 1")

        # Give the strategy item positive feedback so it ranks highly.
        pb.record_outcome("str-00001", helpful=True)
        pb.record_outcome("str-00001", helpful=True)

        context = pb.get_context_for_task("optimization")
        assert "Playbook" in context
        assert "str-00001" in context
        assert "helpful=2" in context
class TestAtomizerReflector:
    """Unit tests for the reflector's outcome analysis."""

    @staticmethod
    def _make_outcome(**overrides):
        # Build an OptimizationOutcome from sensible defaults plus overrides,
        # so each test only spells out what it actually cares about.
        fields = dict(
            trial_number=1,
            success=True,
            objective_value=100.0,
            constraint_violations=[],
            solver_errors=[],
            design_variables={},
            extractor_used="",
            duration_seconds=60,
        )
        fields.update(overrides)
        return OptimizationOutcome(**fields)

    def test_create_reflector(self):
        """A new reflector wraps the playbook and has nothing pending."""
        pb = AtomizerPlaybook()
        refl = AtomizerReflector(pb)

        assert refl.playbook is pb
        assert len(refl.pending_insights) == 0

    def test_analyze_successful_trial(self):
        """A successful trial yields at least one helpful insight."""
        refl = AtomizerReflector(AtomizerPlaybook())
        outcome = self._make_outcome(
            design_variables={"thickness": 1.0, "width": 5.0},
            extractor_used="mass_extractor",
        )

        insights = refl.analyze_trial(outcome)

        assert len(insights) >= 1
        assert any(i.helpful for i in insights)
        assert 1 in refl.analyzed_trials

    def test_analyze_failed_trial(self):
        """A failed trial produces mistake insights marked as not helpful."""
        refl = AtomizerReflector(AtomizerPlaybook())
        outcome = self._make_outcome(
            success=False,
            objective_value=None,
            constraint_violations=["stress > 250 MPa"],
            solver_errors=["convergence failure at iteration 50"],
            design_variables={"thickness": 0.5},
            extractor_used="stress_extractor",
            duration_seconds=120,
        )

        insights = refl.analyze_trial(outcome)

        # Expect separate insights for the solver error and the violation.
        assert len(insights) >= 2
        assert any(i.category == InsightCategory.MISTAKE for i in insights)
        assert not any(i.helpful for i in insights if i.category == InsightCategory.MISTAKE)

    def test_analyze_mesh_error(self):
        """Mesh-related solver errors are tagged as mesh problems."""
        refl = AtomizerReflector(AtomizerPlaybook())
        outcome = self._make_outcome(
            trial_number=5,
            success=False,
            objective_value=None,
            solver_errors=["Element distortion: negative jacobian detected"],
            duration_seconds=30,
        )

        insights = refl.analyze_trial(outcome)
        assert any("mesh" in str(i.tags).lower() for i in insights)

    def test_commit_insights(self):
        """Committing moves pending insights into the playbook and clears them."""
        pb = AtomizerPlaybook()
        refl = AtomizerReflector(pb)
        refl.analyze_trial(
            self._make_outcome(
                design_variables={"thickness": 1.0},
                extractor_used="mass_extractor",
            )
        )

        count = refl.commit_insights()

        assert count > 0
        assert len(pb.items) > 0
        assert len(refl.pending_insights) == 0  # pending list is emptied

    def test_analyze_study_completion(self):
        """A high-convergence study yields a 'robust method' style insight."""
        refl = AtomizerReflector(AtomizerPlaybook())

        insights = refl.analyze_study_completion(
            study_name="test_study",
            total_trials=100,
            best_value=50.0,
            convergence_rate=0.95,
            method="TPE",
        )

        assert len(insights) >= 1
        assert any("robust" in i.content.lower() for i in insights)
class TestSessionState:
    """Unit tests for exposed/isolated session state handling."""

    def test_create_session(self):
        """A new session starts with no task type and no actions."""
        state = AtomizerSessionState(session_id="test_session")

        assert state.session_id == "test_session"
        assert state.exposed.task_type is None
        assert len(state.exposed.recent_actions) == 0

    def test_set_task_type(self):
        """The task type can be assigned on the exposed state."""
        state = AtomizerSessionState(session_id="test")
        state.exposed.task_type = TaskType.CREATE_STUDY
        assert state.exposed.task_type == TaskType.CREATE_STUDY

    def test_add_action(self):
        """Actions accumulate in order on the exposed state."""
        state = AtomizerSessionState(session_id="test")
        state.add_action("Created study directory")
        state.add_action("Configured optimization")

        assert len(state.exposed.recent_actions) == 2
        assert "Created study" in state.exposed.recent_actions[0]

    def test_action_compression(self):
        """Exceeding the action limit triggers automatic compression."""
        state = AtomizerSessionState(session_id="test")
        for n in range(15):
            state.add_action(f"Action {n}")

        # Older entries collapse into a summary marker.
        assert len(state.exposed.recent_actions) <= 12
        assert any("earlier actions" in a.lower() for a in state.exposed.recent_actions)

    def test_add_error(self):
        """Errors are recorded, with the optional type rendered as a prefix."""
        state = AtomizerSessionState(session_id="test")
        state.add_error("Solver failed", error_type="convergence")
        state.add_error("Mesh error")

        assert len(state.exposed.recent_errors) == 2
        assert "[convergence]" in state.exposed.recent_errors[0]

    def test_update_study_status(self):
        """Study progress fields land on the exposed state."""
        state = AtomizerSessionState(session_id="test")
        state.update_study_status(
            name="bracket_opt",
            status="running",
            trials_completed=25,
            trials_total=100,
            best_value=0.5,
            best_trial=20,
        )

        assert state.exposed.study_name == "bracket_opt"
        assert state.exposed.trials_completed == 25
        assert state.exposed.best_value == 0.5

    def test_llm_context_generation(self):
        """The LLM context string includes study name, progress, and task."""
        state = AtomizerSessionState(session_id="test")
        state.exposed.task_type = TaskType.RUN_OPTIMIZATION
        state.exposed.study_name = "test_study"
        state.exposed.trials_completed = 50
        state.exposed.trials_total = 100
        state.exposed.best_value = 0.5

        context = state.get_llm_context()
        for fragment in ("test_study", "50", "0.5", "run_optimization"):
            assert fragment in context

    def test_isolated_state_access(self):
        """Isolated data is hidden from the LLM context but loadable on demand."""
        state = AtomizerSessionState(session_id="test")
        state.isolated.nx_model_path = "/path/to/model.prt"

        # Isolated values never leak into the generated context.
        assert "/path/to/model.prt" not in state.get_llm_context()

        # ...but an explicit load still returns them.
        assert state.load_isolated_data("nx_model_path") == "/path/to/model.prt"

    def test_persistence(self, tmp_path):
        """Session id and exposed fields survive a save/load round trip."""
        state = AtomizerSessionState(session_id="test_persist")
        state.exposed.task_type = TaskType.ANALYZE_RESULTS
        state.exposed.study_name = "persist_study"
        state.add_action("Test action")

        target = tmp_path / "session.json"
        state.save(target)
        restored = AtomizerSessionState.load(target)

        assert restored.session_id == "test_persist"
        assert restored.exposed.task_type == TaskType.ANALYZE_RESULTS
        assert restored.exposed.study_name == "persist_study"
class TestCompactionManager:
    """Unit tests for event-log compaction."""

    def test_create_manager(self):
        """Constructor arguments are stored and the event log starts empty."""
        mgr = CompactionManager(compaction_threshold=10, keep_recent=5)

        assert mgr.compaction_threshold == 10
        assert mgr.keep_recent == 5
        assert len(mgr.events) == 0

    def test_add_events(self):
        """Trial events append to the log below the threshold."""
        mgr = CompactionManager(compaction_threshold=50)
        mgr.add_trial_event(trial_number=1, success=True, objective=100.0)
        mgr.add_trial_event(trial_number=2, success=False)

        assert len(mgr.events) == 2

    def test_compaction_trigger(self):
        """Crossing the threshold compacts the log down to the cap."""
        mgr = CompactionManager(compaction_threshold=10, keep_recent=5)

        for n in range(15):
            event = ContextEvent(
                timestamp=datetime.now(),
                event_type=EventType.TRIAL_COMPLETE,
                summary=f"Trial {n} complete",
                details={"trial_number": n, "objective": n * 0.1},
            )
            mgr.add_event(event)

        assert mgr.compaction_count > 0
        assert len(mgr.events) <= 10

    def test_error_preservation(self):
        """Error events survive compaction regardless of age."""
        mgr = CompactionManager(compaction_threshold=10, keep_recent=3)

        # Record the error before flooding the log with trials.
        mgr.add_error_event("Critical solver failure", "solver_error")
        for n in range(20):
            mgr.add_trial_event(trial_number=n, success=True, objective=n)

        kept_errors = [e for e in mgr.events if e.event_type == EventType.ERROR]
        assert len(kept_errors) == 1
        assert "Critical solver failure" in kept_errors[0].summary

    def test_milestone_preservation(self):
        """Milestone events survive compaction regardless of age."""
        mgr = CompactionManager(compaction_threshold=10, keep_recent=3)
        mgr.add_milestone("Optimization started", {"method": "TPE"})
        for n in range(20):
            mgr.add_trial_event(trial_number=n, success=True)

        kept = [e for e in mgr.events if e.event_type == EventType.MILESTONE]
        assert len(kept) == 1

    def test_context_string_generation(self):
        """The history string mentions both trials and errors."""
        mgr = CompactionManager()
        mgr.add_trial_event(trial_number=1, success=True, objective=100.0)
        mgr.add_error_event("Test error")

        context = mgr.get_context_string()
        assert "Optimization History" in context
        assert "Trial 1" in context
        assert "Test error" in context

    def test_get_stats(self):
        """Stats reflect compaction activity and the (compacted) event count."""
        mgr = CompactionManager(compaction_threshold=10, keep_recent=5)
        for n in range(15):
            mgr.add_trial_event(trial_number=n, success=n % 2 == 0)

        stats = mgr.get_stats()
        assert stats["total_events"] <= 15
        assert stats["compaction_count"] > 0
class TestCacheMonitor:
    """Unit tests for KV-cache hit/miss tracking and prefix building."""

    def test_create_optimizer(self):
        """A new optimizer starts with zeroed counters."""
        opt = ContextCacheOptimizer()
        assert opt.stats.total_requests == 0
        assert opt.stats.cache_hits == 0

    def test_prepare_context(self):
        """All three layers appear in the assembled context."""
        opt = ContextCacheOptimizer()
        context = opt.prepare_context(
            stable_prefix="Stable content",
            semi_stable="Session content",
            dynamic="User message",
        )

        for layer in ("Stable content", "Session content", "User message"):
            assert layer in context
        assert opt.stats.total_requests == 1

    def test_cache_hit_detection(self):
        """An unchanged stable prefix on a later request counts as a hit."""
        opt = ContextCacheOptimizer()
        opt.prepare_context("Stable", "Semi", "Dynamic 1")
        # Same stable prefix, different dynamic tail -> cacheable.
        opt.prepare_context("Stable", "Semi", "Dynamic 2")

        assert opt.stats.total_requests == 2
        assert opt.stats.cache_hits == 1

    def test_cache_miss_detection(self):
        """A changed stable prefix invalidates the cache."""
        opt = ContextCacheOptimizer()
        opt.prepare_context("Stable 1", "Semi", "Dynamic")
        opt.prepare_context("Stable 2", "Semi", "Dynamic")  # prefix differs

        assert opt.stats.cache_hits == 0
        assert opt.stats.cache_misses == 2

    def test_stable_prefix_builder(self):
        """Sections are assembled in their defined order."""
        spb = StablePrefixBuilder()
        spb.add_identity("I am Atomizer")
        spb.add_capabilities("I can optimize")
        spb.add_tools("Tool definitions here")

        prefix = spb.build()

        assert "I am Atomizer" in prefix
        assert "I can optimize" in prefix
        # Identity (order 10) must precede capabilities (order 20).
        assert prefix.index("Atomizer") < prefix.index("optimize")
class TestFeedbackLoop:
    """Unit tests for the automated learning feedback loop."""

    def test_create_feedback_loop(self, tmp_path):
        """A new loop owns a playbook and has processed nothing."""
        fb = FeedbackLoop(tmp_path / "playbook.json")

        assert fb.playbook is not None
        assert fb._total_trials_processed == 0

    def test_process_successful_trial(self, tmp_path):
        """A successful trial bumps the success counters and echoes metadata."""
        fb = FeedbackLoop(tmp_path / "playbook.json")
        result = fb.process_trial_result(
            trial_number=1,
            success=True,
            objective_value=100.0,
            design_variables={"thickness": 1.0},
        )

        assert result["trial_number"] == 1
        assert result["success"] is True
        assert fb._total_trials_processed == 1
        assert fb._successful_trials == 1

    def test_process_failed_trial(self, tmp_path):
        """A failed trial bumps the failure counter."""
        fb = FeedbackLoop(tmp_path / "playbook.json")
        result = fb.process_trial_result(
            trial_number=1,
            success=False,
            objective_value=0.0,
            design_variables={"thickness": 0.5},
            errors=["Convergence failure"],
        )

        assert result["success"] is False
        assert fb._failed_trials == 1

    def test_finalize_study(self, tmp_path):
        """Finalizing commits insights and persists the playbook to disk."""
        playbook_path = tmp_path / "playbook.json"
        fb = FeedbackLoop(playbook_path)

        # Mix of successes and failures: every third trial fails.
        for n in range(10):
            ok = n % 3 != 0
            fb.process_trial_result(
                trial_number=n,
                success=ok,
                objective_value=100 - n if ok else 0,
                design_variables={"x": n * 0.1},
            )

        summary = fb.finalize_study({
            "name": "test_study",
            "total_trials": 10,
            "best_value": 91,
            "convergence_rate": 0.7,
        })

        assert summary["insights_added"] > 0
        assert summary["playbook_size"] > 0
        assert playbook_path.exists()  # finalize writes the playbook file

    def test_playbook_item_attribution(self, tmp_path):
        """Trials listing an item as used credit that item with helpful votes."""
        playbook_path = tmp_path / "playbook.json"

        # Seed the on-disk playbook with one strategy item.
        seed = AtomizerPlaybook()
        entry = seed.add_insight(InsightCategory.STRATEGY, "Test strategy")
        seed.save(playbook_path)

        fb = FeedbackLoop(playbook_path)
        for trial in (1, 2):
            fb.process_trial_result(
                trial_number=trial,
                success=True,
                objective_value=100.0 if trial == 1 else 95.0,
                design_variables={},
                context_items_used=[entry.id],
            )

        # Two successful trials that used the item -> two helpful votes.
        assert fb.playbook.items[entry.id].helpful_count == 2
class TestContextBudgetManager:
    """Unit tests for per-section token budgeting."""

    def test_create_manager(self):
        """Default budget totals 100k tokens and includes the stable prefix."""
        bm = ContextBudgetManager()
        assert bm.budget["total"] == 100000
        assert "stable_prefix" in bm.budget

    def test_estimate_tokens(self):
        """Tokens are estimated as character count // 4."""
        bm = ContextBudgetManager()
        # "Hello world" is 11 chars -> 11 // 4 == 2 tokens.
        assert bm.estimate_tokens("Hello world") == 2

    def test_update_usage(self):
        """Usage within budget reports section, tokens, and no overflow."""
        bm = ContextBudgetManager()
        report = bm.update_usage("stable_prefix", "x" * 20000)  # == 5000 tokens

        assert report["section"] == "stable_prefix"
        assert report["tokens"] == 5000
        assert report["over_budget"] is False

    def test_over_budget_warning(self):
        """Exceeding a section budget flags it and attaches a warning."""
        bm = ContextBudgetManager()
        # stable_prefix budget is 5000 tokens; 40000 chars -> 10000 tokens.
        report = bm.update_usage("stable_prefix", "x" * 40000)

        assert report["over_budget"] is True
        assert "warning" in report

    def test_get_status(self):
        """Status aggregates totals, utilization, and recommendations."""
        bm = ContextBudgetManager()
        bm.update_usage("stable_prefix", "x" * 10000)
        bm.update_usage("protocols", "x" * 20000)

        status = bm.get_status()
        for key in ("total_used", "utilization", "recommendations"):
            assert key in status
if __name__ == "__main__":
    # Allow running this module directly (outside a pytest invocation).
    pytest.main([__file__, "-v"])