Complete implementation of Agentic Context Engineering (ACE) framework: Core modules (optimization_engine/context/): - playbook.py: AtomizerPlaybook with helpful/harmful scoring - reflector.py: AtomizerReflector for insight extraction - session_state.py: Context isolation (exposed/isolated state) - feedback_loop.py: Automated learning from trial results - compaction.py: Long-session context management - cache_monitor.py: KV-cache optimization tracking - runner_integration.py: OptimizationRunner integration Dashboard integration: - context.py: 12 REST API endpoints for playbook management Tests: - test_context_engineering.py: 44 unit tests - test_context_integration.py: 16 integration tests Documentation: - CONTEXT_ENGINEERING_REPORT.md: Comprehensive implementation report - CONTEXT_ENGINEERING_API.md: Complete API reference - SYS_17_CONTEXT_ENGINEERING.md: System protocol - Updated cheatsheet with SYS_17 quick reference - Enhanced bootstrap (00_BOOTSTRAP_V2.md) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
532 lines
17 KiB
Python
532 lines
17 KiB
Python
"""
|
|
Context Engineering Integration for OptimizationRunner
|
|
|
|
Provides integration between the context engineering system and the
|
|
OptimizationRunner without modifying the core runner code.
|
|
|
|
Two approaches are provided:
|
|
1. ContextEngineeringMixin - Mix into OptimizationRunner subclass
|
|
2. ContextAwareRunner - Wrapper that adds context engineering
|
|
|
|
Usage:
|
|
# Approach 1: Mixin
|
|
class MyRunner(ContextEngineeringMixin, OptimizationRunner):
|
|
pass
|
|
|
|
# Approach 2: Wrapper
|
|
runner = OptimizationRunner(...)
|
|
context_runner = ContextAwareRunner(runner, playbook_path)
|
|
context_runner.run(...)
|
|
"""
|
|
|
|
from typing import Dict, Any, Optional, List, Callable
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import time
|
|
|
|
from .playbook import AtomizerPlaybook, get_playbook
|
|
from .reflector import AtomizerReflector, OptimizationOutcome
|
|
from .feedback_loop import FeedbackLoop
|
|
from .compaction import CompactionManager, EventType
|
|
from .session_state import AtomizerSessionState, TaskType, get_session
|
|
|
|
|
|
class ContextEngineeringMixin:
    """
    Mixin class to add context engineering to OptimizationRunner.

    Provides:
    - Automatic playbook loading/saving
    - Trial outcome reflection
    - Learning from successes/failures
    - Session state tracking

    Usage:
        class MyContextAwareRunner(ContextEngineeringMixin, OptimizationRunner):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self.init_context_engineering()
    """

    def init_context_engineering(
        self,
        playbook_path: Optional[Path] = None,
        enable_compaction: bool = True,
        compaction_threshold: int = 50
    ) -> None:
        """
        Initialize context engineering components.

        Call this in your subclass __init__ after super().__init__().

        Args:
            playbook_path: Path to playbook JSON (default: output_dir/playbook.json)
            enable_compaction: Whether to enable context compaction
            compaction_threshold: Number of events before compaction
        """
        # Default the playbook next to the runner's output, falling back to
        # the current directory when the host class has no output_dir.
        if playbook_path is None:
            playbook_path = getattr(self, 'output_dir', Path('.')) / 'playbook.json'

        self._playbook_path = Path(playbook_path)
        self._playbook = AtomizerPlaybook.load(self._playbook_path)
        self._reflector = AtomizerReflector(self._playbook)
        self._feedback_loop = FeedbackLoop(self._playbook_path)

        # Initialize compaction if enabled
        self._enable_compaction = enable_compaction
        if enable_compaction:
            self._compaction_manager = CompactionManager(
                compaction_threshold=compaction_threshold,
                keep_recent=20,
                keep_errors=True
            )
        else:
            self._compaction_manager = None

        # Session state (shared per-process session, see session_state.get_session)
        self._session = get_session()
        self._session.exposed.task_type = TaskType.RUN_OPTIMIZATION

        # Track active playbook items for feedback attribution
        self._active_playbook_items: List[str] = []

        # Statistics
        self._context_stats = {
            "trials_processed": 0,
            "insights_generated": 0,
            "errors_captured": 0
        }

    def get_relevant_playbook_items(self, max_items: int = 15) -> List[str]:
        """
        Get relevant playbook items for current optimization context.

        Also records which item IDs are considered "active" so later feedback
        can be attributed to them.

        Args:
            max_items: Maximum number of playbook items to include.

        Returns:
            List of playbook item context strings (one line per entry).
        """
        context = self._playbook.get_context_for_task(
            task_type="optimization",
            max_items=max_items,
            min_confidence=0.5
        )

        # NOTE(review): this records the first max_items items in insertion
        # order, which may not match the items actually selected by
        # get_context_for_task (which filters by confidence). Attribution may
        # therefore be approximate — confirm against AtomizerPlaybook's API.
        self._active_playbook_items = [
            item.id for item in self._playbook.items.values()
        ][:max_items]

        return context.split('\n')

    def record_trial_start(self, trial_number: int, design_vars: Dict[str, float]) -> None:
        """
        Record the start of a trial for context tracking.

        Args:
            trial_number: Trial number
            design_vars: Design variable values
        """
        if self._compaction_manager:
            # Import locally, mirroring the local import already used by
            # ContextAwareRunner._wrapped_objective in this module.
            from .compaction import ContextEvent

            # BUG FIX: previous code constructed the event via
            # self._compaction_manager.events.__class__(...), which resolves
            # to list(...) and raises TypeError on the keyword arguments.
            # Build a proper ContextEvent instead.
            self._compaction_manager.add_event(ContextEvent(
                timestamp=datetime.now(),
                event_type=EventType.TRIAL_START,
                summary=f"Trial {trial_number} started",
                details={"trial_number": trial_number, "design_vars": design_vars}
            ))

        self._session.add_action(f"Started trial {trial_number}")

    def record_trial_outcome(
        self,
        trial_number: int,
        success: bool,
        objective_value: Optional[float],
        design_vars: Dict[str, float],
        errors: Optional[List[str]] = None,
        duration_seconds: float = 0.0
    ) -> Dict[str, Any]:
        """
        Record the outcome of a trial for learning.

        Feeds the outcome through the reflector (insight extraction) and the
        feedback loop (playbook scoring), and updates session/compaction state.

        Args:
            trial_number: Trial number
            success: Whether trial succeeded
            objective_value: Objective value (None if failed)
            design_vars: Design variable values
            errors: List of error messages
            duration_seconds: Trial duration

        Returns:
            Dictionary with processing results:
            "insights_extracted" and "playbook_items_updated".
        """
        errors = errors or []

        # Update compaction manager
        if self._compaction_manager:
            self._compaction_manager.add_trial_event(
                trial_number=trial_number,
                success=success,
                objective=objective_value,
                duration=duration_seconds
            )

        # Create outcome for reflection
        outcome = OptimizationOutcome(
            trial_number=trial_number,
            success=success,
            objective_value=objective_value,
            constraint_violations=[],
            solver_errors=errors,
            design_variables=design_vars,
            extractor_used=getattr(self, '_current_extractor', ''),
            duration_seconds=duration_seconds
        )

        # Analyze and generate insights
        insights = self._reflector.analyze_trial(outcome)

        # Process through feedback loop
        result = self._feedback_loop.process_trial_result(
            trial_number=trial_number,
            success=success,
            objective_value=objective_value or 0.0,
            design_variables=design_vars,
            context_items_used=self._active_playbook_items,
            errors=errors
        )

        # Update statistics
        self._context_stats["trials_processed"] += 1
        self._context_stats["insights_generated"] += len(insights)

        # Update session state
        if success:
            # BUG FIX: guard against success=True with objective_value=None,
            # which previously raised TypeError in the format spec.
            obj_text = f"{objective_value:.4g}" if objective_value is not None else "n/a"
            self._session.add_action(
                f"Trial {trial_number} succeeded: obj={obj_text}"
            )
        else:
            error_summary = errors[0][:50] if errors else "unknown"
            self._session.add_error(f"Trial {trial_number}: {error_summary}")
            self._context_stats["errors_captured"] += 1

        return {
            "insights_extracted": len(insights),
            "playbook_items_updated": result.get("items_updated", 0)
        }

    def record_error(self, error_message: str, error_type: str = "") -> None:
        """
        Record an error for learning (outside trial context).

        Args:
            error_message: Error description
            error_type: Error classification
        """
        if self._compaction_manager:
            self._compaction_manager.add_error_event(error_message, error_type)

        self._session.add_error(error_message, error_type)
        self._context_stats["errors_captured"] += 1

    def finalize_context_engineering(self, study_stats: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Finalize context engineering at end of optimization.

        Commits insights and saves playbook.

        Args:
            study_stats: Optional study statistics for analysis

        Returns:
            Dictionary with finalization results (feedback-loop result plus
            "context_stats" and, when enabled, "compaction_stats").
        """
        if study_stats is None:
            study = getattr(self, 'study', {})
            # ROBUSTNESS FIX: `study` may be an Optuna study object rather
            # than a dict; only call .get when it is available.
            name = study.get('study_name', 'unknown') if hasattr(study, 'get') else 'unknown'
            study_stats = {
                "name": name,
                "total_trials": self._context_stats["trials_processed"],
                "best_value": getattr(self, 'best_value', 0),
                "convergence_rate": 0.8  # Would need actual calculation
            }

        # Finalize feedback loop
        result = self._feedback_loop.finalize_study(study_stats)

        # Save playbook
        self._playbook.save(self._playbook_path)

        # Add compaction stats
        if self._compaction_manager:
            result["compaction_stats"] = self._compaction_manager.get_stats()

        result["context_stats"] = self._context_stats

        return result

    def get_context_string(self) -> str:
        """
        Get full context string for LLM consumption.

        Returns:
            Formatted context string: session state, playbook items, and
            (when enabled) compaction history, joined by "---" separators.
        """
        parts = []

        # Session state
        parts.append(self._session.get_llm_context())

        # Playbook items
        playbook_context = self._playbook.get_context_for_task(
            task_type="optimization",
            max_items=15
        )
        if playbook_context:
            parts.append(playbook_context)

        # Compaction history
        if self._compaction_manager:
            parts.append(self._compaction_manager.get_context_string())

        return "\n\n---\n\n".join(parts)
|
|
|
|
|
|
class ContextAwareRunner:
    """
    Wrapper that adds context engineering to any OptimizationRunner.

    Unlike ContextEngineeringMixin, no subclassing is required: an existing
    runner instance is wrapped, and its objective function is intercepted so
    every trial evaluation is observed and fed into the learning pipeline.

    Usage:
        runner = OptimizationRunner(...)
        context_runner = ContextAwareRunner(runner)

        # Use context_runner.run() instead of runner.run()
        study = context_runner.run(n_trials=50)

        # Get learning report
        report = context_runner.get_learning_report()
    """

    def __init__(
        self,
        runner,
        playbook_path: Optional[Path] = None,
        enable_compaction: bool = True
    ):
        """
        Initialize context-aware wrapper.

        Args:
            runner: OptimizationRunner instance to wrap
            playbook_path: Path to playbook (default: runner's output_dir)
            enable_compaction: Whether to enable context compaction
        """
        self._runner = runner

        # Default the playbook location to the wrapped runner's output dir.
        target = runner.output_dir / 'playbook.json' if playbook_path is None else playbook_path
        self._playbook_path = Path(target)

        self._playbook = AtomizerPlaybook.load(self._playbook_path)
        self._reflector = AtomizerReflector(self._playbook)
        self._feedback_loop = FeedbackLoop(self._playbook_path)

        # Compaction (optional long-session context management)
        self._enable_compaction = enable_compaction
        self._compaction = (
            CompactionManager(compaction_threshold=50, keep_recent=20)
            if enable_compaction
            else None
        )

        # Session state
        self._session = get_session()
        self._session.exposed.task_type = TaskType.RUN_OPTIMIZATION

        # Statistics
        self._stats = {
            "trials_observed": 0,
            "successful_trials": 0,
            "failed_trials": 0,
            "insights_generated": 0
        }

        # Intercept the runner's objective so every evaluation is observed.
        self._original_objective = runner._objective_function
        runner._objective_function = self._wrapped_objective

    def _wrapped_objective(self, trial) -> float:
        """
        Objective shim: delegates to the original objective and records
        the outcome (success or failure) for learning.
        """
        started = time.time()
        number = trial.number

        # Record trial start
        if self._compaction:
            from .compaction import ContextEvent
            self._compaction.add_event(ContextEvent(
                timestamp=datetime.now(),
                event_type=EventType.TRIAL_START,
                summary=f"Trial {number} starting"
            ))

        try:
            value = self._original_objective(trial)
        except Exception as exc:
            # Failure path: record, then let the exception propagate so the
            # study sees the trial as failed.
            self._record_failure(number, str(exc), trial.params, time.time() - started)
            raise
        else:
            self._record_success(number, value, trial.params, time.time() - started)
            return value

    def _record_success(
        self,
        trial_number: int,
        objective_value: float,
        params: Dict[str, Any],
        duration: float
    ) -> None:
        """Record successful trial."""
        self._stats["trials_observed"] += 1
        self._stats["successful_trials"] += 1

        if self._compaction:
            self._compaction.add_trial_event(
                trial_number=trial_number,
                success=True,
                objective=objective_value,
                duration=duration
            )

        # Feed the result through the feedback loop, crediting the first
        # ten playbook items as the active context.
        active_items = list(self._playbook.items.keys())[:10]
        self._feedback_loop.process_trial_result(
            trial_number=trial_number,
            success=True,
            objective_value=objective_value,
            design_variables=dict(params),
            context_items_used=active_items
        )

        self._session.add_action(f"Trial {trial_number}: obj={objective_value:.4g}")

    def _record_failure(
        self,
        trial_number: int,
        error: str,
        params: Dict[str, Any],
        duration: float
    ) -> None:
        """Record failed trial."""
        self._stats["trials_observed"] += 1
        self._stats["failed_trials"] += 1

        if self._compaction:
            self._compaction.add_trial_event(
                trial_number=trial_number,
                success=False,
                duration=duration
            )
            self._compaction.add_error_event(error, "trial_failure")

        # Failed trials carry no objective; 0.0 is the conventional filler.
        self._feedback_loop.process_trial_result(
            trial_number=trial_number,
            success=False,
            objective_value=0.0,
            design_variables=dict(params),
            errors=[error]
        )

        self._session.add_error(f"Trial {trial_number}: {error[:100]}")

    def run(self, *args, **kwargs):
        """
        Run optimization with context engineering.

        Passes through to wrapped runner.run() with context tracking.
        """
        study_name = kwargs.get('study_name', 'unknown')
        exposed = self._session.exposed
        exposed.study_name = study_name
        exposed.study_status = "running"

        try:
            study = self._runner.run(*args, **kwargs)
            self._finalize(study_name)
            return study
        except Exception as e:
            self._session.add_error(f"Study failed: {str(e)}")
            raise

    def _finalize(self, study_name: str) -> None:
        """Finalize context engineering after optimization."""
        total = self._stats["trials_observed"]
        success_rate = self._stats["successful_trials"] / total if total > 0 else 0

        # Commit accumulated insights to the playbook via the feedback loop.
        summary = self._feedback_loop.finalize_study({
            "name": study_name,
            "total_trials": total,
            "best_value": getattr(self._runner, 'best_value', 0),
            "convergence_rate": success_rate
        })

        self._stats["insights_generated"] = summary.get("insights_added", 0)

        self._session.exposed.study_status = "completed"
        self._session.exposed.trials_completed = total

    def get_learning_report(self) -> Dict[str, Any]:
        """Get report on what the system learned."""
        compaction_stats = self._compaction.get_stats() if self._compaction else None
        return {
            "statistics": self._stats,
            "playbook_size": len(self._playbook.items),
            "playbook_stats": self._playbook.get_stats(),
            "feedback_stats": self._feedback_loop.get_statistics(),
            "top_insights": self._feedback_loop.get_top_performers(10),
            "compaction_stats": compaction_stats
        }

    def get_context(self) -> str:
        """Get current context string for LLM."""
        parts = [self._session.get_llm_context()]

        if self._compaction:
            parts.append(self._compaction.get_context_string())

        playbook_context = self._playbook.get_context_for_task("optimization")
        if playbook_context:
            parts.append(playbook_context)

        return "\n\n---\n\n".join(parts)

    def __getattr__(self, name):
        """Delegate unknown attributes to wrapped runner."""
        return getattr(self._runner, name)
|