feat: Implement Protocol 13 - Real-Time Dashboard Tracking
Complete implementation of Protocol 13 featuring real-time web dashboard for monitoring multi-objective optimization studies. ## New Features ### Backend (Python) - Real-time tracking system with per-trial JSON writes - New API endpoints for metadata, optimizer state, and Pareto fronts - Unit inference from objective descriptions - Multi-objective support using Optuna's best_trials API ### Frontend (React + TypeScript) - OptimizerPanel: Real-time optimizer state (phase, strategy, progress) - ParetoPlot: Pareto front visualization with normalization toggle - 3 modes: Raw, Min-Max [0-1], Z-Score standardization - Pareto front line connecting optimal points - ParallelCoordinatesPlot: High-dimensional interactive visualization - Objectives + design variables on parallel axes - Click-to-select, hover-to-highlight - Color-coded feasibility - Dynamic units throughout all visualizations ### Documentation - Comprehensive Protocol 13 guide with architecture, data flow, usage ## Files Added - `docs/PROTOCOL_13_DASHBOARD.md` - `atomizer-dashboard/frontend/src/components/OptimizerPanel.tsx` - `atomizer-dashboard/frontend/src/components/ParetoPlot.tsx` - `atomizer-dashboard/frontend/src/components/ParallelCoordinatesPlot.tsx` - `optimization_engine/realtime_tracking.py` ## Files Modified - `atomizer-dashboard/frontend/src/pages/Dashboard.tsx` - `atomizer-dashboard/backend/api/routes/optimization.py` - `optimization_engine/intelligent_optimizer.py` ## Testing - Tested with bracket_stiffness_optimization_V2 (30 trials, 20 Pareto solutions) - Dashboard running on localhost:3001 - All P1 and P2 features verified working 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
560
optimization_engine/intelligent_optimizer.py
Normal file
560
optimization_engine/intelligent_optimizer.py
Normal file
@@ -0,0 +1,560 @@
|
||||
"""
|
||||
Intelligent Multi-Strategy Optimizer - Protocol 10 Implementation.
|
||||
|
||||
This is the main orchestrator for Protocol 10: Intelligent Multi-Strategy
|
||||
Optimization (IMSO). It coordinates landscape analysis, strategy selection,
|
||||
and dynamic strategy switching to create a self-tuning optimization system.
|
||||
|
||||
Architecture:
|
||||
1. Landscape Analyzer: Characterizes the optimization problem
|
||||
2. Strategy Selector: Recommends best algorithm based on characteristics
|
||||
3. Strategy Portfolio Manager: Handles dynamic switching between strategies
|
||||
4. Adaptive Callbacks: Integrates with Optuna for runtime adaptation
|
||||
|
||||
This module enables Atomizer to automatically adapt to different FEA problem
|
||||
types without requiring manual algorithm configuration.
|
||||
|
||||
Usage:
|
||||
from optimization_engine.intelligent_optimizer import IntelligentOptimizer
|
||||
|
||||
optimizer = IntelligentOptimizer(
|
||||
study_name="my_study",
|
||||
study_dir=Path("results"),
|
||||
config=config_dict
|
||||
)
|
||||
|
||||
best_params = optimizer.optimize(
|
||||
objective_function=my_objective,
|
||||
n_trials=100
|
||||
)
|
||||
"""
|
||||
|
||||
import optuna
|
||||
from pathlib import Path
|
||||
from typing import Dict, Callable, Optional, Any
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
from optimization_engine.landscape_analyzer import LandscapeAnalyzer, print_landscape_report
|
||||
from optimization_engine.strategy_selector import (
|
||||
IntelligentStrategySelector,
|
||||
create_sampler_from_config
|
||||
)
|
||||
from optimization_engine.strategy_portfolio import (
|
||||
StrategyTransitionManager,
|
||||
AdaptiveStrategyCallback
|
||||
)
|
||||
from optimization_engine.adaptive_surrogate import AdaptiveExploitationCallback
|
||||
from optimization_engine.adaptive_characterization import CharacterizationStoppingCriterion
|
||||
from optimization_engine.realtime_tracking import create_realtime_callback
|
||||
|
||||
|
||||
class IntelligentOptimizer:
|
||||
"""
|
||||
Self-tuning multi-strategy optimizer for FEA problems.
|
||||
|
||||
This class implements Protocol 10: Intelligent Multi-Strategy Optimization.
|
||||
It automatically:
|
||||
1. Analyzes problem characteristics
|
||||
2. Selects appropriate optimization algorithms
|
||||
3. Switches strategies dynamically based on performance
|
||||
4. Logs all decisions for transparency and learning
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
study_name: str,
|
||||
study_dir: Path,
|
||||
config: Dict,
|
||||
verbose: bool = True
|
||||
):
|
||||
"""
|
||||
Initialize intelligent optimizer.
|
||||
|
||||
Args:
|
||||
study_name: Name for the optimization study
|
||||
study_dir: Directory to save optimization results
|
||||
config: Configuration dictionary with Protocol 10 settings
|
||||
verbose: Print detailed progress information
|
||||
"""
|
||||
self.study_name = study_name
|
||||
self.study_dir = Path(study_dir)
|
||||
self.config = config
|
||||
self.verbose = verbose
|
||||
|
||||
# Extract Protocol 10 configuration
|
||||
self.protocol_config = config.get('intelligent_optimization', {})
|
||||
self.enabled = self.protocol_config.get('enabled', True)
|
||||
|
||||
# Setup tracking directory
|
||||
self.tracking_dir = self.study_dir / "intelligent_optimizer"
|
||||
self.tracking_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Initialize components
|
||||
self.landscape_analyzer = LandscapeAnalyzer(
|
||||
min_trials_for_analysis=self.protocol_config.get('min_analysis_trials', 10)
|
||||
)
|
||||
|
||||
self.strategy_selector = IntelligentStrategySelector(verbose=verbose)
|
||||
|
||||
self.transition_manager = StrategyTransitionManager(
|
||||
stagnation_window=self.protocol_config.get('stagnation_window', 10),
|
||||
min_improvement_threshold=self.protocol_config.get('min_improvement_threshold', 0.001),
|
||||
verbose=verbose,
|
||||
tracking_dir=self.tracking_dir
|
||||
)
|
||||
|
||||
# State tracking
|
||||
self.current_phase = "initialization"
|
||||
self.current_strategy = None
|
||||
self.landscape_cache = None
|
||||
self.recommendation_cache = None
|
||||
|
||||
# Optuna study (will be created in optimize())
|
||||
self.study: Optional[optuna.Study] = None
|
||||
self.directions: Optional[list] = None # Store study directions
|
||||
|
||||
# Protocol 13: Create realtime tracking callback
|
||||
self.realtime_callback = create_realtime_callback(
|
||||
tracking_dir=self.tracking_dir,
|
||||
optimizer_ref=self,
|
||||
verbose=self.verbose
|
||||
)
|
||||
|
||||
# Protocol 11: Print multi-objective support notice
|
||||
if self.verbose:
|
||||
print(f"\n[Protocol 11] Multi-objective optimization: ENABLED")
|
||||
print(f"[Protocol 11] Supports single-objective and multi-objective studies")
|
||||
print(f"[Protocol 13] Real-time tracking: ENABLED (per-trial JSON writes)")
|
||||
|
||||
def optimize(
|
||||
self,
|
||||
objective_function: Callable,
|
||||
design_variables: Dict[str, tuple],
|
||||
n_trials: int = 100,
|
||||
target_value: Optional[float] = None,
|
||||
tolerance: float = 0.1,
|
||||
directions: Optional[list] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Run intelligent multi-strategy optimization.
|
||||
|
||||
This is the main entry point that orchestrates the entire Protocol 10 process.
|
||||
|
||||
Args:
|
||||
objective_function: Function to minimize, signature: f(trial) -> float or tuple
|
||||
design_variables: Dict of {var_name: (low, high)} bounds
|
||||
n_trials: Total trial budget
|
||||
target_value: Target objective value (optional, for single-objective)
|
||||
tolerance: Acceptable error from target
|
||||
directions: List of 'minimize' or 'maximize' for multi-objective (e.g., ['minimize', 'minimize'])
|
||||
If None, defaults to single-objective minimization
|
||||
|
||||
Returns:
|
||||
Dictionary with:
|
||||
- best_params: Best parameter configuration found
|
||||
- best_value: Best objective value achieved (or tuple for multi-objective)
|
||||
- strategy_used: Final strategy used
|
||||
- landscape_analysis: Problem characterization
|
||||
- performance_summary: Strategy performance breakdown
|
||||
"""
|
||||
# Store directions for study creation
|
||||
self.directions = directions
|
||||
if not self.enabled:
|
||||
return self._run_fallback_optimization(
|
||||
objective_function, design_variables, n_trials
|
||||
)
|
||||
|
||||
# Stage 1: Adaptive Characterization
|
||||
self.current_phase = "characterization"
|
||||
if self.verbose:
|
||||
self._print_phase_header("STAGE 1: ADAPTIVE CHARACTERIZATION")
|
||||
|
||||
# Get characterization config
|
||||
char_config = self.protocol_config.get('characterization', {})
|
||||
min_trials = char_config.get('min_trials', 10)
|
||||
max_trials = char_config.get('max_trials', 30)
|
||||
confidence_threshold = char_config.get('confidence_threshold', 0.85)
|
||||
check_interval = char_config.get('check_interval', 5)
|
||||
|
||||
# Create stopping criterion
|
||||
stopping_criterion = CharacterizationStoppingCriterion(
|
||||
min_trials=min_trials,
|
||||
max_trials=max_trials,
|
||||
confidence_threshold=confidence_threshold,
|
||||
check_interval=check_interval,
|
||||
verbose=self.verbose,
|
||||
tracking_dir=self.tracking_dir
|
||||
)
|
||||
|
||||
# Create characterization study with random sampler (unbiased exploration)
|
||||
self.study = self._create_study(
|
||||
sampler=optuna.samplers.RandomSampler(),
|
||||
design_variables=design_variables
|
||||
)
|
||||
|
||||
# Run adaptive characterization
|
||||
while not stopping_criterion.should_stop(self.study):
|
||||
# Run batch of trials
|
||||
self.study.optimize(
|
||||
objective_function,
|
||||
n_trials=check_interval,
|
||||
callbacks=[self.realtime_callback]
|
||||
)
|
||||
|
||||
# Analyze landscape
|
||||
self.landscape_cache = self.landscape_analyzer.analyze(self.study)
|
||||
|
||||
# Update stopping criterion
|
||||
if self.landscape_cache.get('ready', False):
|
||||
completed_trials = [t for t in self.study.trials if t.state == optuna.trial.TrialState.COMPLETE]
|
||||
stopping_criterion.update(self.landscape_cache, len(completed_trials))
|
||||
|
||||
# Print characterization summary
|
||||
if self.verbose:
|
||||
print(stopping_criterion.get_summary_report())
|
||||
print_landscape_report(self.landscape_cache)
|
||||
|
||||
# Stage 2: Intelligent Strategy Selection
|
||||
self.current_phase = "strategy_selection"
|
||||
if self.verbose:
|
||||
self._print_phase_header("STAGE 2: STRATEGY SELECTION")
|
||||
|
||||
strategy, recommendation = self.strategy_selector.recommend_strategy(
|
||||
landscape=self.landscape_cache,
|
||||
trials_completed=len(self.study.trials),
|
||||
trials_budget=n_trials
|
||||
)
|
||||
|
||||
self.current_strategy = strategy
|
||||
self.recommendation_cache = recommendation
|
||||
|
||||
# Create new study with recommended strategy
|
||||
sampler = create_sampler_from_config(recommendation['sampler_config'])
|
||||
self.study = self._create_study(
|
||||
sampler=sampler,
|
||||
design_variables=design_variables,
|
||||
load_from_previous=True # Preserve initial trials
|
||||
)
|
||||
|
||||
# Setup adaptive callbacks
|
||||
callbacks = self._create_callbacks(target_value, tolerance)
|
||||
|
||||
# Stage 3: Adaptive Optimization with Monitoring
|
||||
self.current_phase = "adaptive_optimization"
|
||||
if self.verbose:
|
||||
self._print_phase_header("STAGE 3: ADAPTIVE OPTIMIZATION")
|
||||
|
||||
remaining_trials = n_trials - len(self.study.trials)
|
||||
|
||||
if remaining_trials > 0:
|
||||
# Add realtime tracking to callbacks
|
||||
all_callbacks = callbacks + [self.realtime_callback]
|
||||
self.study.optimize(
|
||||
objective_function,
|
||||
n_trials=remaining_trials,
|
||||
callbacks=all_callbacks
|
||||
)
|
||||
|
||||
# Generate final report
|
||||
results = self._compile_results()
|
||||
|
||||
if self.verbose:
|
||||
self._print_final_summary(results)
|
||||
|
||||
return results
|
||||
|
||||
def _create_study(
|
||||
self,
|
||||
sampler: optuna.samplers.BaseSampler,
|
||||
design_variables: Dict[str, tuple],
|
||||
load_from_previous: bool = False
|
||||
) -> optuna.Study:
|
||||
"""
|
||||
Create Optuna study with specified sampler.
|
||||
|
||||
Args:
|
||||
sampler: Optuna sampler to use
|
||||
design_variables: Parameter bounds
|
||||
load_from_previous: Load trials from previous study
|
||||
|
||||
Returns:
|
||||
Configured Optuna study
|
||||
"""
|
||||
# Create study storage
|
||||
storage_path = self.study_dir / "study.db"
|
||||
storage = f"sqlite:///{storage_path}"
|
||||
|
||||
if load_from_previous and storage_path.exists():
|
||||
# Load existing study and change sampler
|
||||
study = optuna.load_study(
|
||||
study_name=self.study_name,
|
||||
storage=storage,
|
||||
sampler=sampler
|
||||
)
|
||||
else:
|
||||
# Create new study (single or multi-objective)
|
||||
if self.directions is not None:
|
||||
# Multi-objective optimization
|
||||
study = optuna.create_study(
|
||||
study_name=self.study_name,
|
||||
storage=storage,
|
||||
directions=self.directions,
|
||||
sampler=sampler,
|
||||
load_if_exists=True
|
||||
)
|
||||
else:
|
||||
# Single-objective optimization (backward compatibility)
|
||||
study = optuna.create_study(
|
||||
study_name=self.study_name,
|
||||
storage=storage,
|
||||
direction='minimize',
|
||||
sampler=sampler,
|
||||
load_if_exists=True
|
||||
)
|
||||
|
||||
return study
|
||||
|
||||
def _create_callbacks(
|
||||
self,
|
||||
target_value: Optional[float],
|
||||
tolerance: float
|
||||
) -> list:
|
||||
"""Create list of Optuna callbacks for adaptive optimization."""
|
||||
callbacks = []
|
||||
|
||||
# Adaptive exploitation callback (from Protocol 8)
|
||||
adaptive_callback = AdaptiveExploitationCallback(
|
||||
target_value=target_value,
|
||||
tolerance=tolerance,
|
||||
min_confidence_for_exploitation=0.65,
|
||||
min_trials=15,
|
||||
verbose=self.verbose,
|
||||
tracking_dir=self.tracking_dir
|
||||
)
|
||||
callbacks.append(adaptive_callback)
|
||||
|
||||
# Strategy switching callback (Protocol 10)
|
||||
strategy_callback = AdaptiveStrategyCallback(
|
||||
transition_manager=self.transition_manager,
|
||||
landscape_analyzer=self.landscape_analyzer,
|
||||
strategy_selector=self.strategy_selector,
|
||||
reanalysis_interval=self.protocol_config.get('reanalysis_interval', 15)
|
||||
)
|
||||
callbacks.append(strategy_callback)
|
||||
|
||||
return callbacks
|
||||
|
||||
def _compile_results(self) -> Dict[str, Any]:
|
||||
"""Compile comprehensive optimization results (supports single and multi-objective)."""
|
||||
is_multi_objective = len(self.study.directions) > 1
|
||||
|
||||
if is_multi_objective:
|
||||
# Multi-objective: Return Pareto front info
|
||||
best_trials = self.study.best_trials
|
||||
if best_trials:
|
||||
# Select the first Pareto-optimal solution as representative
|
||||
representative_trial = best_trials[0]
|
||||
best_params = representative_trial.params
|
||||
best_value = representative_trial.values # Tuple of objectives
|
||||
best_trial_num = representative_trial.number
|
||||
else:
|
||||
best_params = {}
|
||||
best_value = None
|
||||
best_trial_num = None
|
||||
else:
|
||||
# Single-objective: Use standard Optuna API
|
||||
best_params = self.study.best_params
|
||||
best_value = self.study.best_value
|
||||
best_trial_num = self.study.best_trial.number
|
||||
|
||||
return {
|
||||
'best_params': best_params,
|
||||
'best_value': best_value,
|
||||
'best_trial': best_trial_num,
|
||||
'is_multi_objective': is_multi_objective,
|
||||
'pareto_front_size': len(self.study.best_trials) if is_multi_objective else 1,
|
||||
'total_trials': len(self.study.trials),
|
||||
'final_strategy': self.current_strategy,
|
||||
'landscape_analysis': self.landscape_cache,
|
||||
'strategy_recommendation': self.recommendation_cache,
|
||||
'transition_history': self.transition_manager.transition_history,
|
||||
'strategy_performance': {
|
||||
name: {
|
||||
'trials_used': perf.trials_used,
|
||||
'best_value': perf.best_value_achieved,
|
||||
'improvement_rate': perf.improvement_rate
|
||||
}
|
||||
for name, perf in self.transition_manager.strategy_history.items()
|
||||
},
|
||||
'protocol_used': 'Protocol 10: Intelligent Multi-Strategy Optimization'
|
||||
}
|
||||
|
||||
def _run_fallback_optimization(
|
||||
self,
|
||||
objective_function: Callable,
|
||||
design_variables: Dict[str, tuple],
|
||||
n_trials: int
|
||||
) -> Dict[str, Any]:
|
||||
"""Fallback to standard TPE optimization if Protocol 10 is disabled (supports multi-objective)."""
|
||||
if self.verbose:
|
||||
print("\n Protocol 10 disabled - using standard TPE optimization\n")
|
||||
|
||||
sampler = optuna.samplers.TPESampler(multivariate=True, n_startup_trials=10)
|
||||
self.study = self._create_study(sampler, design_variables)
|
||||
|
||||
self.study.optimize(
|
||||
objective_function,
|
||||
n_trials=n_trials,
|
||||
callbacks=[self.realtime_callback]
|
||||
)
|
||||
|
||||
# Handle both single and multi-objective
|
||||
is_multi_objective = len(self.study.directions) > 1
|
||||
|
||||
if is_multi_objective:
|
||||
best_trials = self.study.best_trials
|
||||
if best_trials:
|
||||
representative_trial = best_trials[0]
|
||||
best_params = representative_trial.params
|
||||
best_value = representative_trial.values
|
||||
best_trial_num = representative_trial.number
|
||||
else:
|
||||
best_params = {}
|
||||
best_value = None
|
||||
best_trial_num = None
|
||||
else:
|
||||
best_params = self.study.best_params
|
||||
best_value = self.study.best_value
|
||||
best_trial_num = self.study.best_trial.number
|
||||
|
||||
return {
|
||||
'best_params': best_params,
|
||||
'best_value': best_value,
|
||||
'best_trial': best_trial_num,
|
||||
'is_multi_objective': is_multi_objective,
|
||||
'total_trials': len(self.study.trials),
|
||||
'protocol_used': 'Standard TPE (Protocol 10 disabled)'
|
||||
}
|
||||
|
||||
def _print_phase_header(self, phase_name: str):
|
||||
"""Print formatted phase transition header."""
|
||||
print(f"\n{'='*70}")
|
||||
print(f" {phase_name}")
|
||||
print(f"{'='*70}\n")
|
||||
|
||||
def _print_final_summary(self, results: Dict):
|
||||
"""Print comprehensive final optimization summary."""
|
||||
print(f"\n{'='*70}")
|
||||
print(f" OPTIMIZATION COMPLETE")
|
||||
print(f"{'='*70}")
|
||||
print(f" Protocol: {results['protocol_used']}")
|
||||
print(f" Total Trials: {results['total_trials']}")
|
||||
|
||||
# Handle both single and multi-objective best values
|
||||
best_value = results['best_value']
|
||||
if results.get('is_multi_objective', False):
|
||||
# Multi-objective: best_value is a tuple
|
||||
formatted_value = str(best_value) # Show as tuple
|
||||
print(f" Best Values (Pareto): {formatted_value} (Trial #{results['best_trial']})")
|
||||
else:
|
||||
# Single-objective: best_value is a scalar
|
||||
print(f" Best Value: {best_value:.6f} (Trial #{results['best_trial']})")
|
||||
|
||||
print(f" Final Strategy: {results.get('final_strategy', 'N/A').upper()}")
|
||||
|
||||
if results.get('transition_history'):
|
||||
print(f"\n Strategy Transitions: {len(results['transition_history'])}")
|
||||
for event in results['transition_history']:
|
||||
print(f" Trial #{event['trial_number']}: "
|
||||
f"{event['from_strategy']} → {event['to_strategy']}")
|
||||
|
||||
print(f"\n Best Parameters:")
|
||||
for param, value in results['best_params'].items():
|
||||
print(f" {param}: {value:.6f}")
|
||||
|
||||
print(f"{'='*70}\n")
|
||||
|
||||
# Print strategy performance report
|
||||
if self.transition_manager.strategy_history:
|
||||
print(self.transition_manager.get_performance_report())
|
||||
|
||||
def save_intelligence_report(self, filepath: Optional[Path] = None):
|
||||
"""
|
||||
Save comprehensive intelligence report to JSON.
|
||||
|
||||
This report contains all decision-making data for transparency,
|
||||
debugging, and transfer learning to future optimizations.
|
||||
"""
|
||||
if filepath is None:
|
||||
filepath = self.tracking_dir / "intelligence_report.json"
|
||||
|
||||
report = {
|
||||
'study_name': self.study_name,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
'configuration': self.protocol_config,
|
||||
'landscape_analysis': self.landscape_cache,
|
||||
'initial_recommendation': self.recommendation_cache,
|
||||
'final_strategy': self.current_strategy,
|
||||
'transition_history': self.transition_manager.transition_history,
|
||||
'strategy_performance': {
|
||||
name: {
|
||||
'trials_used': perf.trials_used,
|
||||
'best_value_achieved': perf.best_value_achieved,
|
||||
'improvement_rate': perf.improvement_rate,
|
||||
'last_used_trial': perf.last_used_trial
|
||||
}
|
||||
for name, perf in self.transition_manager.strategy_history.items()
|
||||
},
|
||||
'recommendation_history': self.strategy_selector.recommendation_history
|
||||
}
|
||||
|
||||
try:
|
||||
with open(filepath, 'w') as f:
|
||||
json.dump(report, f, indent=2)
|
||||
|
||||
if self.verbose:
|
||||
print(f"\n Intelligence report saved: {filepath}\n")
|
||||
except Exception as e:
|
||||
if self.verbose:
|
||||
print(f"\n Warning: Failed to save intelligence report: {e}\n")
|
||||
|
||||
|
||||
# Convenience function for quick usage
|
||||
def create_intelligent_optimizer(
|
||||
study_name: str,
|
||||
study_dir: Path,
|
||||
config: Optional[Dict] = None,
|
||||
verbose: bool = True
|
||||
) -> IntelligentOptimizer:
|
||||
"""
|
||||
Factory function to create IntelligentOptimizer with sensible defaults.
|
||||
|
||||
Args:
|
||||
study_name: Name for the optimization study
|
||||
study_dir: Directory for results
|
||||
config: Optional configuration (uses defaults if None)
|
||||
verbose: Print progress
|
||||
|
||||
Returns:
|
||||
Configured IntelligentOptimizer instance
|
||||
"""
|
||||
if config is None:
|
||||
# Default Protocol 10 configuration
|
||||
config = {
|
||||
'intelligent_optimization': {
|
||||
'enabled': True,
|
||||
'characterization_trials': 15,
|
||||
'stagnation_window': 10,
|
||||
'min_improvement_threshold': 0.001,
|
||||
'min_analysis_trials': 10,
|
||||
'reanalysis_interval': 15
|
||||
}
|
||||
}
|
||||
|
||||
return IntelligentOptimizer(
|
||||
study_name=study_name,
|
||||
study_dir=study_dir,
|
||||
config=config,
|
||||
verbose=verbose
|
||||
)
|
||||
258
optimization_engine/realtime_tracking.py
Normal file
258
optimization_engine/realtime_tracking.py
Normal file
@@ -0,0 +1,258 @@
|
||||
"""
|
||||
Realtime Tracking System for Intelligent Optimizer
|
||||
|
||||
This module provides per-trial callbacks that write JSON tracking files
|
||||
immediately after each trial completes. This enables real-time dashboard
|
||||
updates and optimizer state visibility.
|
||||
|
||||
Protocol 13: Real-Time Tracking
|
||||
- Write JSON files AFTER EVERY SINGLE TRIAL
|
||||
- Use atomic writes (temp file + rename)
|
||||
- No batching allowed
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Dict, Any, Optional
|
||||
import optuna
|
||||
|
||||
|
||||
class RealtimeTrackingCallback:
|
||||
"""
|
||||
Optuna callback that writes tracking files after each trial.
|
||||
|
||||
Files Written (EVERY TRIAL):
|
||||
- optimizer_state.json: Current strategy, phase, confidence
|
||||
- strategy_history.json: Append-only log of all recommendations
|
||||
- trial_log.json: Append-only log of all trials with timestamps
|
||||
- landscape_snapshot.json: Latest landscape analysis (if available)
|
||||
- confidence_history.json: Confidence scores over time
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tracking_dir: Path,
|
||||
optimizer_ref: Any, # Reference to IntelligentOptimizer instance
|
||||
verbose: bool = True
|
||||
):
|
||||
"""
|
||||
Initialize realtime tracking callback.
|
||||
|
||||
Args:
|
||||
tracking_dir: Directory to write JSON files (intelligent_optimizer/)
|
||||
optimizer_ref: Reference to parent IntelligentOptimizer for state access
|
||||
verbose: Print status messages
|
||||
"""
|
||||
self.tracking_dir = Path(tracking_dir)
|
||||
self.tracking_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.optimizer = optimizer_ref
|
||||
self.verbose = verbose
|
||||
|
||||
# Initialize tracking files
|
||||
self._initialize_files()
|
||||
|
||||
def _initialize_files(self):
|
||||
"""Create initial empty tracking files."""
|
||||
# Strategy history (append-only)
|
||||
strategy_history_file = self.tracking_dir / "strategy_history.json"
|
||||
if not strategy_history_file.exists():
|
||||
self._atomic_write(strategy_history_file, [])
|
||||
|
||||
# Trial log (append-only)
|
||||
trial_log_file = self.tracking_dir / "trial_log.json"
|
||||
if not trial_log_file.exists():
|
||||
self._atomic_write(trial_log_file, [])
|
||||
|
||||
# Confidence history (append-only)
|
||||
confidence_file = self.tracking_dir / "confidence_history.json"
|
||||
if not confidence_file.exists():
|
||||
self._atomic_write(confidence_file, [])
|
||||
|
||||
def __call__(self, study: optuna.Study, trial: optuna.trial.FrozenTrial):
|
||||
"""
|
||||
Called after each trial completes.
|
||||
|
||||
Args:
|
||||
study: Optuna study object
|
||||
trial: Completed trial
|
||||
"""
|
||||
try:
|
||||
# Skip if trial didn't complete successfully
|
||||
if trial.state != optuna.trial.TrialState.COMPLETE:
|
||||
return
|
||||
|
||||
# Write all tracking files
|
||||
self._write_optimizer_state(study, trial)
|
||||
self._write_trial_log(study, trial)
|
||||
self._write_strategy_history(study, trial)
|
||||
self._write_landscape_snapshot(study, trial)
|
||||
self._write_confidence_history(study, trial)
|
||||
|
||||
if self.verbose:
|
||||
print(f"[Realtime Tracking] Trial #{trial.number} logged to {self.tracking_dir}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[Realtime Tracking] WARNING: Failed to write tracking files: {e}")
|
||||
|
||||
def _write_optimizer_state(self, study: optuna.Study, trial: optuna.trial.FrozenTrial):
|
||||
"""Write current optimizer state."""
|
||||
state = {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"trial_number": trial.number,
|
||||
"total_trials": len(study.trials),
|
||||
"current_phase": getattr(self.optimizer, 'current_phase', 'unknown'),
|
||||
"current_strategy": getattr(self.optimizer, 'current_strategy', 'unknown'),
|
||||
"is_multi_objective": len(study.directions) > 1,
|
||||
"study_directions": [str(d) for d in study.directions],
|
||||
}
|
||||
|
||||
# Add latest strategy recommendation if available
|
||||
if hasattr(self.optimizer, 'strategy_selector') and hasattr(self.optimizer.strategy_selector, 'recommendation_history'):
|
||||
history = self.optimizer.strategy_selector.recommendation_history
|
||||
if history:
|
||||
latest = history[-1]
|
||||
state["latest_recommendation"] = {
|
||||
"strategy": latest.get("strategy", "unknown"),
|
||||
"confidence": latest.get("confidence", 0.0),
|
||||
"reasoning": latest.get("reasoning", "")
|
||||
}
|
||||
|
||||
self._atomic_write(self.tracking_dir / "optimizer_state.json", state)
|
||||
|
||||
def _write_trial_log(self, study: optuna.Study, trial: optuna.trial.FrozenTrial):
|
||||
"""Append trial to trial log."""
|
||||
trial_log_file = self.tracking_dir / "trial_log.json"
|
||||
|
||||
# Read existing log
|
||||
if trial_log_file.exists():
|
||||
with open(trial_log_file, 'r') as f:
|
||||
log = json.load(f)
|
||||
else:
|
||||
log = []
|
||||
|
||||
# Append new trial
|
||||
trial_entry = {
|
||||
"trial_number": trial.number,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"state": str(trial.state),
|
||||
"params": trial.params,
|
||||
"value": trial.value if trial.value is not None else None,
|
||||
"values": trial.values if hasattr(trial, 'values') and trial.values is not None else None,
|
||||
"duration_seconds": (trial.datetime_complete - trial.datetime_start).total_seconds() if trial.datetime_complete else None,
|
||||
"user_attrs": dict(trial.user_attrs) if trial.user_attrs else {}
|
||||
}
|
||||
|
||||
log.append(trial_entry)
|
||||
self._atomic_write(trial_log_file, log)
|
||||
|
||||
def _write_strategy_history(self, study: optuna.Study, trial: optuna.trial.FrozenTrial):
|
||||
"""Append strategy recommendation to history."""
|
||||
if not hasattr(self.optimizer, 'strategy_selector'):
|
||||
return
|
||||
|
||||
strategy_file = self.tracking_dir / "strategy_history.json"
|
||||
|
||||
# Read existing history
|
||||
if strategy_file.exists():
|
||||
with open(strategy_file, 'r') as f:
|
||||
history = json.load(f)
|
||||
else:
|
||||
history = []
|
||||
|
||||
# Get latest recommendation from strategy selector
|
||||
if hasattr(self.optimizer.strategy_selector, 'recommendation_history'):
|
||||
selector_history = self.optimizer.strategy_selector.recommendation_history
|
||||
if selector_history:
|
||||
latest = selector_history[-1]
|
||||
# Only append if this is a new recommendation (not duplicate)
|
||||
if not history or history[-1].get('trial_number') != trial.number:
|
||||
history.append({
|
||||
"trial_number": trial.number,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"strategy": latest.get("strategy", "unknown"),
|
||||
"confidence": latest.get("confidence", 0.0),
|
||||
"reasoning": latest.get("reasoning", "")
|
||||
})
|
||||
|
||||
self._atomic_write(strategy_file, history)
|
||||
|
||||
def _write_landscape_snapshot(self, study: optuna.Study, trial: optuna.trial.FrozenTrial):
|
||||
"""Write latest landscape analysis snapshot."""
|
||||
if not hasattr(self.optimizer, 'landscape_cache'):
|
||||
return
|
||||
|
||||
landscape = self.optimizer.landscape_cache
|
||||
if landscape is None:
|
||||
# Multi-objective - no landscape analysis
|
||||
snapshot = {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"trial_number": trial.number,
|
||||
"ready": False,
|
||||
"message": "Landscape analysis not supported for multi-objective optimization"
|
||||
}
|
||||
else:
|
||||
snapshot = {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"trial_number": trial.number,
|
||||
**landscape
|
||||
}
|
||||
|
||||
self._atomic_write(self.tracking_dir / "landscape_snapshot.json", snapshot)
|
||||
|
||||
def _write_confidence_history(self, study: optuna.Study, trial: optuna.trial.FrozenTrial):
|
||||
"""Append confidence score to history."""
|
||||
confidence_file = self.tracking_dir / "confidence_history.json"
|
||||
|
||||
# Read existing history
|
||||
if confidence_file.exists():
|
||||
with open(confidence_file, 'r') as f:
|
||||
history = json.load(f)
|
||||
else:
|
||||
history = []
|
||||
|
||||
# Get confidence from latest recommendation
|
||||
confidence = 0.0
|
||||
if hasattr(self.optimizer, 'strategy_selector') and hasattr(self.optimizer.strategy_selector, 'recommendation_history'):
|
||||
selector_history = self.optimizer.strategy_selector.recommendation_history
|
||||
if selector_history:
|
||||
confidence = selector_history[-1].get("confidence", 0.0)
|
||||
|
||||
history.append({
|
||||
"trial_number": trial.number,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"confidence": confidence
|
||||
})
|
||||
|
||||
self._atomic_write(confidence_file, history)
|
||||
|
||||
def _atomic_write(self, filepath: Path, data: Any):
|
||||
"""
|
||||
Write JSON file atomically (temp file + rename).
|
||||
|
||||
This prevents dashboard from reading partial/corrupted files.
|
||||
"""
|
||||
temp_file = filepath.with_suffix('.tmp')
|
||||
try:
|
||||
with open(temp_file, 'w') as f:
|
||||
json.dump(data, f, indent=2)
|
||||
# Atomic rename
|
||||
temp_file.replace(filepath)
|
||||
except Exception as e:
|
||||
if temp_file.exists():
|
||||
temp_file.unlink()
|
||||
raise e
|
||||
|
||||
|
||||
def create_realtime_callback(tracking_dir: Path, optimizer_ref: Any, verbose: bool = True) -> RealtimeTrackingCallback:
|
||||
"""
|
||||
Factory function to create realtime tracking callback.
|
||||
|
||||
Usage in IntelligentOptimizer:
|
||||
```python
|
||||
callback = create_realtime_callback(self.tracking_dir, self, verbose=self.verbose)
|
||||
self.study.optimize(objective_function, n_trials=n, callbacks=[callback])
|
||||
```
|
||||
"""
|
||||
return RealtimeTrackingCallback(tracking_dir, optimizer_ref, verbose)
|
||||
Reference in New Issue
Block a user