""" Context Engineering Integration for OptimizationRunner Provides integration between the context engineering system and the OptimizationRunner without modifying the core runner code. Two approaches are provided: 1. ContextEngineeringMixin - Mix into OptimizationRunner subclass 2. ContextAwareRunner - Wrapper that adds context engineering Usage: # Approach 1: Mixin class MyRunner(ContextEngineeringMixin, OptimizationRunner): pass # Approach 2: Wrapper runner = OptimizationRunner(...) context_runner = ContextAwareRunner(runner, playbook_path) context_runner.run(...) """ from typing import Dict, Any, Optional, List, Callable from pathlib import Path from datetime import datetime import time from .playbook import AtomizerPlaybook, get_playbook from .reflector import AtomizerReflector, OptimizationOutcome from .feedback_loop import FeedbackLoop from .compaction import CompactionManager, EventType from .session_state import AtomizerSessionState, TaskType, get_session class ContextEngineeringMixin: """ Mixin class to add context engineering to OptimizationRunner. Provides: - Automatic playbook loading/saving - Trial outcome reflection - Learning from successes/failures - Session state tracking Usage: class MyContextAwareRunner(ContextEngineeringMixin, OptimizationRunner): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.init_context_engineering() """ def init_context_engineering( self, playbook_path: Optional[Path] = None, enable_compaction: bool = True, compaction_threshold: int = 50 ) -> None: """ Initialize context engineering components. Call this in your subclass __init__ after super().__init__(). Args: playbook_path: Path to playbook JSON (default: output_dir/playbook.json) enable_compaction: Whether to enable context compaction compaction_threshold: Number of events before compaction """ # Determine playbook path if playbook_path is None: playbook_path = getattr(self, 'output_dir', Path('.')) / 'playbook.json' self._playbook_path = Path(playbook_path) self._playbook = AtomizerPlaybook.load(self._playbook_path) self._reflector = AtomizerReflector(self._playbook) self._feedback_loop = FeedbackLoop(self._playbook_path) # Initialize compaction if enabled self._enable_compaction = enable_compaction if enable_compaction: self._compaction_manager = CompactionManager( compaction_threshold=compaction_threshold, keep_recent=20, keep_errors=True ) else: self._compaction_manager = None # Session state self._session = get_session() self._session.exposed.task_type = TaskType.RUN_OPTIMIZATION # Track active playbook items for feedback attribution self._active_playbook_items: List[str] = [] # Statistics self._context_stats = { "trials_processed": 0, "insights_generated": 0, "errors_captured": 0 } def get_relevant_playbook_items(self, max_items: int = 15) -> List[str]: """ Get relevant playbook items for current optimization context. Returns: List of playbook item context strings """ context = self._playbook.get_context_for_task( task_type="optimization", max_items=max_items, min_confidence=0.5 ) # Extract item IDs for feedback tracking self._active_playbook_items = [ item.id for item in self._playbook.items.values() ][:max_items] return context.split('\n') def record_trial_start(self, trial_number: int, design_vars: Dict[str, float]) -> None: """ Record the start of a trial for context tracking. Args: trial_number: Trial number design_vars: Design variable values """ if self._compaction_manager: self._compaction_manager.add_event( self._compaction_manager.events.__class__( timestamp=datetime.now(), event_type=EventType.TRIAL_START, summary=f"Trial {trial_number} started", details={"trial_number": trial_number, "design_vars": design_vars} ) ) self._session.add_action(f"Started trial {trial_number}") def record_trial_outcome( self, trial_number: int, success: bool, objective_value: Optional[float], design_vars: Dict[str, float], errors: Optional[List[str]] = None, duration_seconds: float = 0.0 ) -> Dict[str, Any]: """ Record the outcome of a trial for learning. Args: trial_number: Trial number success: Whether trial succeeded objective_value: Objective value (None if failed) design_vars: Design variable values errors: List of error messages duration_seconds: Trial duration Returns: Dictionary with processing results """ errors = errors or [] # Update compaction manager if self._compaction_manager: self._compaction_manager.add_trial_event( trial_number=trial_number, success=success, objective=objective_value, duration=duration_seconds ) # Create outcome for reflection outcome = OptimizationOutcome( trial_number=trial_number, success=success, objective_value=objective_value, constraint_violations=[], solver_errors=errors, design_variables=design_vars, extractor_used=getattr(self, '_current_extractor', ''), duration_seconds=duration_seconds ) # Analyze and generate insights insights = self._reflector.analyze_trial(outcome) # Process through feedback loop result = self._feedback_loop.process_trial_result( trial_number=trial_number, success=success, objective_value=objective_value or 0.0, design_variables=design_vars, context_items_used=self._active_playbook_items, errors=errors ) # Update statistics self._context_stats["trials_processed"] += 1 self._context_stats["insights_generated"] += len(insights) # Update session state if success: self._session.add_action( f"Trial {trial_number} succeeded: obj={objective_value:.4g}" ) else: error_summary = errors[0][:50] if errors else "unknown" self._session.add_error(f"Trial {trial_number}: {error_summary}") self._context_stats["errors_captured"] += 1 return { "insights_extracted": len(insights), "playbook_items_updated": result.get("items_updated", 0) } def record_error(self, error_message: str, error_type: str = "") -> None: """ Record an error for learning (outside trial context). Args: error_message: Error description error_type: Error classification """ if self._compaction_manager: self._compaction_manager.add_error_event(error_message, error_type) self._session.add_error(error_message, error_type) self._context_stats["errors_captured"] += 1 def finalize_context_engineering(self, study_stats: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Finalize context engineering at end of optimization. Commits insights and saves playbook. Args: study_stats: Optional study statistics for analysis Returns: Dictionary with finalization results """ if study_stats is None: study_stats = { "name": getattr(self, 'study', {}).get('study_name', 'unknown'), "total_trials": self._context_stats["trials_processed"], "best_value": getattr(self, 'best_value', 0), "convergence_rate": 0.8 # Would need actual calculation } # Finalize feedback loop result = self._feedback_loop.finalize_study(study_stats) # Save playbook self._playbook.save(self._playbook_path) # Add compaction stats if self._compaction_manager: result["compaction_stats"] = self._compaction_manager.get_stats() result["context_stats"] = self._context_stats return result def get_context_string(self) -> str: """ Get full context string for LLM consumption. Returns: Formatted context string """ parts = [] # Session state parts.append(self._session.get_llm_context()) # Playbook items playbook_context = self._playbook.get_context_for_task( task_type="optimization", max_items=15 ) if playbook_context: parts.append(playbook_context) # Compaction history if self._compaction_manager: parts.append(self._compaction_manager.get_context_string()) return "\n\n---\n\n".join(parts) class ContextAwareRunner: """ Wrapper that adds context engineering to any OptimizationRunner. This approach doesn't require subclassing - it wraps an existing runner instance and intercepts relevant calls. Usage: runner = OptimizationRunner(...) context_runner = ContextAwareRunner(runner) # Use context_runner.run() instead of runner.run() study = context_runner.run(n_trials=50) # Get learning report report = context_runner.get_learning_report() """ def __init__( self, runner, playbook_path: Optional[Path] = None, enable_compaction: bool = True ): """ Initialize context-aware wrapper. Args: runner: OptimizationRunner instance to wrap playbook_path: Path to playbook (default: runner's output_dir) enable_compaction: Whether to enable context compaction """ self._runner = runner # Determine playbook path if playbook_path is None: playbook_path = runner.output_dir / 'playbook.json' self._playbook_path = Path(playbook_path) self._playbook = AtomizerPlaybook.load(self._playbook_path) self._reflector = AtomizerReflector(self._playbook) self._feedback_loop = FeedbackLoop(self._playbook_path) # Compaction self._enable_compaction = enable_compaction if enable_compaction: self._compaction = CompactionManager( compaction_threshold=50, keep_recent=20 ) else: self._compaction = None # Session self._session = get_session() self._session.exposed.task_type = TaskType.RUN_OPTIMIZATION # Statistics self._stats = { "trials_observed": 0, "successful_trials": 0, "failed_trials": 0, "insights_generated": 0 } # Hook into runner's objective function self._original_objective = runner._objective_function runner._objective_function = self._wrapped_objective def _wrapped_objective(self, trial) -> float: """ Wrapped objective function that captures outcomes. """ start_time = time.time() trial_number = trial.number # Record trial start if self._compaction: from .compaction import ContextEvent self._compaction.add_event(ContextEvent( timestamp=datetime.now(), event_type=EventType.TRIAL_START, summary=f"Trial {trial_number} starting" )) try: # Run original objective result = self._original_objective(trial) # Record success duration = time.time() - start_time self._record_success(trial_number, result, trial.params, duration) return result except Exception as e: # Record failure duration = time.time() - start_time self._record_failure(trial_number, str(e), trial.params, duration) raise def _record_success( self, trial_number: int, objective_value: float, params: Dict[str, Any], duration: float ) -> None: """Record successful trial.""" self._stats["trials_observed"] += 1 self._stats["successful_trials"] += 1 if self._compaction: self._compaction.add_trial_event( trial_number=trial_number, success=True, objective=objective_value, duration=duration ) # Process through feedback loop self._feedback_loop.process_trial_result( trial_number=trial_number, success=True, objective_value=objective_value, design_variables=dict(params), context_items_used=list(self._playbook.items.keys())[:10] ) # Update session self._session.add_action(f"Trial {trial_number}: obj={objective_value:.4g}") def _record_failure( self, trial_number: int, error: str, params: Dict[str, Any], duration: float ) -> None: """Record failed trial.""" self._stats["trials_observed"] += 1 self._stats["failed_trials"] += 1 if self._compaction: self._compaction.add_trial_event( trial_number=trial_number, success=False, duration=duration ) self._compaction.add_error_event(error, "trial_failure") # Process through feedback loop self._feedback_loop.process_trial_result( trial_number=trial_number, success=False, objective_value=0.0, design_variables=dict(params), errors=[error] ) # Update session self._session.add_error(f"Trial {trial_number}: {error[:100]}") def run(self, *args, **kwargs): """ Run optimization with context engineering. Passes through to wrapped runner.run() with context tracking. """ # Update session state study_name = kwargs.get('study_name', 'unknown') self._session.exposed.study_name = study_name self._session.exposed.study_status = "running" try: # Run optimization result = self._runner.run(*args, **kwargs) # Finalize context engineering self._finalize(study_name) return result except Exception as e: self._session.add_error(f"Study failed: {str(e)}") raise def _finalize(self, study_name: str) -> None: """Finalize context engineering after optimization.""" total_trials = self._stats["trials_observed"] success_rate = ( self._stats["successful_trials"] / total_trials if total_trials > 0 else 0 ) # Finalize feedback loop result = self._feedback_loop.finalize_study({ "name": study_name, "total_trials": total_trials, "best_value": getattr(self._runner, 'best_value', 0), "convergence_rate": success_rate }) self._stats["insights_generated"] = result.get("insights_added", 0) # Update session self._session.exposed.study_status = "completed" self._session.exposed.trials_completed = total_trials def get_learning_report(self) -> Dict[str, Any]: """Get report on what the system learned.""" return { "statistics": self._stats, "playbook_size": len(self._playbook.items), "playbook_stats": self._playbook.get_stats(), "feedback_stats": self._feedback_loop.get_statistics(), "top_insights": self._feedback_loop.get_top_performers(10), "compaction_stats": ( self._compaction.get_stats() if self._compaction else None ) } def get_context(self) -> str: """Get current context string for LLM.""" parts = [self._session.get_llm_context()] if self._compaction: parts.append(self._compaction.get_context_string()) playbook_context = self._playbook.get_context_for_task("optimization") if playbook_context: parts.append(playbook_context) return "\n\n---\n\n".join(parts) def __getattr__(self, name): """Delegate unknown attributes to wrapped runner.""" return getattr(self._runner, name)