# - Add validation framework (config, model, results, study validators)
# - Add Claude Code skills (create-study, run-optimization, generate-report, troubleshoot, analyze-model)
# - Add Atomizer Dashboard (React frontend + FastAPI backend)
# - Reorganize docs into structured directories (00-09)
# - Add neural surrogate modules and training infrastructure
# - Add multi-objective optimization support
# Generated with Claude Code (https://claude.com/claude-code)
# Co-Authored-By: Claude <noreply@anthropic.com>
# (Viewer metadata: 434 lines, 16 KiB, Python)
"""
|
|
Strategy Portfolio Manager - Dynamic multi-strategy optimization.
|
|
|
|
This module manages dynamic switching between optimization strategies during a run.
|
|
It detects stagnation, evaluates alternative strategies, and orchestrates transitions
|
|
to maintain optimization progress.
|
|
|
|
Part of Protocol 10: Intelligent Multi-Strategy Optimization (IMSO)
|
|
"""
|
|
|
|
import numpy as np
import optuna
from typing import Dict, List, Optional, Tuple
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
|
|
class StrategyPerformance:
|
|
"""Track performance metrics for a strategy."""
|
|
strategy_name: str
|
|
trials_used: int
|
|
best_value_achieved: float
|
|
improvement_rate: float # Improvement per trial
|
|
last_used_trial: int
|
|
avg_trial_time: float = 0.0
|
|
|
|
|
|
class StrategyTransitionManager:
    """
    Manages transitions between optimization strategies.

    Implements intelligent strategy switching based on:
    1. Stagnation detection
    2. Landscape characteristics
    3. Strategy performance history
    4. User-defined transition rules
    """

    def __init__(
        self,
        stagnation_window: int = 10,
        min_improvement_threshold: float = 0.001,
        verbose: bool = True,
        tracking_dir: Optional[Path] = None
    ):
        """
        Args:
            stagnation_window: Number of trials to check for stagnation
            min_improvement_threshold: Minimum relative improvement to avoid stagnation
            verbose: Print transition decisions
            tracking_dir: Directory to save transition logs
        """
        self.stagnation_window = stagnation_window
        self.min_improvement = min_improvement_threshold
        self.verbose = verbose
        self.tracking_dir = tracking_dir

        # Track strategy performance
        self.strategy_history: Dict[str, StrategyPerformance] = {}
        self.current_strategy: Optional[str] = None
        self.transition_history: List[Dict] = []

        # Always define the log-file attributes so attribute access is safe
        # even when no tracking directory was provided.
        self.transition_log_file: Optional[Path] = None
        self.performance_log_file: Optional[Path] = None

        # Initialize tracking files
        if tracking_dir:
            self.tracking_dir = Path(tracking_dir)
            self.tracking_dir.mkdir(parents=True, exist_ok=True)
            self.transition_log_file = self.tracking_dir / "strategy_transitions.json"
            self.performance_log_file = self.tracking_dir / "strategy_performance.json"

            # Load existing history
            self._load_transition_history()

    def should_switch_strategy(
        self,
        study: optuna.Study,
        landscape: Optional[Dict] = None
    ) -> Tuple[bool, str]:
        """
        Determine if strategy should be switched.

        Args:
            study: Optuna study
            landscape: Current landscape analysis (optional)

        Returns:
            (should_switch, reason)
        """
        completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]

        if len(completed_trials) < self.stagnation_window:
            return False, "Insufficient trials for stagnation analysis"

        # Check for stagnation in recent trials
        recent_trials = completed_trials[-self.stagnation_window:]
        is_stagnant, stagnation_reason = self._detect_stagnation(recent_trials)

        if is_stagnant:
            return True, stagnation_reason

        # Check if landscape changed (would require re-analysis)
        if landscape and self._landscape_changed(landscape):
            return True, "Landscape characteristics changed - re-evaluating strategy"

        # Check if current strategy hit its theoretical limit
        if self._strategy_exhausted(study, landscape):
            return True, "Current strategy reached convergence limit"

        return False, "Strategy performing adequately"

    def _detect_stagnation(self, recent_trials: List) -> Tuple[bool, str]:
        """
        Detect if optimization has stagnated.

        Stagnation indicators:
        1. No improvement in best value
        2. High variance in recent objectives (thrashing)
        3. Repeated similar parameter configurations

        [Protocol 11] Multi-objective NOT supported - stagnation detection
        requires a single objective value. Skip for multi-objective studies.
        """
        if len(recent_trials) < 3:
            return False, ""

        # [Protocol 11] Skip stagnation detection for multi-objective.
        # Multi-objective has a Pareto front, not a single "best value".
        # BUGFIX: completed single-objective trials ALSO expose a one-element
        # `.values` list in Optuna, so the check must be on the length, not
        # `is not None` — the old check disabled detection for every study.
        first_values = recent_trials[0].values
        if first_values is not None and len(first_values) > 1:
            return False, "[Protocol 11] Stagnation detection skipped for multi-objective"

        recent_values = [t.value for t in recent_trials]

        # 1. Check for improvement in best value (running minimum; minimization assumed)
        best_values = []
        current_best = float('inf')
        for value in recent_values:
            current_best = min(current_best, value)
            best_values.append(current_best)

        # Calculate improvement over window
        if len(best_values) >= 2:
            initial_best = best_values[0]
            final_best = best_values[-1]

            if initial_best > 0:
                relative_improvement = (initial_best - final_best) / initial_best
            else:
                # Non-positive baseline: fall back to absolute improvement.
                relative_improvement = abs(final_best - initial_best)

            if relative_improvement < self.min_improvement:
                return True, f"Stagnation detected: <{self.min_improvement:.1%} improvement in {self.stagnation_window} trials"

        # 2. Check for thrashing (high variance without improvement)
        recent_variance = np.var(recent_values)
        recent_mean = np.mean(recent_values)

        if recent_mean > 0:
            coefficient_of_variation = np.sqrt(recent_variance) / recent_mean

            if coefficient_of_variation > 0.3:  # High variance
                # If high variance but no improvement, we're thrashing
                if best_values[0] == best_values[-1]:
                    return True, f"Thrashing detected: High variance ({coefficient_of_variation:.2f}) without improvement"

        return False, ""

    def _landscape_changed(self, landscape: Dict) -> bool:
        """
        Detect if landscape characteristics changed significantly.

        This would indicate we're in a different region of search space.
        """
        # This is a placeholder - would need to track landscape history
        # For now, return False (no change detection)
        return False

    def _strategy_exhausted(
        self,
        study: optuna.Study,
        landscape: Optional[Dict]
    ) -> bool:
        """
        Check if current strategy has reached its theoretical limit.

        Different strategies have different convergence properties:
        - CMA-ES: Fast convergence but can get stuck in local minimum
        - TPE: Slower convergence but better global exploration
        - GP-BO: Sample efficient but plateaus after exploration
        """
        if not self.current_strategy or not landscape:
            return False

        # CMA-ES exhaustion: High convergence in smooth landscape
        if self.current_strategy == 'cmaes':
            if landscape.get('smoothness', 0) > 0.7:
                # Check if we've converged (low variance in recent trials)
                completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
                if len(completed) >= 20:
                    recent_params = []
                    for trial in completed[-10:]:
                        recent_params.append(list(trial.params.values()))

                    recent_params = np.array(recent_params)
                    param_variance = np.var(recent_params, axis=0)

                    # If variance is very low, CMA-ES has converged
                    if np.all(param_variance < 0.01):
                        return True

        return False

    def record_strategy_performance(
        self,
        strategy_name: str,
        study: optuna.Study,
        trial: optuna.trial.FrozenTrial
    ):
        """Record performance metrics for current strategy."""
        if strategy_name not in self.strategy_history:
            self.strategy_history[strategy_name] = StrategyPerformance(
                strategy_name=strategy_name,
                trials_used=0,
                best_value_achieved=float('inf'),
                improvement_rate=0.0,
                last_used_trial=0
            )

        perf = self.strategy_history[strategy_name]
        perf.trials_used += 1
        # Guard: failed/pruned trials report value=None; don't crash min().
        if trial.value is not None:
            perf.best_value_achieved = min(perf.best_value_achieved, trial.value)
        perf.last_used_trial = trial.number

        # Calculate improvement rate
        if perf.trials_used > 1:
            baseline = study.trials[max(0, trial.number - perf.trials_used)]
            # The baseline trial may not have completed (value=None); skip then.
            if baseline.value is not None:
                perf.improvement_rate = (baseline.value - perf.best_value_achieved) / perf.trials_used

    def execute_strategy_switch(
        self,
        study: optuna.Study,
        from_strategy: str,
        to_strategy: str,
        reason: str,
        trial_number: int
    ):
        """
        Execute strategy switch and log the transition.

        Args:
            study: Optuna study
            from_strategy: Current strategy
            to_strategy: New strategy to switch to
            reason: Reason for switching
            trial_number: Current trial number
        """
        # [Protocol 11] study.best_value raises for multi-objective studies;
        # record None in that case instead of aborting the switch.
        try:
            best_value_at_switch = study.best_value
        except (ValueError, RuntimeError):
            best_value_at_switch = None

        transition_event = {
            'trial_number': trial_number,
            'from_strategy': from_strategy,
            'to_strategy': to_strategy,
            'reason': reason,
            'best_value_at_switch': best_value_at_switch,
            'total_trials': len(study.trials),
            'timestamp': datetime.now().isoformat()
        }

        self.transition_history.append(transition_event)
        self.current_strategy = to_strategy

        # Save transition log
        if self.tracking_dir:
            try:
                with open(self.transition_log_file, 'w') as f:
                    json.dump(self.transition_history, f, indent=2)
            except Exception as e:
                # Best-effort persistence: a failed write must not stop the run.
                if self.verbose:
                    print(f" Warning: Failed to save transition log: {e}")

        if self.verbose:
            self._print_transition(transition_event)

    def _print_transition(self, event: Dict):
        """Print formatted transition announcement."""
        # Best value may be None for multi-objective studies (see
        # execute_strategy_switch); format defensively.
        best = event['best_value_at_switch']
        best_str = f"{best:.6f}" if best is not None else "n/a"
        print(f"\n{'='*70}")
        print(f" STRATEGY TRANSITION")
        print(f"{'='*70}")
        print(f" Trial #{event['trial_number']}")
        print(f" {event['from_strategy'].upper()} -> {event['to_strategy'].upper()}")
        print(f" Reason: {event['reason']}")
        print(f" Best value at transition: {best_str}")
        print(f"{'='*70}\n")

    def _load_transition_history(self):
        """Load existing transition history from file."""
        if self.transition_log_file and self.transition_log_file.exists():
            try:
                with open(self.transition_log_file, 'r') as f:
                    self.transition_history = json.load(f)

                # Restore current strategy from history
                if self.transition_history:
                    self.current_strategy = self.transition_history[-1]['to_strategy']
            except Exception as e:
                # A corrupt/unreadable log is non-fatal; start with empty history.
                if self.verbose:
                    print(f" Warning: Failed to load transition history: {e}")

    def save_performance_summary(self):
        """Save strategy performance summary to file."""
        if not self.tracking_dir:
            return

        summary = {
            'strategies': {
                name: asdict(perf)
                for name, perf in self.strategy_history.items()
            },
            'current_strategy': self.current_strategy,
            'total_transitions': len(self.transition_history)
        }

        try:
            with open(self.performance_log_file, 'w') as f:
                json.dump(summary, f, indent=2)
        except Exception as e:
            # Best-effort persistence, mirroring the transition-log handling.
            if self.verbose:
                print(f" Warning: Failed to save performance summary: {e}")

    def get_performance_report(self) -> str:
        """Generate human-readable performance report."""
        if not self.strategy_history:
            return "No strategy performance data available"

        report = "\n" + "="*70 + "\n"
        report += " STRATEGY PERFORMANCE SUMMARY\n"
        report += "="*70 + "\n"

        for name, perf in self.strategy_history.items():
            report += f"\n {name.upper()}:\n"
            report += f" Trials used: {perf.trials_used}\n"
            report += f" Best value: {perf.best_value_achieved:.6f}\n"
            report += f" Improvement rate: {perf.improvement_rate:.6f} per trial\n"
            report += f" Last used: Trial #{perf.last_used_trial}\n"

        if self.transition_history:
            report += f"\n TRANSITIONS: {len(self.transition_history)}\n"
            for event in self.transition_history:
                report += f" Trial #{event['trial_number']}: "
                report += f"{event['from_strategy']} → {event['to_strategy']}\n"
                report += f" Reason: {event['reason']}\n"

        report += "="*70 + "\n"

        return report
