""" Realtime Tracking System for Intelligent Optimizer This module provides per-trial callbacks that write JSON tracking files immediately after each trial completes. This enables real-time dashboard updates and optimizer state visibility. Protocol 13: Real-Time Tracking - Write JSON files AFTER EVERY SINGLE TRIAL - Use atomic writes (temp file + rename) - No batching allowed """ import json import time from pathlib import Path from datetime import datetime from typing import Dict, Any, Optional import optuna class RealtimeTrackingCallback: """ Optuna callback that writes tracking files after each trial. Files Written (EVERY TRIAL): - optimizer_state.json: Current strategy, phase, confidence - strategy_history.json: Append-only log of all recommendations - trial_log.json: Append-only log of all trials with timestamps - landscape_snapshot.json: Latest landscape analysis (if available) - confidence_history.json: Confidence scores over time """ def __init__( self, tracking_dir: Path, optimizer_ref: Any, # Reference to IntelligentOptimizer instance verbose: bool = True ): """ Initialize realtime tracking callback. Args: tracking_dir: Directory to write JSON files (intelligent_optimizer/) optimizer_ref: Reference to parent IntelligentOptimizer for state access verbose: Print status messages """ self.tracking_dir = Path(tracking_dir) self.tracking_dir.mkdir(parents=True, exist_ok=True) self.optimizer = optimizer_ref self.verbose = verbose # Initialize tracking files self._initialize_files() def _initialize_files(self): """Create initial empty tracking files.""" # Strategy history (append-only) strategy_history_file = self.tracking_dir / "strategy_history.json" if not strategy_history_file.exists(): self._atomic_write(strategy_history_file, []) # Trial log (append-only) trial_log_file = self.tracking_dir / "trial_log.json" if not trial_log_file.exists(): self._atomic_write(trial_log_file, []) # Confidence history (append-only) confidence_file = self.tracking_dir / "confidence_history.json" if not confidence_file.exists(): self._atomic_write(confidence_file, []) def __call__(self, study: optuna.Study, trial: optuna.trial.FrozenTrial): """ Called after each trial completes. Args: study: Optuna study object trial: Completed trial """ try: # Skip if trial didn't complete successfully if trial.state != optuna.trial.TrialState.COMPLETE: return # Write all tracking files self._write_optimizer_state(study, trial) self._write_trial_log(study, trial) self._write_strategy_history(study, trial) self._write_landscape_snapshot(study, trial) self._write_confidence_history(study, trial) if self.verbose: print(f"[Realtime Tracking] Trial #{trial.number} logged to {self.tracking_dir}") except Exception as e: print(f"[Realtime Tracking] WARNING: Failed to write tracking files: {e}") def _write_optimizer_state(self, study: optuna.Study, trial: optuna.trial.FrozenTrial): """Write current optimizer state.""" # [Protocol 11] For multi-objective, strategy is always NSGA-II is_multi_objective = len(study.directions) > 1 if is_multi_objective: # Multi-objective studies use NSGA-II, skip adaptive characterization current_strategy = "NSGA-II" current_phase = "multi_objective_optimization" else: # Single-objective uses intelligent strategy selection current_strategy = getattr(self.optimizer, 'current_strategy', 'unknown') current_phase = getattr(self.optimizer, 'current_phase', 'unknown') state = { "timestamp": datetime.now().isoformat(), "trial_number": trial.number, "total_trials": len(study.trials), "current_phase": current_phase, "current_strategy": current_strategy, "is_multi_objective": is_multi_objective, "study_directions": [str(d) for d in study.directions], } # Add latest strategy recommendation if available if hasattr(self.optimizer, 'strategy_selector') and hasattr(self.optimizer.strategy_selector, 'recommendation_history'): history = self.optimizer.strategy_selector.recommendation_history if history: latest = history[-1] state["latest_recommendation"] = { "strategy": latest.get("strategy", "unknown"), "confidence": latest.get("confidence", 0.0), "reasoning": latest.get("reasoning", "") } self._atomic_write(self.tracking_dir / "optimizer_state.json", state) def _write_trial_log(self, study: optuna.Study, trial: optuna.trial.FrozenTrial): """Append trial to trial log.""" trial_log_file = self.tracking_dir / "trial_log.json" # Read existing log if trial_log_file.exists(): with open(trial_log_file, 'r') as f: log = json.load(f) else: log = [] # [Protocol 11] Handle both single and multi-objective is_multi_objective = len(study.directions) > 1 # Append new trial trial_entry = { "trial_number": trial.number, "timestamp": datetime.now().isoformat(), "state": str(trial.state), "params": trial.params, "duration_seconds": (trial.datetime_complete - trial.datetime_start).total_seconds() if trial.datetime_complete else None, "user_attrs": dict(trial.user_attrs) if trial.user_attrs else {} } # Add objectives (Protocol 11 compliant) if is_multi_objective: trial_entry["values"] = trial.values if trial.values is not None else None trial_entry["value"] = None # Not available else: trial_entry["value"] = trial.value if trial.value is not None else None trial_entry["values"] = None log.append(trial_entry) self._atomic_write(trial_log_file, log) def _write_strategy_history(self, study: optuna.Study, trial: optuna.trial.FrozenTrial): """Append strategy recommendation to history.""" if not hasattr(self.optimizer, 'strategy_selector'): return strategy_file = self.tracking_dir / "strategy_history.json" # Read existing history if strategy_file.exists(): with open(strategy_file, 'r') as f: history = json.load(f) else: history = [] # Get latest recommendation from strategy selector if hasattr(self.optimizer.strategy_selector, 'recommendation_history'): selector_history = self.optimizer.strategy_selector.recommendation_history if selector_history: latest = selector_history[-1] # Only append if this is a new recommendation (not duplicate) if not history or history[-1].get('trial_number') != trial.number: history.append({ "trial_number": trial.number, "timestamp": datetime.now().isoformat(), "strategy": latest.get("strategy", "unknown"), "confidence": latest.get("confidence", 0.0), "reasoning": latest.get("reasoning", "") }) self._atomic_write(strategy_file, history) def _write_landscape_snapshot(self, study: optuna.Study, trial: optuna.trial.FrozenTrial): """Write latest landscape analysis snapshot.""" if not hasattr(self.optimizer, 'landscape_cache'): return landscape = self.optimizer.landscape_cache if landscape is None: # Multi-objective - no landscape analysis snapshot = { "timestamp": datetime.now().isoformat(), "trial_number": trial.number, "ready": False, "message": "Landscape analysis not supported for multi-objective optimization" } else: snapshot = { "timestamp": datetime.now().isoformat(), "trial_number": trial.number, **landscape } self._atomic_write(self.tracking_dir / "landscape_snapshot.json", snapshot) def _write_confidence_history(self, study: optuna.Study, trial: optuna.trial.FrozenTrial): """Append confidence score to history.""" confidence_file = self.tracking_dir / "confidence_history.json" # Read existing history if confidence_file.exists(): with open(confidence_file, 'r') as f: history = json.load(f) else: history = [] # Get confidence from latest recommendation confidence = 0.0 if hasattr(self.optimizer, 'strategy_selector') and hasattr(self.optimizer.strategy_selector, 'recommendation_history'): selector_history = self.optimizer.strategy_selector.recommendation_history if selector_history: confidence = selector_history[-1].get("confidence", 0.0) history.append({ "trial_number": trial.number, "timestamp": datetime.now().isoformat(), "confidence": confidence }) self._atomic_write(confidence_file, history) def _atomic_write(self, filepath: Path, data: Any): """ Write JSON file atomically (temp file + rename). This prevents dashboard from reading partial/corrupted files. """ temp_file = filepath.with_suffix('.tmp') try: with open(temp_file, 'w') as f: json.dump(data, f, indent=2) # Atomic rename temp_file.replace(filepath) except Exception as e: if temp_file.exists(): temp_file.unlink() raise e def create_realtime_callback(tracking_dir: Path, optimizer_ref: Any, verbose: bool = True) -> RealtimeTrackingCallback: """ Factory function to create realtime tracking callback. Usage in IntelligentOptimizer: ```python callback = create_realtime_callback(self.tracking_dir, self, verbose=self.verbose) self.study.optimize(objective_function, n_trials=n, callbacks=[callback]) ``` """ return RealtimeTrackingCallback(tracking_dir, optimizer_ref, verbose)