""" Landscape Analyzer - Automatic optimization problem characterization. This module analyzes the characteristics of an optimization landscape to inform intelligent strategy selection. It computes metrics like smoothness, multimodality, parameter correlation, and noise level. Part of Protocol 10: Intelligent Multi-Strategy Optimization (IMSO) """ import numpy as np from typing import Dict, List, Optional from scipy.stats import spearmanr, variation from scipy.spatial.distance import pdist, squareform from sklearn.cluster import DBSCAN import optuna class LandscapeAnalyzer: """Analyzes optimization landscape characteristics from trial history.""" def __init__(self, min_trials_for_analysis: int = 10, verbose: bool = True): """ Args: min_trials_for_analysis: Minimum trials needed for reliable analysis verbose: Whether to print diagnostic messages """ self.min_trials = min_trials_for_analysis self.verbose = verbose def analyze(self, study: optuna.Study) -> Dict: """ Analyze optimization landscape characteristics. STUDY-AWARE: Uses study.trials directly for analysis. Args: study: Optuna study with completed trials Returns: Dictionary with landscape characteristics: - smoothness: 0-1, how smooth the objective landscape is - multimodal: boolean, multiple local optima detected - n_modes: estimated number of local optima - parameter_correlation: dict of correlation scores - noise_level: estimated noise in evaluations - dimensionality: number of design variables - landscape_type: classification (smooth/rugged/multimodal) """ # Get completed trials completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE] if len(completed_trials) < self.min_trials: return { 'ready': False, 'total_trials': len(completed_trials), 'message': f'Need {self.min_trials - len(completed_trials)} more trials for landscape analysis' } # Check if this is a multi-objective study # Multi-objective studies have trial.values (plural), not trial.value is_multi_objective = len(study.directions) > 1 if is_multi_objective: return { 'ready': False, 'total_trials': len(completed_trials), 'message': 'Landscape analysis not supported for multi-objective optimization' } # Extract data X = [] # Parameter values y = [] # Objective values param_names = [] for trial in completed_trials: X.append(list(trial.params.values())) y.append(trial.value) if not param_names: param_names = list(trial.params.keys()) X = np.array(X) y = np.array(y) # Compute characteristics smoothness = self._compute_smoothness(X, y) multimodal, n_modes = self._detect_multimodality(X, y) correlation_scores = self._compute_parameter_correlation(X, y, param_names) noise_level = self._estimate_noise(X, y) landscape_type = self._classify_landscape(smoothness, multimodal, noise_level, n_modes) # Compute parameter ranges for coverage metrics param_ranges = self._compute_parameter_ranges(completed_trials) return { 'ready': True, 'total_trials': len(completed_trials), 'dimensionality': X.shape[1], 'parameter_names': param_names, 'smoothness': smoothness, 'multimodal': multimodal, 'n_modes': n_modes, 'parameter_correlation': correlation_scores, 'noise_level': noise_level, 'landscape_type': landscape_type, 'parameter_ranges': param_ranges, 'objective_statistics': { 'mean': float(np.mean(y)), 'std': float(np.std(y)), 'min': float(np.min(y)), 'max': float(np.max(y)), 'range': float(np.max(y) - np.min(y)) } } def _compute_smoothness(self, X: np.ndarray, y: np.ndarray) -> float: """ Compute landscape smoothness score. High smoothness = nearby points have similar objective values Low smoothness = nearby points have very different values (rugged) Method: Compare objective differences vs parameter distances """ if len(y) < 3: return 0.5 # Unknown # Normalize parameters to [0, 1] for fair distance computation X_norm = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0) + 1e-10) # Compute pairwise distances in parameter space param_distances = pdist(X_norm, metric='euclidean') # Compute pairwise differences in objective space objective_diffs = pdist(y.reshape(-1, 1), metric='euclidean') # Smoothness = correlation between distance and objective difference # Smooth landscape: nearby points → similar objectives (high correlation) # Rugged landscape: nearby points → very different objectives (low correlation) if len(param_distances) > 0 and len(objective_diffs) > 0: # Filter out zero distances to avoid division issues mask = param_distances > 1e-6 if np.sum(mask) > 5: param_distances = param_distances[mask] objective_diffs = objective_diffs[mask] # Compute correlation correlation, _ = spearmanr(param_distances, objective_diffs) # Convert to smoothness score: high correlation = smooth # Handle NaN from constant arrays if np.isnan(correlation): smoothness = 0.5 else: smoothness = max(0.0, min(1.0, (correlation + 1.0) / 2.0)) else: smoothness = 0.5 else: smoothness = 0.5 return smoothness def _detect_multimodality(self, X: np.ndarray, y: np.ndarray) -> tuple: """ Detect multiple local optima using clustering. Returns: (is_multimodal, n_modes) """ if len(y) < 10: return False, 1 # Find good trials (bottom 30%) threshold = np.percentile(y, 30) good_trials_mask = y <= threshold if np.sum(good_trials_mask) < 3: return False, 1 X_good = X[good_trials_mask] # Normalize for clustering X_norm = (X_good - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0) + 1e-10) # Use DBSCAN to find clusters of good solutions # If they're spread across multiple regions → multimodal try: clustering = DBSCAN(eps=0.2, min_samples=2).fit(X_norm) n_clusters = len(set(clustering.labels_)) - (1 if -1 in clustering.labels_ else 0) is_multimodal = n_clusters > 1 n_modes = max(1, n_clusters) except: is_multimodal = False n_modes = 1 return is_multimodal, n_modes def _compute_parameter_correlation(self, X: np.ndarray, y: np.ndarray, param_names: List[str]) -> Dict: """ Compute correlation between each parameter and objective. Returns dict of {param_name: correlation_score} High absolute correlation → parameter strongly affects objective """ correlations = {} for i, param_name in enumerate(param_names): param_values = X[:, i] # Spearman correlation (handles nonlinearity) corr, p_value = spearmanr(param_values, y) if np.isnan(corr): corr = 0.0 correlations[param_name] = { 'correlation': float(corr), 'abs_correlation': float(abs(corr)), 'p_value': float(p_value) if not np.isnan(p_value) else 1.0 } # Compute overall correlation strength avg_abs_corr = np.mean([v['abs_correlation'] for v in correlations.values()]) correlations['overall_strength'] = float(avg_abs_corr) return correlations def _estimate_noise(self, X: np.ndarray, y: np.ndarray) -> float: """ Estimate noise level in objective evaluations. For deterministic FEA simulations, this should be very low. High noise would suggest numerical issues or simulation instability. Method: Look at local variations - similar inputs should give similar outputs. Wide exploration range (high CV) is NOT noise. """ if len(y) < 10: return 0.0 # Calculate pairwise distances in parameter space from scipy.spatial.distance import pdist, squareform # Normalize X to [0,1] for distance calculation X_norm = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0) + 1e-10) # Compute pairwise distances param_distances = squareform(pdist(X_norm, 'euclidean')) objective_diffs = np.abs(y[:, np.newaxis] - y[np.newaxis, :]) # Find pairs that are close in parameter space (distance < 0.1) close_pairs_mask = (param_distances > 1e-6) & (param_distances < 0.1) if np.sum(close_pairs_mask) < 5: # Not enough close pairs to assess noise return 0.0 # For close pairs, measure objective variation # True noise: close inputs give very different outputs # Smooth function: close inputs give similar outputs close_objective_diffs = objective_diffs[close_pairs_mask] close_param_dists = param_distances[close_pairs_mask] # Normalize by expected difference based on smoothness # Noise = unexpected variation for nearby points expected_diff = np.median(close_objective_diffs / (close_param_dists + 1e-10)) actual_std = np.std(close_objective_diffs / (close_param_dists + 1e-10)) # Coefficient of variation of local gradients if expected_diff > 1e-6: local_cv = actual_std / expected_diff noise_score = min(1.0, local_cv / 2.0) else: noise_score = 0.0 return float(noise_score) def _classify_landscape(self, smoothness: float, multimodal: bool, noise: float, n_modes: int = 1) -> str: """ Classify landscape type for strategy selection. Args: smoothness: Smoothness score (0-1) multimodal: Whether multiple modes detected noise: Noise level (0-1) n_modes: Number of modes detected Returns one of: - 'smooth_unimodal': Single smooth bowl (best for CMA-ES, GP-BO) - 'smooth_multimodal': Multiple smooth regions (good for GP-BO, TPE) - 'rugged_unimodal': Single rugged region (TPE, hybrid) - 'rugged_multimodal': Multiple rugged regions (TPE, evolutionary) - 'noisy': High noise level (robust methods) """ # IMPROVEMENT: Detect false multimodality from smooth continuous manifolds # If only 2 modes detected with high smoothness and low noise, # it's likely a continuous smooth surface, not true multimodality if multimodal and n_modes == 2 and smoothness > 0.6 and noise < 0.2: if self.verbose: print(f"[LANDSCAPE] Reclassifying: 2 modes with smoothness={smoothness:.2f}, noise={noise:.2f}") print(f"[LANDSCAPE] This appears to be a smooth continuous manifold, not true multimodality") multimodal = False # Override: treat as unimodal if noise > 0.5: return 'noisy' if smoothness > 0.6: if multimodal: return 'smooth_multimodal' else: return 'smooth_unimodal' else: if multimodal: return 'rugged_multimodal' else: return 'rugged_unimodal' def _compute_parameter_ranges(self, trials: List) -> Dict: """Compute explored parameter ranges.""" if not trials: return {} param_names = list(trials[0].params.keys()) ranges = {} for param in param_names: values = [t.params[param] for t in trials] distribution = trials[0].distributions[param] ranges[param] = { 'explored_min': float(np.min(values)), 'explored_max': float(np.max(values)), 'explored_range': float(np.max(values) - np.min(values)), 'bounds_min': float(distribution.low), 'bounds_max': float(distribution.high), 'bounds_range': float(distribution.high - distribution.low), 'coverage': float((np.max(values) - np.min(values)) / (distribution.high - distribution.low)) } return ranges def print_landscape_report(landscape: Dict, verbose: bool = True): """Print formatted landscape analysis report.""" if not verbose: return # Handle None (multi-objective studies) if landscape is None: print(f"\n [LANDSCAPE ANALYSIS] Skipped for multi-objective optimization") return if not landscape.get('ready', False): print(f"\n [LANDSCAPE ANALYSIS] {landscape.get('message', 'Not ready')}") return print(f"\n{'='*70}") print(f" LANDSCAPE ANALYSIS REPORT") print(f"{'='*70}") print(f" Total Trials Analyzed: {landscape['total_trials']}") print(f" Dimensionality: {landscape['dimensionality']} parameters") print(f"\n LANDSCAPE CHARACTERISTICS:") print(f" Type: {landscape['landscape_type'].upper()}") print(f" Smoothness: {landscape['smoothness']:.2f} {'(smooth)' if landscape['smoothness'] > 0.6 else '(rugged)'}") print(f" Multimodal: {'YES' if landscape['multimodal'] else 'NO'} ({landscape['n_modes']} modes)") print(f" Noise Level: {landscape['noise_level']:.2f} {'(low)' if landscape['noise_level'] < 0.3 else '(high)'}") print(f"\n PARAMETER CORRELATIONS:") for param, info in landscape['parameter_correlation'].items(): if param != 'overall_strength': corr = info['correlation'] strength = 'strong' if abs(corr) > 0.5 else 'moderate' if abs(corr) > 0.3 else 'weak' direction = 'positive' if corr > 0 else 'negative' print(f" {param}: {corr:+.3f} ({strength} {direction})") print(f"\n OBJECTIVE STATISTICS:") stats = landscape['objective_statistics'] print(f" Best: {stats['min']:.6f}") print(f" Mean: {stats['mean']:.6f}") print(f" Std: {stats['std']:.6f}") print(f" Range: {stats['range']:.6f}") print(f"{'='*70}\n")