|
class AdaptiveStrategyCallback:
    """
    Optuna callback that manages adaptive strategy switching.

    This callback integrates with the IntelligentOptimizer to:
    1. Monitor strategy performance
    2. Detect when switching is needed
    3. Coordinate with landscape analyzer and strategy selector
    4. Execute transitions
    """

    def __init__(
        self,
        transition_manager: StrategyTransitionManager,
        landscape_analyzer,
        strategy_selector,
        reanalysis_interval: int = 15
    ):
        """
        Args:
            transition_manager: StrategyTransitionManager instance
            landscape_analyzer: LandscapeAnalyzer instance
            strategy_selector: IntelligentStrategySelector instance
            reanalysis_interval: How often to re-analyze landscape
        """
        self.transition_manager = transition_manager
        self.landscape_analyzer = landscape_analyzer
        self.strategy_selector = strategy_selector
        self.reanalysis_interval = reanalysis_interval

        # Most recent landscape analysis / recommendation, populated lazily.
        self.last_landscape = None
        self.last_recommendation = None

    def __call__(self, study: optuna.Study, trial: optuna.trial.FrozenTrial):
        """Called after each trial completes."""
        if trial.state != optuna.trial.TrialState.COMPLETE:
            return

        current_strategy = self.transition_manager.current_strategy

        # Record performance
        if current_strategy:
            self.transition_manager.record_strategy_performance(
                current_strategy, study, trial
            )

        # Periodically re-analyze landscape (cached in between intervals)
        if trial.number % self.reanalysis_interval == 0:
            self.last_landscape = self.landscape_analyzer.analyze(study)

        # Check if we should switch
        should_switch, reason = self.transition_manager.should_switch_strategy(
            study, self.last_landscape
        )

        if should_switch and self.last_landscape:
            # [Protocol 11] study.best_value raises for multi-objective
            # studies; pass None so the selector can still be consulted.
            try:
                current_best = study.best_value
            except (ValueError, RuntimeError):
                current_best = None

            # Get new strategy recommendation
            new_strategy, details = self.strategy_selector.recommend_strategy(
                landscape=self.last_landscape,
                trials_completed=trial.number,
                current_best_value=current_best
            )

            # Only switch if recommendation is different
            if new_strategy != current_strategy:
                self.transition_manager.execute_strategy_switch(
                    study=study,
                    from_strategy=current_strategy or 'initial',
                    to_strategy=new_strategy,
                    reason=reason,
                    trial_number=trial.number
                )

                # Note: Actual sampler change requires study recreation
                # This is logged for the IntelligentOptimizer to act on
                self.last_recommendation = (new_strategy, details)