- Add validation framework (config, model, results, study validators) - Add Claude Code skills (create-study, run-optimization, generate-report, troubleshoot, analyze-model) - Add Atomizer Dashboard (React frontend + FastAPI backend) - Reorganize docs into structured directories (00-09) - Add neural surrogate modules and training infrastructure - Add multi-objective optimization support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
387 lines
15 KiB
Python
387 lines
15 KiB
Python
"""
|
|
Landscape Analyzer - Automatic optimization problem characterization.
|
|
|
|
This module analyzes the characteristics of an optimization landscape to inform
|
|
intelligent strategy selection. It computes metrics like smoothness, multimodality,
|
|
parameter correlation, and noise level.
|
|
|
|
Part of Protocol 10: Intelligent Multi-Strategy Optimization (IMSO)
|
|
"""
|
|
|
|
import numpy as np
|
|
from typing import Dict, List, Optional
|
|
from scipy.stats import spearmanr, variation
|
|
from scipy.spatial.distance import pdist, squareform
|
|
from sklearn.cluster import DBSCAN
|
|
import optuna
|
|
|
|
|
|
class LandscapeAnalyzer:
    """Analyzes optimization landscape characteristics from trial history.

    Computes smoothness, multimodality, per-parameter correlation, and noise
    metrics from an Optuna study's completed trials so an intelligent strategy
    selector can pick a suitable optimizer. Part of Protocol 10 (IMSO).
    """

    def __init__(self, min_trials_for_analysis: int = 10, verbose: bool = True):
        """
        Args:
            min_trials_for_analysis: Minimum trials needed for reliable analysis
            verbose: Whether to print diagnostic messages
        """
        self.min_trials = min_trials_for_analysis
        self.verbose = verbose

    def analyze(self, study: "optuna.Study") -> Dict:
        """
        Analyze optimization landscape characteristics.

        STUDY-AWARE: Uses study.trials directly for analysis.

        Args:
            study: Optuna study with completed trials

        Returns:
            Dictionary with landscape characteristics:
            - smoothness: 0-1, how smooth the objective landscape is
            - multimodal: boolean, multiple local optima detected
            - n_modes: estimated number of local optima
            - parameter_correlation: dict of correlation scores
            - noise_level: estimated noise in evaluations
            - dimensionality: number of design variables
            - landscape_type: classification (smooth/rugged/multimodal)

            If fewer than ``min_trials_for_analysis`` trials have completed,
            or the study is multi-objective, returns ``{'ready': False, ...}``
            with a diagnostic message instead.
        """
        # Get completed trials
        completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]

        if len(completed_trials) < self.min_trials:
            return {
                'ready': False,
                'total_trials': len(completed_trials),
                'message': f'Need {self.min_trials - len(completed_trials)} more trials for landscape analysis'
            }

        # Check if this is a multi-objective study.
        # Multi-objective studies have trial.values (plural), not trial.value,
        # so the scalar analysis below does not apply.
        is_multi_objective = len(study.directions) > 1

        if is_multi_objective:
            return {
                'ready': False,
                'total_trials': len(completed_trials),
                'message': 'Landscape analysis not supported for multi-objective optimization'
            }

        # FIX: extract parameters in a fixed name order taken from the first
        # trial. The original used list(trial.params.values()) per trial,
        # which silently misaligns columns if dict ordering ever differs
        # between trials.
        param_names = list(completed_trials[0].params.keys())
        X = []  # Parameter values, one row per trial, columns in param_names order
        y = []  # Objective values

        for trial in completed_trials:
            X.append([trial.params[name] for name in param_names])
            y.append(trial.value)

        X = np.array(X)
        y = np.array(y)

        # Compute characteristics
        smoothness = self._compute_smoothness(X, y)
        multimodal, n_modes = self._detect_multimodality(X, y)
        correlation_scores = self._compute_parameter_correlation(X, y, param_names)
        noise_level = self._estimate_noise(X, y)
        landscape_type = self._classify_landscape(smoothness, multimodal, noise_level, n_modes)

        # Compute parameter ranges for coverage metrics
        param_ranges = self._compute_parameter_ranges(completed_trials)

        return {
            'ready': True,
            'total_trials': len(completed_trials),
            'dimensionality': X.shape[1],
            'parameter_names': param_names,
            'smoothness': smoothness,
            'multimodal': multimodal,
            'n_modes': n_modes,
            'parameter_correlation': correlation_scores,
            'noise_level': noise_level,
            'landscape_type': landscape_type,
            'parameter_ranges': param_ranges,
            'objective_statistics': {
                'mean': float(np.mean(y)),
                'std': float(np.std(y)),
                'min': float(np.min(y)),
                'max': float(np.max(y)),
                'range': float(np.max(y) - np.min(y))
            }
        }

    def _compute_smoothness(self, X: np.ndarray, y: np.ndarray) -> float:
        """
        Compute landscape smoothness score.

        High smoothness = nearby points have similar objective values
        Low smoothness = nearby points have very different values (rugged)

        Method: Compare objective differences vs parameter distances.
        Returns 0.5 ("unknown") when there is too little data to decide.
        """
        if len(y) < 3:
            return 0.5  # Unknown

        # Normalize parameters to [0, 1] for fair distance computation
        # (the +1e-10 avoids division by zero for constant columns)
        X_norm = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0) + 1e-10)

        # Compute pairwise distances in parameter space
        param_distances = pdist(X_norm, metric='euclidean')

        # Compute pairwise differences in objective space
        objective_diffs = pdist(y.reshape(-1, 1), metric='euclidean')

        # Smoothness = correlation between distance and objective difference
        # Smooth landscape: nearby points → similar objectives (high correlation)
        # Rugged landscape: nearby points → very different objectives (low correlation)
        if len(param_distances) > 0 and len(objective_diffs) > 0:
            # Filter out zero distances to avoid division issues
            mask = param_distances > 1e-6
            if np.sum(mask) > 5:
                param_distances = param_distances[mask]
                objective_diffs = objective_diffs[mask]

                # Compute rank correlation (robust to nonlinear scaling)
                correlation, _ = spearmanr(param_distances, objective_diffs)

                # Convert to smoothness score: high correlation = smooth
                # Handle NaN from constant arrays
                if np.isnan(correlation):
                    smoothness = 0.5
                else:
                    smoothness = max(0.0, min(1.0, (correlation + 1.0) / 2.0))
            else:
                smoothness = 0.5
        else:
            smoothness = 0.5

        return smoothness

    def _detect_multimodality(self, X: np.ndarray, y: np.ndarray) -> tuple:
        """
        Detect multiple local optima by clustering the best trials.

        Returns:
            (is_multimodal, n_modes)
        """
        if len(y) < 10:
            return False, 1

        # Find good trials (bottom 30%).
        # NOTE(review): this assumes minimization (lower y is better) —
        # confirm against the study direction before trusting on maximization.
        threshold = np.percentile(y, 30)
        good_trials_mask = y <= threshold

        if np.sum(good_trials_mask) < 3:
            return False, 1

        X_good = X[good_trials_mask]

        # Normalize using the full population's bounds so good points keep
        # their relative positions in the explored space
        X_norm = (X_good - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0) + 1e-10)

        # Use DBSCAN to find clusters of good solutions.
        # If they're spread across multiple regions → multimodal.
        try:
            clustering = DBSCAN(eps=0.2, min_samples=2).fit(X_norm)
            # DBSCAN labels noise points as -1; exclude that from cluster count
            n_clusters = len(set(clustering.labels_)) - (1 if -1 in clustering.labels_ else 0)

            is_multimodal = n_clusters > 1
            n_modes = max(1, n_clusters)
        except Exception:
            # FIX: narrowed from a bare `except:` so KeyboardInterrupt and
            # SystemExit are no longer swallowed; any clustering failure
            # falls back to the conservative unimodal answer.
            is_multimodal = False
            n_modes = 1

        return is_multimodal, n_modes

    def _compute_parameter_correlation(self, X: np.ndarray, y: np.ndarray, param_names: List[str]) -> Dict:
        """
        Compute correlation between each parameter and objective.

        Returns dict of {param_name: {'correlation', 'abs_correlation',
        'p_value'}} plus an 'overall_strength' float entry (mean absolute
        correlation across parameters).
        High absolute correlation → parameter strongly affects objective.
        """
        correlations = {}

        for i, param_name in enumerate(param_names):
            param_values = X[:, i]

            # Spearman correlation (handles nonlinearity)
            corr, p_value = spearmanr(param_values, y)

            # NaN arises for constant columns; treat as "no correlation"
            if np.isnan(corr):
                corr = 0.0

            correlations[param_name] = {
                'correlation': float(corr),
                'abs_correlation': float(abs(corr)),
                'p_value': float(p_value) if not np.isnan(p_value) else 1.0
            }

        # Compute overall correlation strength (before the aggregate entry
        # is added, so it averages only per-parameter dicts)
        avg_abs_corr = np.mean([v['abs_correlation'] for v in correlations.values()])

        correlations['overall_strength'] = float(avg_abs_corr)

        return correlations

    def _estimate_noise(self, X: np.ndarray, y: np.ndarray) -> float:
        """
        Estimate noise level in objective evaluations.

        For deterministic FEA simulations, this should be very low.
        High noise would suggest numerical issues or simulation instability.

        Method: Look at local variations - similar inputs should give similar
        outputs. Wide exploration range (high CV) is NOT noise.
        """
        if len(y) < 10:
            return 0.0

        # FIX: removed a redundant local re-import of pdist/squareform;
        # both are already imported at module level.

        # Normalize X to [0,1] for distance calculation
        X_norm = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0) + 1e-10)

        # Compute full pairwise distance / difference matrices
        param_distances = squareform(pdist(X_norm, 'euclidean'))
        objective_diffs = np.abs(y[:, np.newaxis] - y[np.newaxis, :])

        # Find pairs that are close in parameter space (distance < 0.1),
        # excluding self-pairs / duplicates (distance ~ 0)
        close_pairs_mask = (param_distances > 1e-6) & (param_distances < 0.1)

        if np.sum(close_pairs_mask) < 5:
            # Not enough close pairs to assess noise
            return 0.0

        # For close pairs, measure objective variation:
        # True noise: close inputs give very different outputs
        # Smooth function: close inputs give similar outputs
        close_objective_diffs = objective_diffs[close_pairs_mask]
        close_param_dists = param_distances[close_pairs_mask]

        # Normalize by expected difference based on smoothness
        # Noise = unexpected variation for nearby points
        expected_diff = np.median(close_objective_diffs / (close_param_dists + 1e-10))
        actual_std = np.std(close_objective_diffs / (close_param_dists + 1e-10))

        # Coefficient of variation of local gradients
        if expected_diff > 1e-6:
            local_cv = actual_std / expected_diff
            noise_score = min(1.0, local_cv / 2.0)
        else:
            noise_score = 0.0

        return float(noise_score)

    def _classify_landscape(self, smoothness: float, multimodal: bool, noise: float, n_modes: int = 1) -> str:
        """
        Classify landscape type for strategy selection.

        Args:
            smoothness: Smoothness score (0-1)
            multimodal: Whether multiple modes detected
            noise: Noise level (0-1)
            n_modes: Number of modes detected

        Returns one of:
            - 'smooth_unimodal': Single smooth bowl (best for CMA-ES, GP-BO)
            - 'smooth_multimodal': Multiple smooth regions (good for GP-BO, TPE)
            - 'rugged_unimodal': Single rugged region (TPE, hybrid)
            - 'rugged_multimodal': Multiple rugged regions (TPE, evolutionary)
            - 'noisy': High noise level (robust methods)
        """
        # IMPROVEMENT: Detect false multimodality from smooth continuous manifolds.
        # If only 2 modes detected with high smoothness and low noise,
        # it's likely a continuous smooth surface, not true multimodality.
        if multimodal and n_modes == 2 and smoothness > 0.6 and noise < 0.2:
            if self.verbose:
                print(f"[LANDSCAPE] Reclassifying: 2 modes with smoothness={smoothness:.2f}, noise={noise:.2f}")
                print(f"[LANDSCAPE] This appears to be a smooth continuous manifold, not true multimodality")
            multimodal = False  # Override: treat as unimodal

        if noise > 0.5:
            return 'noisy'

        if smoothness > 0.6:
            if multimodal:
                return 'smooth_multimodal'
            else:
                return 'smooth_unimodal'
        else:
            if multimodal:
                return 'rugged_multimodal'
            else:
                return 'rugged_unimodal'

    def _compute_parameter_ranges(self, trials: List) -> Dict:
        """Compute explored parameter ranges and bound coverage per parameter.

        Returns {param: {explored_min/max/range, and — when the distribution
        exposes numeric bounds — bounds_min/max/range and coverage}}.
        Non-numeric (e.g. categorical) parameters are skipped.
        """
        if not trials:
            return {}

        param_names = list(trials[0].params.keys())
        ranges = {}

        for param in param_names:
            values = [t.params[param] for t in trials]

            # FIX: categorical / non-numeric parameters have no ordered
            # range; skip them instead of crashing on float(np.min(...)).
            if not all(isinstance(v, (int, float, np.integer, np.floating)) for v in values):
                continue

            explored_min = float(np.min(values))
            explored_max = float(np.max(values))
            entry = {
                'explored_min': explored_min,
                'explored_max': explored_max,
                'explored_range': explored_max - explored_min,
            }

            # FIX: not every Optuna distribution has .low/.high (e.g.
            # CategoricalDistribution); only report bounds when present.
            distribution = trials[0].distributions[param]
            low = getattr(distribution, 'low', None)
            high = getattr(distribution, 'high', None)
            if low is not None and high is not None:
                bounds_range = float(high - low)
                entry['bounds_min'] = float(low)
                entry['bounds_max'] = float(high)
                entry['bounds_range'] = bounds_range
                # FIX: guard zero-width bounds to avoid ZeroDivisionError
                entry['coverage'] = (
                    entry['explored_range'] / bounds_range if bounds_range > 0 else 0.0
                )

            ranges[param] = entry

        return ranges
