# Atomizer/optimization_engine/surrogate_tuner.py
"""
Hyperparameter Tuning for Neural Network Surrogates
This module provides automatic hyperparameter optimization for MLP surrogates
using Optuna, with proper train/validation splits and early stopping.
Key Features:
1. Optuna-based hyperparameter search
2. K-fold cross-validation
3. Early stopping to prevent overfitting
4. Ensemble model support
5. Proper uncertainty quantification
Usage:
from optimization_engine.surrogate_tuner import SurrogateHyperparameterTuner
tuner = SurrogateHyperparameterTuner(
input_dim=11,
output_dim=3,
n_trials=50
)
best_config = tuner.tune(X_train, Y_train)
model = tuner.create_tuned_model(best_config)
"""
# Postponed evaluation of annotations, so the optional optuna/torch type hints
# below do not require those packages at import time.
from __future__ import annotations

import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
logger = logging.getLogger(__name__)
try:
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, TensorDataset
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    logger.warning("PyTorch not installed; neural-network surrogates are unavailable")

    class nn:  # minimal stand-in so the nn.Module subclasses below remain importable
        Module = object
try:
import optuna
from optuna.samplers import TPESampler
from optuna.pruners import MedianPruner
OPTUNA_AVAILABLE = True
except ImportError:
OPTUNA_AVAILABLE = False
logger.warning("Optuna not installed")
@dataclass
class SurrogateConfig:
"""Configuration for a tuned surrogate model."""
hidden_dims: List[int] = field(default_factory=lambda: [128, 256, 128])
dropout: float = 0.1
activation: str = 'relu'
use_batch_norm: bool = True
learning_rate: float = 1e-3
weight_decay: float = 1e-4
batch_size: int = 16
max_epochs: int = 500
early_stopping_patience: int = 30
# Normalization stats (filled during training)
input_mean: Optional[np.ndarray] = None
input_std: Optional[np.ndarray] = None
output_mean: Optional[np.ndarray] = None
output_std: Optional[np.ndarray] = None
# Validation metrics
val_loss: float = float('inf')
val_r2: Dict[str, float] = field(default_factory=dict)
class TunableMLP(nn.Module):
"""Flexible MLP with configurable architecture."""
def __init__(
self,
input_dim: int,
output_dim: int,
hidden_dims: List[int],
dropout: float = 0.1,
activation: str = 'relu',
use_batch_norm: bool = True
):
super().__init__()
self.input_dim = input_dim
self.output_dim = output_dim
# Activation function
activations = {
'relu': nn.ReLU(),
'leaky_relu': nn.LeakyReLU(0.1),
'elu': nn.ELU(),
'selu': nn.SELU(),
'gelu': nn.GELU(),
'swish': nn.SiLU()
}
act_fn = activations.get(activation, nn.ReLU())
# Build layers
layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
if use_batch_norm:
layers.append(nn.BatchNorm1d(hidden_dim))
layers.append(act_fn)
if dropout > 0:
layers.append(nn.Dropout(dropout))
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, output_dim))
self.network = nn.Sequential(*layers)
self._init_weights()
def _init_weights(self):
"""Initialize weights using Kaiming initialization."""
for m in self.modules():
if isinstance(m, nn.Linear):
nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
if m.bias is not None:
nn.init.constant_(m.bias, 0)
def forward(self, x):
return self.network(x)
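# For reference, a sketch of the stack TunableMLP builds (illustrative only,
# assuming the default dropout/activation/batch-norm arguments above):
#   TunableMLP(input_dim=11, output_dim=3, hidden_dims=[128, 256, 128]) ->
#       Linear(11, 128)  -> BatchNorm1d(128) -> ReLU -> Dropout(0.1)
#       Linear(128, 256) -> BatchNorm1d(256) -> ReLU -> Dropout(0.1)
#       Linear(256, 128) -> BatchNorm1d(128) -> ReLU -> Dropout(0.1)
#       Linear(128, 3)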
class EarlyStopping:
"""Early stopping to prevent overfitting."""
def __init__(self, patience: int = 20, min_delta: float = 1e-5):
self.patience = patience
self.min_delta = min_delta
self.counter = 0
self.best_loss = float('inf')
self.best_model_state = None
self.should_stop = False
def __call__(self, val_loss: float, model: nn.Module) -> bool:
if val_loss < self.best_loss - self.min_delta:
self.best_loss = val_loss
self.best_model_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
self.counter = 0
else:
self.counter += 1
if self.counter >= self.patience:
self.should_stop = True
return self.should_stop
def restore_best(self, model: nn.Module):
"""Restore model to best state."""
if self.best_model_state is not None:
model.load_state_dict(self.best_model_state)
class SurrogateHyperparameterTuner:
"""
Automatic hyperparameter tuning for neural network surrogates.
Uses Optuna for Bayesian optimization of:
- Network architecture (layers, widths)
- Regularization (dropout, weight decay)
- Learning rate and batch size
- Activation functions
"""
def __init__(
self,
input_dim: int,
output_dim: int,
n_trials: int = 50,
n_cv_folds: int = 5,
device: str = 'auto',
seed: int = 42,
timeout_seconds: Optional[int] = None
):
"""
Initialize hyperparameter tuner.
Args:
input_dim: Number of input features (design variables)
output_dim: Number of outputs (objectives)
n_trials: Number of Optuna trials for hyperparameter search
n_cv_folds: Number of cross-validation folds
device: Computing device ('cuda', 'cpu', or 'auto')
seed: Random seed for reproducibility
timeout_seconds: Optional timeout for tuning
"""
if not TORCH_AVAILABLE:
raise ImportError("PyTorch required for surrogate tuning")
if not OPTUNA_AVAILABLE:
raise ImportError("Optuna required for hyperparameter tuning")
self.input_dim = input_dim
self.output_dim = output_dim
self.n_trials = n_trials
self.n_cv_folds = n_cv_folds
self.seed = seed
self.timeout = timeout_seconds
if device == 'auto':
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
else:
self.device = torch.device(device)
self.best_config: Optional[SurrogateConfig] = None
self.study: Optional[optuna.Study] = None
# Set seeds
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(seed)
def _suggest_hyperparameters(self, trial: optuna.Trial) -> SurrogateConfig:
"""Suggest hyperparameters for a trial."""
# Architecture
n_layers = trial.suggest_int('n_layers', 2, 5)
hidden_dims = []
for i in range(n_layers):
dim = trial.suggest_int(f'hidden_dim_{i}', 32, 512, step=32)
hidden_dims.append(dim)
# Regularization
dropout = trial.suggest_float('dropout', 0.0, 0.5)
weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)
# Training
learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
batch_size = trial.suggest_categorical('batch_size', [8, 16, 32, 64])
# Activation
activation = trial.suggest_categorical('activation',
['relu', 'leaky_relu', 'elu', 'gelu', 'swish'])
# Batch norm
use_batch_norm = trial.suggest_categorical('use_batch_norm', [True, False])
return SurrogateConfig(
hidden_dims=hidden_dims,
dropout=dropout,
activation=activation,
use_batch_norm=use_batch_norm,
learning_rate=learning_rate,
weight_decay=weight_decay,
batch_size=batch_size
)
def _train_fold(
self,
config: SurrogateConfig,
X_train: np.ndarray,
Y_train: np.ndarray,
X_val: np.ndarray,
Y_val: np.ndarray,
trial: Optional[optuna.Trial] = None
) -> Tuple[float, Dict[str, float]]:
"""Train model on one fold and return validation metrics."""
# Create model
model = TunableMLP(
input_dim=self.input_dim,
output_dim=self.output_dim,
hidden_dims=config.hidden_dims,
dropout=config.dropout,
activation=config.activation,
use_batch_norm=config.use_batch_norm
).to(self.device)
# Prepare data
X_train_t = torch.tensor(X_train, dtype=torch.float32, device=self.device)
Y_train_t = torch.tensor(Y_train, dtype=torch.float32, device=self.device)
X_val_t = torch.tensor(X_val, dtype=torch.float32, device=self.device)
Y_val_t = torch.tensor(Y_val, dtype=torch.float32, device=self.device)
        train_dataset = TensorDataset(X_train_t, Y_train_t)
        # drop_last avoids a trailing batch of size 1, which BatchNorm1d rejects in training mode
        drop_last = config.use_batch_norm and len(train_dataset) % config.batch_size == 1
        train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True, drop_last=drop_last)
# Optimizer and scheduler
optimizer = torch.optim.AdamW(
model.parameters(),
lr=config.learning_rate,
weight_decay=config.weight_decay
)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=config.max_epochs
)
early_stopping = EarlyStopping(patience=config.early_stopping_patience)
# Training loop
for epoch in range(config.max_epochs):
model.train()
for X_batch, Y_batch in train_loader:
optimizer.zero_grad()
pred = model(X_batch)
loss = nn.functional.mse_loss(pred, Y_batch)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
scheduler.step()
# Validation
model.eval()
with torch.no_grad():
val_pred = model(X_val_t)
val_loss = nn.functional.mse_loss(val_pred, Y_val_t).item()
# Early stopping
if early_stopping(val_loss, model):
break
            # Optuna pruning: report every 10 epochs. The same step indices are
            # reused by every CV fold, so later reports at a given step may be
            # ignored and pruning is driven mainly by the first fold's curve.
if trial is not None and epoch % 10 == 0:
trial.report(val_loss, epoch // 10)
if trial.should_prune():
raise optuna.TrialPruned()
# Restore best model
early_stopping.restore_best(model)
# Final validation metrics
model.eval()
with torch.no_grad():
val_pred = model(X_val_t).cpu().numpy()
Y_val_np = Y_val_t.cpu().numpy()
val_loss = float(np.mean((val_pred - Y_val_np) ** 2))
# R² per output
r2_scores = {}
for i in range(self.output_dim):
ss_res = np.sum((Y_val_np[:, i] - val_pred[:, i]) ** 2)
ss_tot = np.sum((Y_val_np[:, i] - Y_val_np[:, i].mean()) ** 2)
r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0
r2_scores[f'output_{i}'] = r2
return val_loss, r2_scores
def _cross_validate(
self,
config: SurrogateConfig,
X: np.ndarray,
Y: np.ndarray,
trial: Optional[optuna.Trial] = None
) -> Tuple[float, Dict[str, float]]:
"""Perform k-fold cross-validation."""
n_samples = len(X)
indices = np.random.permutation(n_samples)
fold_size = n_samples // self.n_cv_folds
fold_losses = []
fold_r2s = {f'output_{i}': [] for i in range(self.output_dim)}
for fold in range(self.n_cv_folds):
# Split indices
val_start = fold * fold_size
val_end = val_start + fold_size if fold < self.n_cv_folds - 1 else n_samples
val_indices = indices[val_start:val_end]
train_indices = np.concatenate([indices[:val_start], indices[val_end:]])
X_train, Y_train = X[train_indices], Y[train_indices]
X_val, Y_val = X[val_indices], Y[val_indices]
# Skip fold if too few samples
if len(X_train) < 10 or len(X_val) < 2:
continue
val_loss, r2_scores = self._train_fold(
config, X_train, Y_train, X_val, Y_val, trial
)
fold_losses.append(val_loss)
for key, val in r2_scores.items():
fold_r2s[key].append(val)
        if not fold_losses:
            raise ValueError("Too few samples for cross-validation; every fold was skipped")
        mean_loss = float(np.mean(fold_losses))
        mean_r2 = {k: float(np.mean(v)) for k, v in fold_r2s.items()}
return mean_loss, mean_r2
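    # Fold layout (illustrative): with n_samples = 100 and n_cv_folds = 5,
    # fold_size = 20 and fold k validates on shuffled indices [20*k : 20*(k+1)];
    # the final fold also absorbs any remainder when n_samples is not evenly
    # divisible by n_cv_folds.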
def tune(
self,
X: np.ndarray,
Y: np.ndarray,
output_names: Optional[List[str]] = None
) -> SurrogateConfig:
"""
Tune hyperparameters using Optuna.
Args:
X: Input features [n_samples, input_dim]
Y: Outputs [n_samples, output_dim]
output_names: Optional names for outputs (for logging)
Returns:
Best SurrogateConfig found
"""
logger.info(f"Starting hyperparameter tuning with {self.n_trials} trials...")
logger.info(f"Data: {len(X)} samples, {self.n_cv_folds}-fold CV")
# Normalize data
self.input_mean = X.mean(axis=0)
self.input_std = X.std(axis=0) + 1e-8
self.output_mean = Y.mean(axis=0)
self.output_std = Y.std(axis=0) + 1e-8
X_norm = (X - self.input_mean) / self.input_std
Y_norm = (Y - self.output_mean) / self.output_std
def objective(trial: optuna.Trial) -> float:
config = self._suggest_hyperparameters(trial)
val_loss, r2_scores = self._cross_validate(config, X_norm, Y_norm, trial)
# Log R² scores
for key, val in r2_scores.items():
trial.set_user_attr(f'r2_{key}', val)
return val_loss
# Create study
self.study = optuna.create_study(
direction='minimize',
sampler=TPESampler(seed=self.seed, n_startup_trials=10),
pruner=MedianPruner(n_startup_trials=5, n_warmup_steps=20)
)
self.study.optimize(
objective,
n_trials=self.n_trials,
timeout=self.timeout,
show_progress_bar=True,
catch=(RuntimeError,) # Catch GPU OOM errors
)
# Build best config
best_trial = self.study.best_trial
self.best_config = self._suggest_hyperparameters_from_params(best_trial.params)
self.best_config.val_loss = best_trial.value
self.best_config.val_r2 = {
k.replace('r2_', ''): v
for k, v in best_trial.user_attrs.items()
if k.startswith('r2_')
}
# Store normalization
self.best_config.input_mean = self.input_mean
self.best_config.input_std = self.input_std
self.best_config.output_mean = self.output_mean
self.best_config.output_std = self.output_std
# Log results
logger.info(f"\nBest hyperparameters found:")
logger.info(f" Hidden dims: {self.best_config.hidden_dims}")
logger.info(f" Dropout: {self.best_config.dropout:.3f}")
logger.info(f" Activation: {self.best_config.activation}")
logger.info(f" Batch norm: {self.best_config.use_batch_norm}")
logger.info(f" Learning rate: {self.best_config.learning_rate:.2e}")
logger.info(f" Weight decay: {self.best_config.weight_decay:.2e}")
logger.info(f" Batch size: {self.best_config.batch_size}")
logger.info(f" Validation loss: {self.best_config.val_loss:.6f}")
if output_names:
for i, name in enumerate(output_names):
r2 = self.best_config.val_r2.get(f'output_{i}', 0)
logger.info(f" {name} R² (CV): {r2:.4f}")
return self.best_config
def _suggest_hyperparameters_from_params(self, params: Dict[str, Any]) -> SurrogateConfig:
"""Reconstruct config from Optuna params dict."""
n_layers = params['n_layers']
hidden_dims = [params[f'hidden_dim_{i}'] for i in range(n_layers)]
return SurrogateConfig(
hidden_dims=hidden_dims,
dropout=params['dropout'],
activation=params['activation'],
use_batch_norm=params['use_batch_norm'],
learning_rate=params['learning_rate'],
weight_decay=params['weight_decay'],
batch_size=params['batch_size']
)
def create_tuned_model(
self,
config: Optional[SurrogateConfig] = None
) -> TunableMLP:
"""Create a model with tuned hyperparameters."""
if config is None:
config = self.best_config
if config is None:
raise ValueError("No config available. Run tune() first.")
return TunableMLP(
input_dim=self.input_dim,
output_dim=self.output_dim,
hidden_dims=config.hidden_dims,
dropout=config.dropout,
activation=config.activation,
use_batch_norm=config.use_batch_norm
)
class TunedEnsembleSurrogate:
"""
Ensemble of tuned surrogate models for better uncertainty quantification.
Trains multiple models with different random seeds and aggregates predictions.
"""
def __init__(
self,
config: SurrogateConfig,
input_dim: int,
output_dim: int,
n_models: int = 5,
device: str = 'auto'
):
"""
Initialize ensemble surrogate.
Args:
config: Tuned configuration to use for all models
input_dim: Number of input features
output_dim: Number of outputs
n_models: Number of models in ensemble
device: Computing device
"""
self.config = config
self.input_dim = input_dim
self.output_dim = output_dim
self.n_models = n_models
if device == 'auto':
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
else:
self.device = torch.device(device)
self.models: List[TunableMLP] = []
self.trained = False
def train(self, X: np.ndarray, Y: np.ndarray, val_split: float = 0.2):
"""Train all models in the ensemble."""
logger.info(f"Training ensemble of {self.n_models} models...")
# Normalize using config stats
X_norm = (X - self.config.input_mean) / self.config.input_std
Y_norm = (Y - self.config.output_mean) / self.config.output_std
# Split data
        n_val = max(1, int(len(X) * val_split))  # keep at least one validation sample
indices = np.random.permutation(len(X))
train_idx, val_idx = indices[n_val:], indices[:n_val]
X_train, Y_train = X_norm[train_idx], Y_norm[train_idx]
X_val, Y_val = X_norm[val_idx], Y_norm[val_idx]
X_train_t = torch.tensor(X_train, dtype=torch.float32, device=self.device)
Y_train_t = torch.tensor(Y_train, dtype=torch.float32, device=self.device)
X_val_t = torch.tensor(X_val, dtype=torch.float32, device=self.device)
Y_val_t = torch.tensor(Y_val, dtype=torch.float32, device=self.device)
train_dataset = TensorDataset(X_train_t, Y_train_t)
self.models = []
for i in range(self.n_models):
torch.manual_seed(42 + i)
model = TunableMLP(
input_dim=self.input_dim,
output_dim=self.output_dim,
hidden_dims=self.config.hidden_dims,
dropout=self.config.dropout,
activation=self.config.activation,
use_batch_norm=self.config.use_batch_norm
).to(self.device)
optimizer = torch.optim.AdamW(
model.parameters(),
lr=self.config.learning_rate,
weight_decay=self.config.weight_decay
)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=self.config.max_epochs
)
            train_loader = DataLoader(
                train_dataset,
                batch_size=self.config.batch_size,
                shuffle=True,
                # drop a trailing batch of size 1, which BatchNorm1d rejects in training mode
                drop_last=self.config.use_batch_norm and len(train_dataset) % self.config.batch_size == 1
            )
early_stopping = EarlyStopping(patience=self.config.early_stopping_patience)
for epoch in range(self.config.max_epochs):
model.train()
for X_batch, Y_batch in train_loader:
optimizer.zero_grad()
pred = model(X_batch)
loss = nn.functional.mse_loss(pred, Y_batch)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
scheduler.step()
model.eval()
with torch.no_grad():
val_pred = model(X_val_t)
val_loss = nn.functional.mse_loss(val_pred, Y_val_t).item()
if early_stopping(val_loss, model):
break
early_stopping.restore_best(model)
model.eval()
self.models.append(model)
logger.info(f" Model {i+1}/{self.n_models}: val_loss = {early_stopping.best_loss:.6f}")
self.trained = True
logger.info("Ensemble training complete")
def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Predict with uncertainty estimation.
Args:
X: Input features [n_samples, input_dim]
Returns:
Tuple of (mean_predictions, std_predictions)
"""
if not self.trained:
raise RuntimeError("Ensemble not trained. Call train() first.")
# Normalize input
X_norm = (X - self.config.input_mean) / self.config.input_std
X_t = torch.tensor(X_norm, dtype=torch.float32, device=self.device)
# Collect predictions from all models
predictions = []
for model in self.models:
model.eval()
with torch.no_grad():
pred = model(X_t).cpu().numpy()
# Denormalize
pred = pred * self.config.output_std + self.config.output_mean
predictions.append(pred)
predictions = np.array(predictions) # [n_models, n_samples, output_dim]
mean_pred = predictions.mean(axis=0)
std_pred = predictions.std(axis=0)
return mean_pred, std_pred
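    # Shape sketch (illustrative): for X of shape [n_samples, input_dim] and an
    # ensemble with output_dim = 3, predict() returns mean_pred and std_pred of
    # shape [n_samples, 3], both denormalized back to the original output units.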
def predict_single(self, params: Dict[str, float], var_names: List[str]) -> Tuple[Dict[str, float], float]:
"""
Predict for a single point with uncertainty.
Args:
params: Dictionary of input parameters
var_names: List of variable names in order
Returns:
Tuple of (predictions dict, total uncertainty)
"""
X = np.array([[params[name] for name in var_names]])
mean, std = self.predict(X)
pred_dict = {f'output_{i}': mean[0, i] for i in range(self.output_dim)}
uncertainty = float(np.sum(std[0]))
return pred_dict, uncertainty
def save(self, path: Path):
"""Save ensemble to disk."""
state = {
'config': {
'hidden_dims': self.config.hidden_dims,
'dropout': self.config.dropout,
'activation': self.config.activation,
'use_batch_norm': self.config.use_batch_norm,
'learning_rate': self.config.learning_rate,
'weight_decay': self.config.weight_decay,
'batch_size': self.config.batch_size,
'input_mean': self.config.input_mean.tolist(),
'input_std': self.config.input_std.tolist(),
'output_mean': self.config.output_mean.tolist(),
'output_std': self.config.output_std.tolist(),
},
'n_models': self.n_models,
'model_states': [m.state_dict() for m in self.models]
}
torch.save(state, path)
logger.info(f"Saved ensemble to {path}")
def load(self, path: Path):
"""Load ensemble from disk."""
state = torch.load(path, map_location=self.device)
# Restore config
cfg = state['config']
self.config = SurrogateConfig(
hidden_dims=cfg['hidden_dims'],
dropout=cfg['dropout'],
activation=cfg['activation'],
use_batch_norm=cfg['use_batch_norm'],
learning_rate=cfg['learning_rate'],
weight_decay=cfg['weight_decay'],
batch_size=cfg['batch_size'],
input_mean=np.array(cfg['input_mean']),
input_std=np.array(cfg['input_std']),
output_mean=np.array(cfg['output_mean']),
output_std=np.array(cfg['output_std'])
)
self.n_models = state['n_models']
self.models = []
for model_state in state['model_states']:
model = TunableMLP(
input_dim=self.input_dim,
output_dim=self.output_dim,
hidden_dims=self.config.hidden_dims,
dropout=self.config.dropout,
activation=self.config.activation,
use_batch_norm=self.config.use_batch_norm
).to(self.device)
model.load_state_dict(model_state)
model.eval()
self.models.append(model)
self.trained = True
logger.info(f"Loaded ensemble with {self.n_models} models from {path}")
def tune_surrogate_for_study(
fea_data: List[Dict],
design_var_names: List[str],
objective_names: List[str],
n_tuning_trials: int = 50,
n_ensemble_models: int = 5
) -> TunedEnsembleSurrogate:
"""
Convenience function to tune and create ensemble surrogate.
Args:
fea_data: List of FEA results with 'params' and 'objectives' keys
design_var_names: List of design variable names
objective_names: List of objective names
n_tuning_trials: Number of Optuna trials
n_ensemble_models: Number of models in ensemble
Returns:
Trained TunedEnsembleSurrogate
"""
# Prepare data
X = np.array([[d['params'][name] for name in design_var_names] for d in fea_data])
Y = np.array([[d['objectives'][name] for name in objective_names] for d in fea_data])
logger.info(f"Tuning surrogate on {len(X)} samples...")
logger.info(f"Input: {len(design_var_names)} design variables")
logger.info(f"Output: {len(objective_names)} objectives")
# Tune hyperparameters
tuner = SurrogateHyperparameterTuner(
input_dim=len(design_var_names),
output_dim=len(objective_names),
n_trials=n_tuning_trials,
n_cv_folds=5
)
best_config = tuner.tune(X, Y, output_names=objective_names)
# Create and train ensemble
ensemble = TunedEnsembleSurrogate(
config=best_config,
input_dim=len(design_var_names),
output_dim=len(objective_names),
n_models=n_ensemble_models
)
ensemble.train(X, Y)
return ensemble
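
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): exercises the tuning pipeline on
# synthetic data standing in for real FEA results. The variable and objective
# names below are hypothetical, and the trial/ensemble counts are deliberately
# small; they are not recommended defaults.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    rng = np.random.default_rng(0)

    design_vars = [f"x{i}" for i in range(11)]
    objectives = ["mass", "stress", "deflection"]  # hypothetical objective names

    # Fabricated stand-in for FEA results; each entry mirrors the expected
    # {'params': {...}, 'objectives': {...}} structure.
    fea_data = []
    for _ in range(120):
        params = {name: float(rng.uniform(0.0, 1.0)) for name in design_vars}
        x = np.array([params[name] for name in design_vars])
        fea_data.append({
            "params": params,
            "objectives": {
                "mass": float(x.sum()),
                "stress": float((x ** 2).sum()),
                "deflection": float(np.sin(x).sum()),
            },
        })

    ensemble = tune_surrogate_for_study(
        fea_data,
        design_var_names=design_vars,
        objective_names=objectives,
        n_tuning_trials=5,
        n_ensemble_models=2,
    )

    mean_pred, std_pred = ensemble.predict(np.array([[0.5] * len(design_vars)]))
    print("mean:", mean_pred)
    print("std:", std_pred)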