""" Adaptive Characterization Module - Intelligent stopping for landscape characterization. This module implements adaptive stopping criteria for the characterization phase that intelligently determines when enough landscape exploration has been done. Simple problems (smooth, unimodal) -> stop early (~10-15 trials) Complex problems (multimodal, rugged) -> continue longer (~20-30 trials) Part of Protocol 10: Intelligent Multi-Strategy Optimization (IMSO) """ import numpy as np import optuna from typing import Dict, List, Optional from dataclasses import dataclass import json from pathlib import Path from datetime import datetime @dataclass class LandscapeMetricSnapshot: """Snapshot of landscape metrics at a given trial.""" trial_number: int smoothness: float multimodal: bool n_modes: int noise_level: float landscape_type: str overall_confidence: float class CharacterizationStoppingCriterion: """ Intelligently determines when characterization phase has gathered enough information. Key Features: 1. Progressive landscape analysis (every 5 trials starting at trial 10) 2. Metric convergence detection (are metrics stabilizing?) 3. Complexity-aware sample adequacy (complex problems need more trials) 4. Parameter space coverage assessment 5. Confidence scoring (combines all factors) Stopping Decision: - Simple problems: Stop at ~10-15 trials when metrics converge - Complex problems: Continue to ~20-30 trials for adequate coverage """ def __init__( self, min_trials: int = 10, max_trials: int = 30, confidence_threshold: float = 0.85, check_interval: int = 5, verbose: bool = True, tracking_dir: Optional[Path] = None ): """ Args: min_trials: Minimum trials before considering stopping max_trials: Maximum trials (stop even if not converged) confidence_threshold: Confidence needed to stop (0-1) check_interval: How often to check stopping criteria verbose: Print progress reports tracking_dir: Directory to save characterization tracking """ self.min_trials = min_trials self.max_trials = max_trials self.confidence_threshold = confidence_threshold self.check_interval = check_interval self.verbose = verbose self.tracking_dir = tracking_dir # Track metric history across analyses self.metric_history: List[LandscapeMetricSnapshot] = [] self.should_stop_flag = False self.stop_reason = "" self.final_confidence = 0.0 # Initialize tracking if tracking_dir: self.tracking_dir = Path(tracking_dir) self.tracking_dir.mkdir(parents=True, exist_ok=True) self.characterization_log = self.tracking_dir / "characterization_progress.json" def update(self, landscape: Dict, trial_number: int): """ Update with latest landscape analysis. Args: landscape: Landscape analysis dictionary trial_number: Current trial number """ if not landscape.get('ready', False): return # Create snapshot snapshot = LandscapeMetricSnapshot( trial_number=trial_number, smoothness=landscape['smoothness'], multimodal=landscape['multimodal'], n_modes=landscape['n_modes'], noise_level=landscape['noise_level'], landscape_type=landscape['landscape_type'], overall_confidence=0.0 # Will be calculated ) self.metric_history.append(snapshot) # Calculate confidence confidence = self._calculate_confidence(landscape, trial_number) snapshot.overall_confidence = confidence # Save progress self._save_progress() # Print report if self.verbose: self._print_progress_report(trial_number, landscape, confidence) # Check stopping criteria if trial_number >= self.min_trials: self._evaluate_stopping_criteria(landscape, trial_number, confidence) def should_stop(self, study: optuna.Study) -> bool: """ Check if characterization should stop. Args: study: Optuna study Returns: True if should stop characterization """ completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE] n_trials = len(completed_trials) # Force stop at max trials if n_trials >= self.max_trials: self.should_stop_flag = True self.stop_reason = f"Maximum characterization trials reached ({self.max_trials})" return True return self.should_stop_flag def _calculate_confidence(self, landscape: Dict, trial_number: int) -> float: """ Calculate confidence score for stopping decision. Confidence Components (weighted sum): 1. Metric Stability (40%): Are metrics converging? 2. Parameter Coverage (30%): Explored enough space? 3. Sample Adequacy (20%): Enough samples for complexity? 4. Landscape Clarity (10%): Clear classification? """ if trial_number < self.min_trials: return 0.0 # 1. Metric Stability Score stability_score = self._compute_metric_stability() # 2. Parameter Coverage Score coverage_score = self._compute_parameter_coverage(landscape) # 3. Sample Adequacy Score adequacy_score = self._compute_sample_adequacy(landscape, trial_number) # 4. Landscape Clarity Score clarity_score = self._compute_landscape_clarity(landscape) # Weighted confidence confidence = ( 0.40 * stability_score + 0.30 * coverage_score + 0.20 * adequacy_score + 0.10 * clarity_score ) return confidence def _compute_metric_stability(self) -> float: """ Compute how stable landscape metrics are. High stability = metrics have converged (good for stopping) Low stability = metrics still changing (need more trials) """ if len(self.metric_history) < 3: return 0.0 # Look at last 3 analyses recent_snapshots = self.metric_history[-3:] # Check smoothness stability smoothness_values = [s.smoothness for s in recent_snapshots] smoothness_std = np.std(smoothness_values) smoothness_stable = smoothness_std < 0.05 # Stable if std < 0.05 # Check noise stability noise_values = [s.noise_level for s in recent_snapshots] noise_std = np.std(noise_values) noise_stable = noise_std < 0.1 # Stable if std < 0.1 # Check landscape type consistency landscape_types = [s.landscape_type for s in recent_snapshots] type_consistent = len(set(landscape_types)) == 1 # All same type # Check n_modes stability n_modes = [s.n_modes for s in recent_snapshots] modes_consistent = len(set(n_modes)) <= 1 # Same or ±1 # Combine stability indicators stability_indicators = [ 1.0 if smoothness_stable else 0.0, 1.0 if noise_stable else 0.0, 1.0 if type_consistent else 0.0, 1.0 if modes_consistent else 0.0 ] stability_score = np.mean(stability_indicators) return stability_score def _compute_parameter_coverage(self, landscape: Dict) -> float: """ Compute how well parameter space has been explored. High coverage = explored wide range of each parameter """ param_ranges = landscape.get('parameter_ranges', {}) if not param_ranges: return 0.5 # Unknown coverage_scores = [] for param, ranges in param_ranges.items(): coverage = ranges['coverage'] # Already computed in landscape analyzer coverage_scores.append(coverage) avg_coverage = np.mean(coverage_scores) # Normalize: 50% coverage = 0.5 score, 100% coverage = 1.0 score coverage_score = min(1.0, avg_coverage / 0.5) return coverage_score def _compute_sample_adequacy(self, landscape: Dict, trial_number: int) -> float: """ Compute if we have enough samples for the detected complexity. Simple problems: 10 trials sufficient Complex problems: 20-30 trials needed """ dimensionality = landscape.get('dimensionality', 2) multimodal = landscape.get('multimodal', False) n_modes = landscape.get('n_modes', 1) # Calculate required samples based on complexity if multimodal and n_modes > 2: # Complex multimodal: need more samples required_samples = 10 + 5 * n_modes + 2 * dimensionality elif multimodal: # Simple multimodal: moderate samples required_samples = 15 + 2 * dimensionality else: # Unimodal: fewer samples needed required_samples = 10 + dimensionality # Cap at max_trials required_samples = min(required_samples, self.max_trials) # Score based on how many samples we have vs required adequacy_score = min(1.0, trial_number / required_samples) return adequacy_score def _compute_landscape_clarity(self, landscape: Dict) -> float: """ Compute how clearly we can classify the landscape. Clear classification = high confidence in landscape type """ smoothness = landscape.get('smoothness', 0.5) noise_level = landscape.get('noise_level', 0.5) # Clear cases: # - Very smooth (> 0.7) or very rugged (< 0.3) # - Low noise (< 0.3) or high noise (> 0.7) smoothness_clarity = max( abs(smoothness - 0.7), # Distance from smooth threshold abs(smoothness - 0.3) # Distance from rugged threshold ) noise_clarity = max( abs(noise_level - 0.3), # Distance from low noise threshold abs(noise_level - 0.7) # Distance from high noise threshold ) # Normalize to 0-1 clarity_score = min(1.0, (smoothness_clarity + noise_clarity) / 0.8) return clarity_score def _evaluate_stopping_criteria(self, landscape: Dict, trial_number: int, confidence: float): """ Evaluate if we should stop characterization. Stop if: 1. Confidence threshold met 2. OR maximum trials reached """ if confidence >= self.confidence_threshold: self.should_stop_flag = True self.stop_reason = f"Characterization confidence threshold met ({confidence:.1%})" self.final_confidence = confidence if self.verbose: print(f"\n{'='*70}") print(f" CHARACTERIZATION COMPLETE") print(f"{'='*70}") print(f" Trial #{trial_number}") print(f" Confidence: {confidence:.1%}") print(f" Landscape Type: {landscape['landscape_type'].upper()}") print(f" Ready for strategy selection") print(f"{'='*70}\n") def _print_progress_report(self, trial_number: int, landscape: Dict, confidence: float): """Print characterization progress report.""" print(f"\n{'='*70}") print(f" CHARACTERIZATION PROGRESS - Trial #{trial_number}") print(f"{'='*70}") print(f" Landscape Type: {landscape['landscape_type']}") print(f" Smoothness: {landscape['smoothness']:.2f}") print(f" Multimodal: {'YES' if landscape['multimodal'] else 'NO'} ({landscape['n_modes']} modes)") print(f" Noise: {landscape['noise_level']:.2f}") print(f" Characterization Confidence: {confidence:.1%}") if confidence >= self.confidence_threshold: print(f" Status: READY TO STOP (confidence >= {self.confidence_threshold:.0%})") else: remaining = self.confidence_threshold - confidence print(f" Status: CONTINUE (need +{remaining:.1%} confidence)") print(f"{'='*70}\n") def _save_progress(self): """Save characterization progress to JSON.""" if not self.tracking_dir: return progress_data = { 'min_trials': self.min_trials, 'max_trials': self.max_trials, 'confidence_threshold': self.confidence_threshold, 'metric_history': [ { 'trial_number': s.trial_number, 'smoothness': s.smoothness, 'multimodal': s.multimodal, 'n_modes': s.n_modes, 'noise_level': s.noise_level, 'landscape_type': s.landscape_type, 'confidence': s.overall_confidence } for s in self.metric_history ], 'should_stop': self.should_stop_flag, 'stop_reason': self.stop_reason, 'final_confidence': self.final_confidence, 'timestamp': datetime.now().isoformat() } try: with open(self.characterization_log, 'w') as f: json.dump(progress_data, f, indent=2) except Exception as e: if self.verbose: print(f" Warning: Failed to save characterization progress: {e}") def get_summary_report(self) -> str: """Generate summary report of characterization phase.""" if not self.metric_history: return "No characterization data available" final_snapshot = self.metric_history[-1] report = "\n" + "="*70 + "\n" report += " CHARACTERIZATION PHASE SUMMARY\n" report += "="*70 + "\n" report += f" Total Trials: {final_snapshot.trial_number}\n" report += f" Final Confidence: {final_snapshot.overall_confidence:.1%}\n" report += f" Stop Reason: {self.stop_reason}\n" report += f"\n FINAL LANDSCAPE CLASSIFICATION:\n" report += f" Type: {final_snapshot.landscape_type.upper()}\n" report += f" Smoothness: {final_snapshot.smoothness:.2f}\n" report += f" Multimodal: {'YES' if final_snapshot.multimodal else 'NO'} ({final_snapshot.n_modes} modes)\n" report += f" Noise Level: {final_snapshot.noise_level:.2f}\n" if len(self.metric_history) >= 2: report += f"\n METRIC CONVERGENCE:\n" # Show how metrics evolved first = self.metric_history[0] last = self.metric_history[-1] smoothness_change = abs(last.smoothness - first.smoothness) report += f" Smoothness stability: {smoothness_change:.3f} (lower = more stable)\n" type_changes = len(set(s.landscape_type for s in self.metric_history)) report += f" Landscape type changes: {type_changes - 1}\n" report += "="*70 + "\n" return report