|
|
|
|
|
|
def print_landscape_report(landscape: Dict, verbose: bool = True):
    """Print a formatted landscape analysis report to stdout.

    Args:
        landscape: Result dict from LandscapeAnalyzer.analyze, or None for
            multi-objective studies where analysis was skipped.
        verbose: When False, nothing is printed.
    """
    if not verbose:
        return

    # None signals a multi-objective study for which analysis was skipped
    if landscape is None:
        print(f"\n [LANDSCAPE ANALYSIS] Skipped for multi-objective optimization")
        return

    # Analysis may not have enough trials yet
    if not landscape.get('ready', False):
        print(f"\n [LANDSCAPE ANALYSIS] {landscape.get('message', 'Not ready')}")
        return

    rule = '=' * 70
    report = [f"\n{rule}", " LANDSCAPE ANALYSIS REPORT", rule]
    report.append(f" Total Trials Analyzed: {landscape['total_trials']}")
    report.append(f" Dimensionality: {landscape['dimensionality']} parameters")

    report.append("\n LANDSCAPE CHARACTERISTICS:")
    report.append(f" Type: {landscape['landscape_type'].upper()}")
    smooth_tag = '(smooth)' if landscape['smoothness'] > 0.6 else '(rugged)'
    report.append(f" Smoothness: {landscape['smoothness']:.2f} {smooth_tag}")
    modal_tag = 'YES' if landscape['multimodal'] else 'NO'
    report.append(f" Multimodal: {modal_tag} ({landscape['n_modes']} modes)")
    noise_tag = '(low)' if landscape['noise_level'] < 0.3 else '(high)'
    report.append(f" Noise Level: {landscape['noise_level']:.2f} {noise_tag}")

    report.append("\n PARAMETER CORRELATIONS:")
    for name, info in landscape['parameter_correlation'].items():
        if name == 'overall_strength':
            continue  # aggregate float entry, not a per-parameter dict
        corr = info['correlation']
        magnitude = abs(corr)
        strength = 'strong' if magnitude > 0.5 else 'moderate' if magnitude > 0.3 else 'weak'
        direction = 'positive' if corr > 0 else 'negative'
        report.append(f" {name}: {corr:+.3f} ({strength} {direction})")

    stats = landscape['objective_statistics']
    report.append("\n OBJECTIVE STATISTICS:")
    report.append(f" Best: {stats['min']:.6f}")
    report.append(f" Mean: {stats['mean']:.6f}")
    report.append(f" Std: {stats['std']:.6f}")
    report.append(f" Range: {stats['range']:.6f}")

    report.append(f"{rule}\n")
    # One joined write is byte-identical to the original per-line prints
    print("\n".join(report))
|