Files
Atomizer/optimization_engine/realtime_tracking.py
Anto01 d228ccec66 refactor: Archive experimental LLM features for MVP stability (Phase 1.1)
Moved experimental LLM integration code to optimization_engine/future/:
- llm_optimization_runner.py - Runtime LLM API runner
- llm_workflow_analyzer.py - Workflow analysis
- inline_code_generator.py - Auto-generate calculations
- hook_generator.py - Auto-generate hooks
- report_generator.py - LLM report generation
- extractor_orchestrator.py - Extractor orchestration

Added comprehensive optimization_engine/future/README.md explaining:
- MVP LLM strategy (Claude Code skills, not runtime LLM)
- Why files were archived
- When to revisit post-MVP
- Production architecture reference

Production runner confirmed: optimization_engine/runner.py is sole active runner.

This establishes clear separation between:
- Production code (stable, no runtime LLM dependencies)
- Experimental code (archived for post-MVP exploration)

Part of Phase 1: Core Stabilization & Organization for MVP

Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 09:12:36 -05:00

280 lines
11 KiB
Python

"""
Realtime Tracking System for Intelligent Optimizer
This module provides per-trial callbacks that write JSON tracking files
immediately after each trial completes. This enables real-time dashboard
updates and optimizer state visibility.
Protocol 13: Real-Time Tracking
- Write JSON files AFTER EVERY SINGLE TRIAL
- Use atomic writes (temp file + rename)
- No batching allowed
"""
import json
import time
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, Optional
import optuna
class RealtimeTrackingCallback:
    """
    Optuna callback that writes JSON tracking files after each completed trial.

    Files written (every completed trial):
    - optimizer_state.json: Current strategy, phase and latest recommendation
    - trial_log.json: Append-only log of all trials with timestamps
    - strategy_history.json: Append-only log of the latest recommendation,
      one entry per trial number
    - landscape_snapshot.json: Latest landscape analysis (only when the
      optimizer exposes ``landscape_cache``)
    - confidence_history.json: Confidence scores over time

    Every file is produced by its own step. A failure in one step is reported
    on stdout and does not stop the remaining files from being updated.
    """

    # Append-only logs that are created as empty JSON lists on construction.
    _APPEND_ONLY_FILES = (
        "strategy_history.json",
        "trial_log.json",
        "confidence_history.json",
    )

    def __init__(
        self,
        tracking_dir: Path,
        optimizer_ref: Any,  # Reference to IntelligentOptimizer instance
        verbose: bool = True
    ):
        """
        Initialize realtime tracking callback.

        Creates ``tracking_dir`` (including parents) if needed and seeds the
        append-only log files.

        Args:
            tracking_dir: Directory to write JSON files (intelligent_optimizer/)
            optimizer_ref: Reference to parent IntelligentOptimizer for state access
            verbose: Print status messages
        """
        self.tracking_dir = Path(tracking_dir)
        self.tracking_dir.mkdir(parents=True, exist_ok=True)
        self.optimizer = optimizer_ref
        self.verbose = verbose

        # Initialize tracking files
        self._initialize_files()

    def _initialize_files(self):
        """Create each append-only log as an empty list unless it already exists."""
        for name in self._APPEND_ONLY_FILES:
            target = self.tracking_dir / name
            # Existing logs are kept so a restarted run continues appending.
            if not target.exists():
                self._atomic_write(target, [])

    # NOTE: the optuna annotations below are quoted so they are not evaluated
    # when the class body is executed.
    def __call__(self, study: "optuna.Study", trial: "optuna.trial.FrozenTrial"):
        """
        Called after each trial; writes all tracking files for completed trials.

        Trials in any state other than COMPLETE are ignored. Each writer runs
        in isolation: an exception raised by one writer is printed as a
        warning and the remaining writers still run.

        Args:
            study: Optuna study object
            trial: Completed trial
        """
        # Skip if trial didn't complete successfully
        if trial.state != optuna.trial.TrialState.COMPLETE:
            return

        writers = (
            self._write_optimizer_state,
            self._write_trial_log,
            self._write_strategy_history,
            self._write_landscape_snapshot,
            self._write_confidence_history,
        )

        all_written = True
        for write in writers:
            # Tracking is best-effort: never let a dashboard file abort the
            # optimization, and never let one bad file block the others.
            try:
                write(study, trial)
            except Exception as e:
                all_written = False
                print(f"[Realtime Tracking] WARNING: Failed to write tracking files: {e}")

        if self.verbose and all_written:
            print(f"[Realtime Tracking] Trial #{trial.number} logged to {self.tracking_dir}")

    def _latest_recommendation(self) -> Optional[Dict[str, Any]]:
        """
        Return the newest entry of ``optimizer.strategy_selector.recommendation_history``.

        Returns:
            The last history entry, or None when the optimizer has no strategy
            selector, the selector has no history attribute, or the history is
            empty.
        """
        selector = getattr(self.optimizer, 'strategy_selector', None)
        history = getattr(selector, 'recommendation_history', None)
        if not history:
            return None
        return history[-1]

    def _read_json_list(self, filepath: Path) -> list:
        """
        Load the JSON content of an append-only log.

        Returns:
            The parsed file content, or an empty list when the file is missing.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return []

    def _write_optimizer_state(self, study: "optuna.Study", trial: "optuna.trial.FrozenTrial"):
        """Overwrite optimizer_state.json with the current strategy, phase and recommendation."""
        # [Protocol 11] For multi-objective, strategy is always NSGA-II
        is_multi_objective = len(study.directions) > 1

        if is_multi_objective:
            # Multi-objective studies use NSGA-II, skip adaptive characterization
            current_strategy = "NSGA-II"
            current_phase = "multi_objective_optimization"
        else:
            # Single-objective uses intelligent strategy selection
            current_strategy = getattr(self.optimizer, 'current_strategy', 'unknown')
            current_phase = getattr(self.optimizer, 'current_phase', 'unknown')

        state = {
            "timestamp": datetime.now().isoformat(),
            "trial_number": trial.number,
            "total_trials": len(study.trials),
            "current_phase": current_phase,
            "current_strategy": current_strategy,
            "is_multi_objective": is_multi_objective,
            "study_directions": [str(d) for d in study.directions],
        }

        # Add latest strategy recommendation if available
        latest = self._latest_recommendation()
        if latest is not None:
            state["latest_recommendation"] = {
                "strategy": latest.get("strategy", "unknown"),
                "confidence": latest.get("confidence", 0.0),
                "reasoning": latest.get("reasoning", "")
            }

        self._atomic_write(self.tracking_dir / "optimizer_state.json", state)

    def _write_trial_log(self, study: "optuna.Study", trial: "optuna.trial.FrozenTrial"):
        """
        Append one entry describing ``trial`` to trial_log.json.

        ``duration_seconds`` is None when either the start or the completion
        timestamp of the trial is missing.
        """
        trial_log_file = self.tracking_dir / "trial_log.json"
        log = self._read_json_list(trial_log_file)

        # [Protocol 11] Handle both single and multi-objective
        is_multi_objective = len(study.directions) > 1

        started = trial.datetime_start
        finished = trial.datetime_complete
        if started is not None and finished is not None:
            duration = (finished - started).total_seconds()
        else:
            duration = None

        trial_entry = {
            "trial_number": trial.number,
            "timestamp": datetime.now().isoformat(),
            "state": str(trial.state),
            "params": trial.params,
            "duration_seconds": duration,
            "user_attrs": dict(trial.user_attrs) if trial.user_attrs else {}
        }

        # Add objectives (Protocol 11 compliant): exactly one of the two keys
        # carries data, the other is written as null.
        if is_multi_objective:
            trial_entry["values"] = trial.values
            trial_entry["value"] = None  # Not available
        else:
            trial_entry["value"] = trial.value
            trial_entry["values"] = None

        log.append(trial_entry)
        self._atomic_write(trial_log_file, log)

    def _write_strategy_history(self, study: "optuna.Study", trial: "optuna.trial.FrozenTrial"):
        """
        Append the latest strategy recommendation to strategy_history.json.

        Does nothing when the optimizer has no ``strategy_selector`` attribute.
        """
        if not hasattr(self.optimizer, 'strategy_selector'):
            return

        strategy_file = self.tracking_dir / "strategy_history.json"
        history = self._read_json_list(strategy_file)

        latest = self._latest_recommendation()
        if latest is not None:
            # The guard compares trial numbers only: it prevents a second entry
            # for the same trial, not repeated identical recommendations
            # across different trials.
            if not history or history[-1].get('trial_number') != trial.number:
                history.append({
                    "trial_number": trial.number,
                    "timestamp": datetime.now().isoformat(),
                    "strategy": latest.get("strategy", "unknown"),
                    "confidence": latest.get("confidence", 0.0),
                    "reasoning": latest.get("reasoning", "")
                })

        self._atomic_write(strategy_file, history)

    def _write_landscape_snapshot(self, study: "optuna.Study", trial: "optuna.trial.FrozenTrial"):
        """
        Overwrite landscape_snapshot.json with the optimizer's ``landscape_cache``.

        Does nothing when the optimizer has no ``landscape_cache`` attribute.
        """
        if not hasattr(self.optimizer, 'landscape_cache'):
            return

        landscape = self.optimizer.landscape_cache

        if landscape is None:
            # Multi-objective - no landscape analysis
            # NOTE(review): any None cache is reported with the multi-objective
            # message; confirm single-objective runs never leave it as None.
            snapshot = {
                "timestamp": datetime.now().isoformat(),
                "trial_number": trial.number,
                "ready": False,
                "message": "Landscape analysis not supported for multi-objective optimization"
            }
        else:
            # Keys in the cache are unpacked last, so a cache entry named
            # "timestamp" or "trial_number" overrides the values set here.
            snapshot = {
                "timestamp": datetime.now().isoformat(),
                "trial_number": trial.number,
                **landscape
            }

        self._atomic_write(self.tracking_dir / "landscape_snapshot.json", snapshot)

    def _write_confidence_history(self, study: "optuna.Study", trial: "optuna.trial.FrozenTrial"):
        """
        Append the latest recommendation's confidence to confidence_history.json.

        A confidence of 0.0 is recorded when no recommendation is available.
        """
        confidence_file = self.tracking_dir / "confidence_history.json"
        history = self._read_json_list(confidence_file)

        # Get confidence from latest recommendation
        latest = self._latest_recommendation()
        confidence = latest.get("confidence", 0.0) if latest is not None else 0.0

        history.append({
            "trial_number": trial.number,
            "timestamp": datetime.now().isoformat(),
            "confidence": confidence
        })

        self._atomic_write(confidence_file, history)

    def _atomic_write(self, filepath: Path, data: Any):
        """
        Write JSON file atomically (temp file + rename).

        The data is serialized to a sibling ``.tmp`` file which then replaces
        ``filepath``, so a reader never sees a partially written file. If
        serialization or the rename fails, the temp file is removed, the
        existing target is left untouched and the original exception is
        re-raised.

        Args:
            filepath: Destination JSON file
            data: JSON-serializable payload
        """
        temp_file = filepath.with_suffix('.tmp')
        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            # Atomic rename
            temp_file.replace(filepath)
        except Exception:
            # Best-effort cleanup; a failing unlink must not hide the real error.
            try:
                temp_file.unlink()
            except OSError:
                pass
            raise
def create_realtime_callback(tracking_dir: Path, optimizer_ref: Any, verbose: bool = True) -> RealtimeTrackingCallback:
    """
    Build a RealtimeTrackingCallback for the given tracking directory.

    Args:
        tracking_dir: Directory the callback writes its JSON files to
        optimizer_ref: Optimizer instance the callback reads state from
        verbose: Print status messages

    Returns:
        A new RealtimeTrackingCallback constructed from the three arguments.

    Usage in IntelligentOptimizer:
    ```python
    callback = create_realtime_callback(self.tracking_dir, self, verbose=self.verbose)
    self.study.optimize(objective_function, n_trials=n, callbacks=[callback])
    ```
    """
    callback = RealtimeTrackingCallback(
        tracking_dir=tracking_dir,
        optimizer_ref=optimizer_ref,
        verbose=verbose,
    )
    return callback