"""
Strategy Portfolio Manager - Dynamic multi-strategy optimization.

This module manages dynamic switching between optimization strategies during
a run. It detects stagnation, evaluates alternative strategies, and
orchestrates transitions to maintain optimization progress.

Part of Protocol 10: Intelligent Multi-Strategy Optimization (IMSO)
"""

import json
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import optuna


def _single_objective_value(trial) -> Optional[float]:
    """Return the trial's scalar objective value, or None if unavailable.

    Optuna stores a one-element ``values`` list for single-objective trials
    and a longer list for multi-objective ones; ``trial.value`` raises for
    the latter, so ``values`` is inspected instead.
    """
    values = trial.values
    if values is None or len(values) != 1:
        return None
    return values[0]


def _best_value_or_none(study) -> Optional[float]:
    """Return ``study.best_value``, or None when the study cannot provide one.

    ``study.best_value`` raises ValueError when no trial has completed and
    RuntimeError for multi-objective studies.
    """
    try:
        return study.best_value
    except (ValueError, RuntimeError):
        return None


@dataclass
class StrategyPerformance:
    """Track performance metrics for a strategy."""

    strategy_name: str
    trials_used: int
    best_value_achieved: float
    improvement_rate: float  # Improvement per trial
    last_used_trial: int
    avg_trial_time: float = 0.0


