# File: Atomizer/optimization_engine/processors/adaptive_characterization.py
# (416 lines, 15 KiB, Python)
"""
Adaptive Characterization Module - Intelligent stopping for landscape characterization.
This module implements adaptive stopping criteria for the characterization phase
that intelligently determines when enough landscape exploration has been done.
Simple problems (smooth, unimodal) -> stop early (~10-15 trials)
Complex problems (multimodal, rugged) -> continue longer (~20-30 trials)
Part of Protocol 10: Intelligent Multi-Strategy Optimization (IMSO)
"""
import numpy as np
import optuna
from typing import Dict, List, Optional
from dataclasses import dataclass
import json
from pathlib import Path
from datetime import datetime
@dataclass
class LandscapeMetricSnapshot:
    """Snapshot of landscape metrics at a given trial."""
    trial_number: int          # Trial count at the time this analysis ran
    smoothness: float          # Landscape smoothness estimate (0-1, higher = smoother)
    multimodal: bool           # Whether multiple optima were detected
    n_modes: int               # Number of detected modes/optima
    noise_level: float         # Objective noise estimate (0-1)
    landscape_type: str        # Classification label from the landscape analyzer
    overall_confidence: float  # Stopping confidence; 0.0 at creation, filled in by update()
class CharacterizationStoppingCriterion:
    """
    Intelligently determines when characterization phase has gathered enough information.

    Key Features:
        1. Progressive landscape analysis (every 5 trials starting at trial 10)
        2. Metric convergence detection (are metrics stabilizing?)
        3. Complexity-aware sample adequacy (complex problems need more trials)
        4. Parameter space coverage assessment
        5. Confidence scoring (combines all factors)

    Stopping Decision:
        - Simple problems: Stop at ~10-15 trials when metrics converge
        - Complex problems: Continue to ~20-30 trials for adequate coverage
    """

    def __init__(
        self,
        min_trials: int = 10,
        max_trials: int = 30,
        confidence_threshold: float = 0.85,
        check_interval: int = 5,
        verbose: bool = True,
        tracking_dir: Optional[Path] = None
    ):
        """
        Args:
            min_trials: Minimum trials before considering stopping
            max_trials: Maximum trials (stop even if not converged)
            confidence_threshold: Confidence needed to stop (0-1)
            check_interval: How often to check stopping criteria
                (stored for callers; not read inside this class — TODO confirm caller use)
            verbose: Print progress reports
            tracking_dir: Directory to save characterization tracking
        """
        self.min_trials = min_trials
        self.max_trials = max_trials
        self.confidence_threshold = confidence_threshold
        self.check_interval = check_interval
        self.verbose = verbose
        self.tracking_dir = tracking_dir

        # Track metric history across analyses
        self.metric_history: List[LandscapeMetricSnapshot] = []
        self.should_stop_flag = False
        self.stop_reason = ""
        self.final_confidence = 0.0

        # Initialize on-disk tracking (only when a directory was supplied;
        # self.characterization_log exists only in that case and
        # _save_progress() guards on self.tracking_dir accordingly).
        if tracking_dir:
            self.tracking_dir = Path(tracking_dir)
            self.tracking_dir.mkdir(parents=True, exist_ok=True)
            self.characterization_log = self.tracking_dir / "characterization_progress.json"

    def update(self, landscape: Dict, trial_number: int):
        """
        Update with latest landscape analysis.

        Args:
            landscape: Landscape analysis dictionary
            trial_number: Current trial number
        """
        # Ignore analyses produced before the analyzer has enough data.
        if not landscape.get('ready', False):
            return

        # Create snapshot (confidence filled in below)
        snapshot = LandscapeMetricSnapshot(
            trial_number=trial_number,
            smoothness=landscape['smoothness'],
            multimodal=landscape['multimodal'],
            n_modes=landscape['n_modes'],
            noise_level=landscape['noise_level'],
            landscape_type=landscape['landscape_type'],
            overall_confidence=0.0
        )
        self.metric_history.append(snapshot)

        # NOTE: confidence is computed AFTER appending so the metric-stability
        # window includes the current analysis. Do not reorder.
        confidence = self._calculate_confidence(landscape, trial_number)
        snapshot.overall_confidence = confidence

        # Persist progress (best-effort, no-op without tracking_dir)
        self._save_progress()

        if self.verbose:
            self._print_progress_report(trial_number, landscape, confidence)

        # Never consider stopping before the minimum trial budget is spent.
        if trial_number >= self.min_trials:
            self._evaluate_stopping_criteria(landscape, trial_number, confidence)

    def should_stop(self, study: optuna.Study) -> bool:
        """
        Check if characterization should stop.

        Args:
            study: Optuna study

        Returns:
            True if should stop characterization
        """
        completed_trials = [
            t for t in study.trials
            if t.state == optuna.trial.TrialState.COMPLETE
        ]
        n_trials = len(completed_trials)

        # Hard budget: force stop at max trials even if not converged.
        if n_trials >= self.max_trials:
            self.should_stop_flag = True
            self.stop_reason = f"Maximum characterization trials reached ({self.max_trials})"
            # Fix: previously final_confidence stayed 0.0 on a forced stop,
            # so the saved JSON / summary under-reported it. Backfill from
            # the most recent snapshot if available.
            if self.metric_history and self.final_confidence == 0.0:
                self.final_confidence = self.metric_history[-1].overall_confidence
            return True

        return self.should_stop_flag

    def _calculate_confidence(self, landscape: Dict, trial_number: int) -> float:
        """
        Calculate confidence score for stopping decision.

        Confidence Components (weighted sum):
        1. Metric Stability (40%): Are metrics converging?
        2. Parameter Coverage (30%): Explored enough space?
        3. Sample Adequacy (20%): Enough samples for complexity?
        4. Landscape Clarity (10%): Clear classification?
        """
        # No stopping confidence at all before the minimum trial budget.
        if trial_number < self.min_trials:
            return 0.0

        stability_score = self._compute_metric_stability()
        coverage_score = self._compute_parameter_coverage(landscape)
        adequacy_score = self._compute_sample_adequacy(landscape, trial_number)
        clarity_score = self._compute_landscape_clarity(landscape)

        # Weighted combination; cast to a plain float so numpy scalars from
        # np.mean() never leak into comparisons or the JSON progress file.
        return float(
            0.40 * stability_score
            + 0.30 * coverage_score
            + 0.20 * adequacy_score
            + 0.10 * clarity_score
        )

    def _compute_metric_stability(self) -> float:
        """
        Compute how stable landscape metrics are.

        High stability = metrics have converged (good for stopping)
        Low stability = metrics still changing (need more trials)
        """
        # Need at least three analyses to judge a trend.
        if len(self.metric_history) < 3:
            return 0.0

        # Look at last 3 analyses
        recent_snapshots = self.metric_history[-3:]

        # Smoothness is stable if its spread over the window is small.
        smoothness_values = [s.smoothness for s in recent_snapshots]
        smoothness_stable = np.std(smoothness_values) < 0.05

        # Noise estimates are inherently noisier; allow a wider band.
        noise_values = [s.noise_level for s in recent_snapshots]
        noise_stable = np.std(noise_values) < 0.1

        # Landscape classification must not have flip-flopped.
        landscape_types = [s.landscape_type for s in recent_snapshots]
        type_consistent = len(set(landscape_types)) == 1

        # Mode count may drift by at most one between analyses.
        # Fix: the previous check (len(set(n_modes)) <= 1) demanded identical
        # counts, contradicting its own "Same or +/-1" comment.
        n_modes = [s.n_modes for s in recent_snapshots]
        modes_consistent = max(n_modes) - min(n_modes) <= 1

        # Fraction of stability indicators that passed.
        stability_indicators = [
            1.0 if smoothness_stable else 0.0,
            1.0 if noise_stable else 0.0,
            1.0 if type_consistent else 0.0,
            1.0 if modes_consistent else 0.0
        ]
        return float(np.mean(stability_indicators))

    def _compute_parameter_coverage(self, landscape: Dict) -> float:
        """
        Compute how well parameter space has been explored.

        High coverage = explored wide range of each parameter
        """
        param_ranges = landscape.get('parameter_ranges', {})
        if not param_ranges:
            return 0.5  # Unknown: neutral score

        # Per-parameter 'coverage' is already computed by the landscape analyzer.
        coverage_scores = [ranges['coverage'] for ranges in param_ranges.values()]
        avg_coverage = np.mean(coverage_scores)

        # Normalize: 25% coverage -> 0.5 score; >= 50% coverage saturates at 1.0.
        # (Previous comment wrongly claimed 100% coverage was needed for 1.0.)
        return float(min(1.0, avg_coverage / 0.5))

    def _compute_sample_adequacy(self, landscape: Dict, trial_number: int) -> float:
        """
        Compute if we have enough samples for the detected complexity.

        Simple problems: 10 trials sufficient
        Complex problems: 20-30 trials needed
        """
        dimensionality = landscape.get('dimensionality', 2)
        multimodal = landscape.get('multimodal', False)
        n_modes = landscape.get('n_modes', 1)

        # Required sample budget grows with modality and dimensionality.
        if multimodal and n_modes > 2:
            # Complex multimodal: need more samples
            required_samples = 10 + 5 * n_modes + 2 * dimensionality
        elif multimodal:
            # Simple multimodal: moderate samples
            required_samples = 15 + 2 * dimensionality
        else:
            # Unimodal: fewer samples needed
            required_samples = 10 + dimensionality

        # Never demand more than the hard trial budget.
        required_samples = min(required_samples, self.max_trials)

        # Ratio of samples collected vs required, capped at 1.0.
        return min(1.0, trial_number / required_samples)

    def _compute_landscape_clarity(self, landscape: Dict) -> float:
        """
        Compute how clearly we can classify the landscape.

        Clear classification = high confidence in landscape type.
        Extreme metric values (near 0 or 1) are far from the ambiguous
        middle band and therefore score high; values near 0.5 score low.
        """
        smoothness = landscape.get('smoothness', 0.5)
        noise_level = landscape.get('noise_level', 0.5)

        # Distance from the nearer decision thresholds (0.3 / 0.7); the max
        # of the two distances is small only in the ambiguous middle band.
        smoothness_clarity = max(
            abs(smoothness - 0.7),
            abs(smoothness - 0.3)
        )
        noise_clarity = max(
            abs(noise_level - 0.3),
            abs(noise_level - 0.7)
        )

        # Normalize to 0-1 (two fully-clear metrics sum to 1.4, so /0.8 caps early)
        return float(min(1.0, (smoothness_clarity + noise_clarity) / 0.8))

    def _evaluate_stopping_criteria(self, landscape: Dict, trial_number: int, confidence: float):
        """
        Evaluate if we should stop characterization.

        Stop if:
        1. Confidence threshold met
        2. OR maximum trials reached (enforced separately in should_stop())
        """
        if confidence >= self.confidence_threshold:
            self.should_stop_flag = True
            self.stop_reason = f"Characterization confidence threshold met ({confidence:.1%})"
            self.final_confidence = confidence

            if self.verbose:
                print(f"\n{'='*70}")
                print(f" CHARACTERIZATION COMPLETE")
                print(f"{'='*70}")
                print(f" Trial #{trial_number}")
                print(f" Confidence: {confidence:.1%}")
                print(f" Landscape Type: {landscape['landscape_type'].upper()}")
                print(f" Ready for strategy selection")
                print(f"{'='*70}\n")

    def _print_progress_report(self, trial_number: int, landscape: Dict, confidence: float):
        """Print characterization progress report."""
        print(f"\n{'='*70}")
        print(f" CHARACTERIZATION PROGRESS - Trial #{trial_number}")
        print(f"{'='*70}")
        print(f" Landscape Type: {landscape['landscape_type']}")
        print(f" Smoothness: {landscape['smoothness']:.2f}")
        print(f" Multimodal: {'YES' if landscape['multimodal'] else 'NO'} ({landscape['n_modes']} modes)")
        print(f" Noise: {landscape['noise_level']:.2f}")
        print(f" Characterization Confidence: {confidence:.1%}")

        if confidence >= self.confidence_threshold:
            print(f" Status: READY TO STOP (confidence >= {self.confidence_threshold:.0%})")
        else:
            remaining = self.confidence_threshold - confidence
            print(f" Status: CONTINUE (need +{remaining:.1%} confidence)")
        print(f"{'='*70}\n")

    def _save_progress(self):
        """Save characterization progress to JSON (no-op without tracking_dir)."""
        if not self.tracking_dir:
            return

        progress_data = {
            'min_trials': self.min_trials,
            'max_trials': self.max_trials,
            'confidence_threshold': self.confidence_threshold,
            'metric_history': [
                {
                    # Cast to builtins: landscape metrics may arrive as numpy
                    # scalars, and np.bool_/np.int64 are not JSON serializable.
                    'trial_number': int(s.trial_number),
                    'smoothness': float(s.smoothness),
                    'multimodal': bool(s.multimodal),
                    'n_modes': int(s.n_modes),
                    'noise_level': float(s.noise_level),
                    'landscape_type': s.landscape_type,
                    'confidence': float(s.overall_confidence)
                }
                for s in self.metric_history
            ],
            'should_stop': self.should_stop_flag,
            'stop_reason': self.stop_reason,
            'final_confidence': float(self.final_confidence),
            'timestamp': datetime.now().isoformat()
        }

        try:
            with open(self.characterization_log, 'w') as f:
                json.dump(progress_data, f, indent=2)
        except Exception as e:
            # Best-effort persistence: never let bookkeeping kill the run.
            if self.verbose:
                print(f" Warning: Failed to save characterization progress: {e}")

    def get_summary_report(self) -> str:
        """Generate summary report of characterization phase."""
        if not self.metric_history:
            return "No characterization data available"

        final_snapshot = self.metric_history[-1]

        report = "\n" + "="*70 + "\n"
        report += " CHARACTERIZATION PHASE SUMMARY\n"
        report += "="*70 + "\n"
        report += f" Total Trials: {final_snapshot.trial_number}\n"
        report += f" Final Confidence: {final_snapshot.overall_confidence:.1%}\n"
        report += f" Stop Reason: {self.stop_reason}\n"
        report += f"\n FINAL LANDSCAPE CLASSIFICATION:\n"
        report += f" Type: {final_snapshot.landscape_type.upper()}\n"
        report += f" Smoothness: {final_snapshot.smoothness:.2f}\n"
        report += f" Multimodal: {'YES' if final_snapshot.multimodal else 'NO'} ({final_snapshot.n_modes} modes)\n"
        report += f" Noise Level: {final_snapshot.noise_level:.2f}\n"

        if len(self.metric_history) >= 2:
            report += f"\n METRIC CONVERGENCE:\n"
            # Show how metrics evolved between first and last analysis
            first = self.metric_history[0]
            last = self.metric_history[-1]
            smoothness_change = abs(last.smoothness - first.smoothness)
            report += f" Smoothness stability: {smoothness_change:.3f} (lower = more stable)\n"
            type_changes = len(set(s.landscape_type for s in self.metric_history))
            report += f" Landscape type changes: {type_changes - 1}\n"

        report += "="*70 + "\n"
        return report