class StrategyTransitionManager:
    """
    Manages transitions between optimization strategies.

    Implements intelligent strategy switching based on:
    1. Stagnation detection
    2. Landscape characteristics
    3. Strategy performance history
    4. User-defined transition rules

    Objective values are compared with ``min``, i.e. the code treats lower
    values as better.
    """

    def __init__(
        self,
        stagnation_window: int = 10,
        min_improvement_threshold: float = 0.001,
        verbose: bool = True,
        tracking_dir: Optional[Path] = None
    ):
        """
        Args:
            stagnation_window: Number of trials to check for stagnation
            min_improvement_threshold: Minimum relative improvement to avoid stagnation
            verbose: Print transition decisions
            tracking_dir: Directory to save transition logs
        """
        self.stagnation_window = stagnation_window
        self.min_improvement = min_improvement_threshold
        self.verbose = verbose

        # Always define the tracking attributes so later methods can test
        # them without risking AttributeError when tracking is disabled.
        self.tracking_dir: Optional[Path] = Path(tracking_dir) if tracking_dir else None
        self.transition_log_file: Optional[Path] = None
        self.performance_log_file: Optional[Path] = None

        # Track strategy performance
        self.strategy_history: Dict[str, StrategyPerformance] = {}
        self.current_strategy: Optional[str] = None
        self.transition_history: List[Dict] = []

        # Initialize tracking files
        if self.tracking_dir:
            self.tracking_dir.mkdir(parents=True, exist_ok=True)
            self.transition_log_file = self.tracking_dir / "strategy_transitions.json"
            self.performance_log_file = self.tracking_dir / "strategy_performance.json"

            # Load existing history
            self._load_transition_history()

    def should_switch_strategy(
        self,
        study: optuna.Study,
        landscape: Optional[Dict] = None
    ) -> Tuple[bool, str]:
        """
        Determine if strategy should be switched.

        Args:
            study: Optuna study
            landscape: Current landscape analysis (optional)

        Returns:
            (should_switch, reason)
        """
        completed_trials = [
            t for t in study.trials
            if t.state == optuna.trial.TrialState.COMPLETE
        ]

        if len(completed_trials) < self.stagnation_window:
            return False, "Insufficient trials for stagnation analysis"

        # Check for stagnation in recent trials
        recent_trials = completed_trials[-self.stagnation_window:]
        is_stagnant, stagnation_reason = self._detect_stagnation(recent_trials)
        if is_stagnant:
            return True, stagnation_reason

        # Check if landscape changed (would require re-analysis)
        if landscape and self._landscape_changed(landscape):
            return True, "Landscape characteristics changed - re-evaluating strategy"

        # Check if current strategy hit its theoretical limit
        if self._strategy_exhausted(study, landscape):
            return True, "Current strategy reached convergence limit"

        return False, "Strategy performing adequately"

    def _detect_stagnation(self, recent_trials: List) -> Tuple[bool, str]:
        """
        Detect if optimization has stagnated.

        Stagnation indicators:
        1. No improvement in best value
        2. High variance in recent objectives (thrashing)

        [Protocol 11] Multi-objective NOT supported - stagnation detection
        requires a single objective value. Skip for multi-objective studies.

        Args:
            recent_trials: Completed trials in the stagnation window, oldest first

        Returns:
            (is_stagnant, reason); reason is empty when nothing was detected
        """
        if len(recent_trials) < 3:
            return False, ""

        # [Protocol 11] Skip stagnation detection for multi-objective.
        # Single-objective trials also expose `.values` (as a one-element
        # list), so the number of objectives is what must be checked;
        # testing `values is not None` would skip every completed trial.
        first_values = recent_trials[0].values
        if first_values is not None and len(first_values) > 1:
            return False, "[Protocol 11] Stagnation detection skipped for multi-objective"

        recent_values = [
            value
            for value in (_single_objective_value(t) for t in recent_trials)
            if value is not None
        ]
        if len(recent_values) < 3:
            return False, ""

        # 1. Check for improvement in best value (running minimum)
        best_values = []
        current_best = float('inf')
        for value in recent_values:
            current_best = min(current_best, value)
            best_values.append(current_best)

        # Calculate improvement over window
        initial_best = best_values[0]
        final_best = best_values[-1]

        if initial_best > 0:
            relative_improvement = (initial_best - final_best) / initial_best
        else:
            # A non-positive baseline cannot be used as a divisor here, so
            # fall back to the absolute change.
            relative_improvement = abs(final_best - initial_best)

        if relative_improvement < self.min_improvement:
            return True, f"Stagnation detected: <{self.min_improvement:.1%} improvement in {self.stagnation_window} trials"

        # 2. Check for thrashing (high variance without improvement)
        recent_variance = np.var(recent_values)
        recent_mean = np.mean(recent_values)

        if recent_mean > 0:
            coefficient_of_variation = np.sqrt(recent_variance) / recent_mean

            if coefficient_of_variation > 0.3:  # High variance
                # If high variance but no improvement, we're thrashing
                if best_values[0] == best_values[-1]:
                    return True, f"Thrashing detected: High variance ({coefficient_of_variation:.2f}) without improvement"

        return False, ""

    def _landscape_changed(self, landscape: Dict) -> bool:
        """
        Detect if landscape characteristics changed significantly.

        This would indicate we're in a different region of search space.
        """
        # This is a placeholder - would need to track landscape history
        # For now, return False (no change detection)
        return False

    def _strategy_exhausted(
        self,
        study: optuna.Study,
        landscape: Optional[Dict]
    ) -> bool:
        """
        Check if current strategy has reached its theoretical limit.

        Different strategies have different convergence properties:
        - CMA-ES: Fast convergence but can get stuck in local minimum
        - TPE: Slower convergence but better global exploration
        - GP-BO: Sample efficient but plateaus after exploration

        Only the CMA-ES rule is implemented: in a smooth landscape
        (smoothness > 0.7) with at least 20 completed trials, the strategy is
        considered exhausted when every parameter's variance over the last
        10 completed trials is below 0.01.
        """
        if not self.current_strategy or not landscape:
            return False

        # CMA-ES exhaustion: High convergence in smooth landscape
        if self.current_strategy != 'cmaes':
            return False
        if landscape.get('smoothness', 0) <= 0.7:
            return False

        # Check if we've converged (low variance in recent trials)
        completed = [
            t for t in study.trials
            if t.state == optuna.trial.TrialState.COMPLETE
        ]
        if len(completed) < 20:
            return False

        try:
            recent_params = np.asarray(
                [list(trial.params.values()) for trial in completed[-10:]],
                dtype=float,
            )
        except (TypeError, ValueError):
            # Non-numeric (categorical) or differently-shaped (conditional)
            # parameter sets cannot form a numeric matrix, so convergence
            # cannot be assessed this way.
            return False

        param_variance = np.var(recent_params, axis=0)

        # If variance is very low, CMA-ES has converged
        return bool(np.all(param_variance < 0.01))

    def record_strategy_performance(
        self,
        strategy_name: str,
        study: optuna.Study,
        trial: optuna.trial.FrozenTrial
    ):
        """Record performance metrics for current strategy.

        Args:
            strategy_name: Strategy that produced the trial
            study: Optuna study the trial belongs to
            trial: The trial that just finished

        The trial count and last-used trial number are always updated. The
        best value and improvement rate are only updated when a single
        objective value is available, so multi-objective trials or a
        value-less baseline trial do not raise.
        """
        perf = self.strategy_history.get(strategy_name)
        if perf is None:
            perf = StrategyPerformance(
                strategy_name=strategy_name,
                trials_used=0,
                best_value_achieved=float('inf'),
                improvement_rate=0.0,
                last_used_trial=0
            )
            self.strategy_history[strategy_name] = perf

        perf.trials_used += 1
        perf.last_used_trial = trial.number

        value = _single_objective_value(trial)
        if value is None:
            return
        perf.best_value_achieved = min(perf.best_value_achieved, value)

        # Calculate improvement rate
        if perf.trials_used > 1:
            trials = study.trials
            baseline_index = max(0, trial.number - perf.trials_used)
            if baseline_index < len(trials):
                # The baseline trial may have failed or been pruned, in which
                # case it carries no value to compare against.
                initial_best = _single_objective_value(trials[baseline_index])
                if initial_best is not None:
                    perf.improvement_rate = (
                        (initial_best - perf.best_value_achieved) / perf.trials_used
                    )

    def execute_strategy_switch(
        self,
        study: optuna.Study,
        from_strategy: str,
        to_strategy: str,
        reason: str,
        trial_number: int
    ):
        """
        Execute strategy switch and log the transition.

        Args:
            study: Optuna study
            from_strategy: Current strategy
            to_strategy: New strategy to switch to
            reason: Reason for switching
            trial_number: Current trial number

        The recorded 'best_value_at_switch' is None when the study has no
        single best value (no completed trials, or multi-objective).
        """
        transition_event = {
            'trial_number': trial_number,
            'from_strategy': from_strategy,
            'to_strategy': to_strategy,
            'reason': reason,
            'best_value_at_switch': _best_value_or_none(study),
            'total_trials': len(study.trials),
            'timestamp': datetime.now().isoformat()
        }

        self.transition_history.append(transition_event)
        self.current_strategy = to_strategy

        # Save transition log
        if self.tracking_dir and self.transition_log_file is not None:
            self._write_json(
                self.transition_log_file, self.transition_history, "transition log"
            )

        if self.verbose:
            self._print_transition(transition_event)

    def _write_json(self, path: Path, payload, label: str) -> None:
        """Best-effort JSON write; failures produce a warning when verbose.

        The payload is serialized before the file is opened so that a
        serialization error cannot leave a truncated file behind.
        """
        try:
            text = json.dumps(payload, indent=2)
            path.write_text(text, encoding="utf-8")
        except (OSError, TypeError, ValueError) as e:
            if self.verbose:
                print(f" Warning: Failed to save {label}: {e}")

    def _print_transition(self, event: Dict):
        """Print formatted transition announcement."""
        best_value = event['best_value_at_switch']
        print(f"\n{'='*70}")
        print(" STRATEGY TRANSITION")
        print(f"{'='*70}")
        print(f" Trial #{event['trial_number']}")
        print(f" {event['from_strategy'].upper()} -> {event['to_strategy'].upper()}")
        print(f" Reason: {event['reason']}")
        if best_value is None:
            # No single best value exists (see execute_strategy_switch).
            print(" Best value at transition: n/a")
        else:
            print(f" Best value at transition: {best_value:.6f}")
        print(f"{'='*70}\n")

    def _load_transition_history(self):
        """Load existing transition history from file.

        Restores ``current_strategy`` from the last logged transition.
        Unreadable or malformed logs produce a warning when verbose.
        """
        log_file = self.transition_log_file
        if log_file is None or not log_file.exists():
            return

        try:
            with open(log_file, 'r', encoding='utf-8') as f:
                history = json.load(f)

            if not isinstance(history, list):
                raise ValueError("transition log does not contain a JSON list")
            self.transition_history = history

            # Restore current strategy from history
            if history:
                self.current_strategy = history[-1]['to_strategy']
        except (OSError, ValueError, KeyError, TypeError) as e:
            if self.verbose:
                print(f" Warning: Failed to load transition history: {e}")

    def save_performance_summary(self):
        """Save strategy performance summary to file."""
        if not self.tracking_dir or self.performance_log_file is None:
            return

        summary = {
            'strategies': {
                name: asdict(perf)
                for name, perf in self.strategy_history.items()
            },
            'current_strategy': self.current_strategy,
            'total_transitions': len(self.transition_history)
        }

        self._write_json(self.performance_log_file, summary, "performance summary")

    def get_performance_report(self) -> str:
        """Generate human-readable performance report."""
        if not self.strategy_history:
            return "No strategy performance data available"

        rule = "=" * 70 + "\n"
        parts = ["\n" + rule, " STRATEGY PERFORMANCE SUMMARY\n", rule]

        for name, perf in self.strategy_history.items():
            parts.append(f"\n {name.upper()}:\n")
            parts.append(f" Trials used: {perf.trials_used}\n")
            parts.append(f" Best value: {perf.best_value_achieved:.6f}\n")
            parts.append(f" Improvement rate: {perf.improvement_rate:.6f} per trial\n")
            parts.append(f" Last used: Trial #{perf.last_used_trial}\n")

        if self.transition_history:
            parts.append(f"\n TRANSITIONS: {len(self.transition_history)}\n")
            for event in self.transition_history:
                parts.append(f" Trial #{event['trial_number']}: ")
                parts.append(f"{event['from_strategy']} → {event['to_strategy']}\n")
                parts.append(f" Reason: {event['reason']}\n")

        parts.append(rule)
        return "".join(parts)


class AdaptiveStrategyCallback:
    """
    Optuna callback that manages adaptive strategy switching.

    This callback integrates with the IntelligentOptimizer to:
    1. Monitor strategy performance
    2. Detect when switching is needed
    3. Coordinate with landscape analyzer and strategy selector
    4. Execute transitions
    """

    def __init__(
        self,
        transition_manager: StrategyTransitionManager,
        landscape_analyzer,
        strategy_selector,
        reanalysis_interval: int = 15
    ):
        """
        Args:
            transition_manager: StrategyTransitionManager instance
            landscape_analyzer: LandscapeAnalyzer instance
            strategy_selector: IntelligentStrategySelector instance
            reanalysis_interval: How often to re-analyze landscape
        """
        self.transition_manager = transition_manager
        self.landscape_analyzer = landscape_analyzer
        self.strategy_selector = strategy_selector
        self.reanalysis_interval = reanalysis_interval

        self.last_landscape = None
        self.last_recommendation = None

    def __call__(self, study: optuna.Study, trial: optuna.trial.FrozenTrial):
        """Called after each trial completes."""
        if trial.state != optuna.trial.TrialState.COMPLETE:
            return

        current_strategy = self.transition_manager.current_strategy

        # Record performance
        if current_strategy:
            self.transition_manager.record_strategy_performance(
                current_strategy, study, trial
            )

        # Periodically re-analyze landscape
        if trial.number % self.reanalysis_interval == 0:
            self.last_landscape = self.landscape_analyzer.analyze(study)

        # Check if we should switch
        should_switch, reason = self.transition_manager.should_switch_strategy(
            study, self.last_landscape
        )

        # A recommendation needs a landscape analysis to work from.
        if not (should_switch and self.last_landscape):
            return

        # Get new strategy recommendation
        new_strategy, details = self.strategy_selector.recommend_strategy(
            landscape=self.last_landscape,
            trials_completed=trial.number,
            current_best_value=study.best_value
        )

        # Only switch if recommendation is different
        if new_strategy != current_strategy:
            self.transition_manager.execute_strategy_switch(
                study=study,
                from_strategy=current_strategy or 'initial',
                to_strategy=new_strategy,
                reason=reason,
                trial_number=trial.number
            )

            # Note: Actual sampler change requires study recreation
            # This is logged for the IntelligentOptimizer to act on
            self.last_recommendation = (new_strategy, details)