""" Hyperparameter Tuning for Neural Network Surrogates This module provides automatic hyperparameter optimization for MLP surrogates using Optuna, with proper train/validation splits and early stopping. Key Features: 1. Optuna-based hyperparameter search 2. K-fold cross-validation 3. Early stopping to prevent overfitting 4. Ensemble model support 5. Proper uncertainty quantification Usage: from optimization_engine.surrogate_tuner import SurrogateHyperparameterTuner tuner = SurrogateHyperparameterTuner( input_dim=11, output_dim=3, n_trials=50 ) best_config = tuner.tune(X_train, Y_train) model = tuner.create_tuned_model(best_config) """ import logging import numpy as np from typing import Dict, List, Tuple, Optional, Any from dataclasses import dataclass, field from pathlib import Path logger = logging.getLogger(__name__) try: import torch import torch.nn as nn from torch.utils.data import DataLoader, TensorDataset TORCH_AVAILABLE = True except ImportError: TORCH_AVAILABLE = False logger.warning("PyTorch not installed") try: import optuna from optuna.samplers import TPESampler from optuna.pruners import MedianPruner OPTUNA_AVAILABLE = True except ImportError: OPTUNA_AVAILABLE = False logger.warning("Optuna not installed") @dataclass class SurrogateConfig: """Configuration for a tuned surrogate model.""" hidden_dims: List[int] = field(default_factory=lambda: [128, 256, 128]) dropout: float = 0.1 activation: str = 'relu' use_batch_norm: bool = True learning_rate: float = 1e-3 weight_decay: float = 1e-4 batch_size: int = 16 max_epochs: int = 500 early_stopping_patience: int = 30 # Normalization stats (filled during training) input_mean: Optional[np.ndarray] = None input_std: Optional[np.ndarray] = None output_mean: Optional[np.ndarray] = None output_std: Optional[np.ndarray] = None # Validation metrics val_loss: float = float('inf') val_r2: Dict[str, float] = field(default_factory=dict) class TunableMLP(nn.Module): """Flexible MLP with configurable architecture.""" def __init__( self, input_dim: int, output_dim: int, hidden_dims: List[int], dropout: float = 0.1, activation: str = 'relu', use_batch_norm: bool = True ): super().__init__() self.input_dim = input_dim self.output_dim = output_dim # Activation function activations = { 'relu': nn.ReLU(), 'leaky_relu': nn.LeakyReLU(0.1), 'elu': nn.ELU(), 'selu': nn.SELU(), 'gelu': nn.GELU(), 'swish': nn.SiLU() } act_fn = activations.get(activation, nn.ReLU()) # Build layers layers = [] prev_dim = input_dim for hidden_dim in hidden_dims: layers.append(nn.Linear(prev_dim, hidden_dim)) if use_batch_norm: layers.append(nn.BatchNorm1d(hidden_dim)) layers.append(act_fn) if dropout > 0: layers.append(nn.Dropout(dropout)) prev_dim = hidden_dim layers.append(nn.Linear(prev_dim, output_dim)) self.network = nn.Sequential(*layers) self._init_weights() def _init_weights(self): """Initialize weights using Kaiming initialization.""" for m in self.modules(): if isinstance(m, nn.Linear): nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu') if m.bias is not None: nn.init.constant_(m.bias, 0) def forward(self, x): return self.network(x) class EarlyStopping: """Early stopping to prevent overfitting.""" def __init__(self, patience: int = 20, min_delta: float = 1e-5): self.patience = patience self.min_delta = min_delta self.counter = 0 self.best_loss = float('inf') self.best_model_state = None self.should_stop = False def __call__(self, val_loss: float, model: nn.Module) -> bool: if val_loss < self.best_loss - self.min_delta: self.best_loss = val_loss self.best_model_state = {k: v.cpu().clone() for k, v in model.state_dict().items()} self.counter = 0 else: self.counter += 1 if self.counter >= self.patience: self.should_stop = True return self.should_stop def restore_best(self, model: nn.Module): """Restore model to best state.""" if self.best_model_state is not None: model.load_state_dict(self.best_model_state) class SurrogateHyperparameterTuner: """ Automatic hyperparameter tuning for neural network surrogates. Uses Optuna for Bayesian optimization of: - Network architecture (layers, widths) - Regularization (dropout, weight decay) - Learning rate and batch size - Activation functions """ def __init__( self, input_dim: int, output_dim: int, n_trials: int = 50, n_cv_folds: int = 5, device: str = 'auto', seed: int = 42, timeout_seconds: Optional[int] = None ): """ Initialize hyperparameter tuner. Args: input_dim: Number of input features (design variables) output_dim: Number of outputs (objectives) n_trials: Number of Optuna trials for hyperparameter search n_cv_folds: Number of cross-validation folds device: Computing device ('cuda', 'cpu', or 'auto') seed: Random seed for reproducibility timeout_seconds: Optional timeout for tuning """ if not TORCH_AVAILABLE: raise ImportError("PyTorch required for surrogate tuning") if not OPTUNA_AVAILABLE: raise ImportError("Optuna required for hyperparameter tuning") self.input_dim = input_dim self.output_dim = output_dim self.n_trials = n_trials self.n_cv_folds = n_cv_folds self.seed = seed self.timeout = timeout_seconds if device == 'auto': self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') else: self.device = torch.device(device) self.best_config: Optional[SurrogateConfig] = None self.study: Optional[optuna.Study] = None # Set seeds np.random.seed(seed) torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed(seed) def _suggest_hyperparameters(self, trial: optuna.Trial) -> SurrogateConfig: """Suggest hyperparameters for a trial.""" # Architecture n_layers = trial.suggest_int('n_layers', 2, 5) hidden_dims = [] for i in range(n_layers): dim = trial.suggest_int(f'hidden_dim_{i}', 32, 512, step=32) hidden_dims.append(dim) # Regularization dropout = trial.suggest_float('dropout', 0.0, 0.5) weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True) # Training learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True) batch_size = trial.suggest_categorical('batch_size', [8, 16, 32, 64]) # Activation activation = trial.suggest_categorical('activation', ['relu', 'leaky_relu', 'elu', 'gelu', 'swish']) # Batch norm use_batch_norm = trial.suggest_categorical('use_batch_norm', [True, False]) return SurrogateConfig( hidden_dims=hidden_dims, dropout=dropout, activation=activation, use_batch_norm=use_batch_norm, learning_rate=learning_rate, weight_decay=weight_decay, batch_size=batch_size ) def _train_fold( self, config: SurrogateConfig, X_train: np.ndarray, Y_train: np.ndarray, X_val: np.ndarray, Y_val: np.ndarray, trial: Optional[optuna.Trial] = None ) -> Tuple[float, Dict[str, float]]: """Train model on one fold and return validation metrics.""" # Create model model = TunableMLP( input_dim=self.input_dim, output_dim=self.output_dim, hidden_dims=config.hidden_dims, dropout=config.dropout, activation=config.activation, use_batch_norm=config.use_batch_norm ).to(self.device) # Prepare data X_train_t = torch.tensor(X_train, dtype=torch.float32, device=self.device) Y_train_t = torch.tensor(Y_train, dtype=torch.float32, device=self.device) X_val_t = torch.tensor(X_val, dtype=torch.float32, device=self.device) Y_val_t = torch.tensor(Y_val, dtype=torch.float32, device=self.device) train_dataset = TensorDataset(X_train_t, Y_train_t) train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True) # Optimizer and scheduler optimizer = torch.optim.AdamW( model.parameters(), lr=config.learning_rate, weight_decay=config.weight_decay ) scheduler = torch.optim.lr_scheduler.CosineAnnealingLR( optimizer, T_max=config.max_epochs ) early_stopping = EarlyStopping(patience=config.early_stopping_patience) # Training loop for epoch in range(config.max_epochs): model.train() for X_batch, Y_batch in train_loader: optimizer.zero_grad() pred = model(X_batch) loss = nn.functional.mse_loss(pred, Y_batch) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() scheduler.step() # Validation model.eval() with torch.no_grad(): val_pred = model(X_val_t) val_loss = nn.functional.mse_loss(val_pred, Y_val_t).item() # Early stopping if early_stopping(val_loss, model): break # Optuna pruning (only report once per epoch across all folds) if trial is not None and epoch % 10 == 0: trial.report(val_loss, epoch // 10) if trial.should_prune(): raise optuna.TrialPruned() # Restore best model early_stopping.restore_best(model) # Final validation metrics model.eval() with torch.no_grad(): val_pred = model(X_val_t).cpu().numpy() Y_val_np = Y_val_t.cpu().numpy() val_loss = float(np.mean((val_pred - Y_val_np) ** 2)) # R² per output r2_scores = {} for i in range(self.output_dim): ss_res = np.sum((Y_val_np[:, i] - val_pred[:, i]) ** 2) ss_tot = np.sum((Y_val_np[:, i] - Y_val_np[:, i].mean()) ** 2) r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 r2_scores[f'output_{i}'] = r2 return val_loss, r2_scores def _cross_validate( self, config: SurrogateConfig, X: np.ndarray, Y: np.ndarray, trial: Optional[optuna.Trial] = None ) -> Tuple[float, Dict[str, float]]: """Perform k-fold cross-validation.""" n_samples = len(X) indices = np.random.permutation(n_samples) fold_size = n_samples // self.n_cv_folds fold_losses = [] fold_r2s = {f'output_{i}': [] for i in range(self.output_dim)} for fold in range(self.n_cv_folds): # Split indices val_start = fold * fold_size val_end = val_start + fold_size if fold < self.n_cv_folds - 1 else n_samples val_indices = indices[val_start:val_end] train_indices = np.concatenate([indices[:val_start], indices[val_end:]]) X_train, Y_train = X[train_indices], Y[train_indices] X_val, Y_val = X[val_indices], Y[val_indices] # Skip fold if too few samples if len(X_train) < 10 or len(X_val) < 2: continue val_loss, r2_scores = self._train_fold( config, X_train, Y_train, X_val, Y_val, trial ) fold_losses.append(val_loss) for key, val in r2_scores.items(): fold_r2s[key].append(val) mean_loss = np.mean(fold_losses) mean_r2 = {k: np.mean(v) for k, v in fold_r2s.items()} return mean_loss, mean_r2 def tune( self, X: np.ndarray, Y: np.ndarray, output_names: Optional[List[str]] = None ) -> SurrogateConfig: """ Tune hyperparameters using Optuna. Args: X: Input features [n_samples, input_dim] Y: Outputs [n_samples, output_dim] output_names: Optional names for outputs (for logging) Returns: Best SurrogateConfig found """ logger.info(f"Starting hyperparameter tuning with {self.n_trials} trials...") logger.info(f"Data: {len(X)} samples, {self.n_cv_folds}-fold CV") # Normalize data self.input_mean = X.mean(axis=0) self.input_std = X.std(axis=0) + 1e-8 self.output_mean = Y.mean(axis=0) self.output_std = Y.std(axis=0) + 1e-8 X_norm = (X - self.input_mean) / self.input_std Y_norm = (Y - self.output_mean) / self.output_std def objective(trial: optuna.Trial) -> float: config = self._suggest_hyperparameters(trial) val_loss, r2_scores = self._cross_validate(config, X_norm, Y_norm, trial) # Log R² scores for key, val in r2_scores.items(): trial.set_user_attr(f'r2_{key}', val) return val_loss # Create study self.study = optuna.create_study( direction='minimize', sampler=TPESampler(seed=self.seed, n_startup_trials=10), pruner=MedianPruner(n_startup_trials=5, n_warmup_steps=20) ) self.study.optimize( objective, n_trials=self.n_trials, timeout=self.timeout, show_progress_bar=True, catch=(RuntimeError,) # Catch GPU OOM errors ) # Build best config best_trial = self.study.best_trial self.best_config = self._suggest_hyperparameters_from_params(best_trial.params) self.best_config.val_loss = best_trial.value self.best_config.val_r2 = { k.replace('r2_', ''): v for k, v in best_trial.user_attrs.items() if k.startswith('r2_') } # Store normalization self.best_config.input_mean = self.input_mean self.best_config.input_std = self.input_std self.best_config.output_mean = self.output_mean self.best_config.output_std = self.output_std # Log results logger.info(f"\nBest hyperparameters found:") logger.info(f" Hidden dims: {self.best_config.hidden_dims}") logger.info(f" Dropout: {self.best_config.dropout:.3f}") logger.info(f" Activation: {self.best_config.activation}") logger.info(f" Batch norm: {self.best_config.use_batch_norm}") logger.info(f" Learning rate: {self.best_config.learning_rate:.2e}") logger.info(f" Weight decay: {self.best_config.weight_decay:.2e}") logger.info(f" Batch size: {self.best_config.batch_size}") logger.info(f" Validation loss: {self.best_config.val_loss:.6f}") if output_names: for i, name in enumerate(output_names): r2 = self.best_config.val_r2.get(f'output_{i}', 0) logger.info(f" {name} R² (CV): {r2:.4f}") return self.best_config def _suggest_hyperparameters_from_params(self, params: Dict[str, Any]) -> SurrogateConfig: """Reconstruct config from Optuna params dict.""" n_layers = params['n_layers'] hidden_dims = [params[f'hidden_dim_{i}'] for i in range(n_layers)] return SurrogateConfig( hidden_dims=hidden_dims, dropout=params['dropout'], activation=params['activation'], use_batch_norm=params['use_batch_norm'], learning_rate=params['learning_rate'], weight_decay=params['weight_decay'], batch_size=params['batch_size'] ) def create_tuned_model( self, config: Optional[SurrogateConfig] = None ) -> TunableMLP: """Create a model with tuned hyperparameters.""" if config is None: config = self.best_config if config is None: raise ValueError("No config available. Run tune() first.") return TunableMLP( input_dim=self.input_dim, output_dim=self.output_dim, hidden_dims=config.hidden_dims, dropout=config.dropout, activation=config.activation, use_batch_norm=config.use_batch_norm ) class TunedEnsembleSurrogate: """ Ensemble of tuned surrogate models for better uncertainty quantification. Trains multiple models with different random seeds and aggregates predictions. """ def __init__( self, config: SurrogateConfig, input_dim: int, output_dim: int, n_models: int = 5, device: str = 'auto' ): """ Initialize ensemble surrogate. Args: config: Tuned configuration to use for all models input_dim: Number of input features output_dim: Number of outputs n_models: Number of models in ensemble device: Computing device """ self.config = config self.input_dim = input_dim self.output_dim = output_dim self.n_models = n_models if device == 'auto': self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') else: self.device = torch.device(device) self.models: List[TunableMLP] = [] self.trained = False def train(self, X: np.ndarray, Y: np.ndarray, val_split: float = 0.2): """Train all models in the ensemble.""" logger.info(f"Training ensemble of {self.n_models} models...") # Normalize using config stats X_norm = (X - self.config.input_mean) / self.config.input_std Y_norm = (Y - self.config.output_mean) / self.config.output_std # Split data n_val = int(len(X) * val_split) indices = np.random.permutation(len(X)) train_idx, val_idx = indices[n_val:], indices[:n_val] X_train, Y_train = X_norm[train_idx], Y_norm[train_idx] X_val, Y_val = X_norm[val_idx], Y_norm[val_idx] X_train_t = torch.tensor(X_train, dtype=torch.float32, device=self.device) Y_train_t = torch.tensor(Y_train, dtype=torch.float32, device=self.device) X_val_t = torch.tensor(X_val, dtype=torch.float32, device=self.device) Y_val_t = torch.tensor(Y_val, dtype=torch.float32, device=self.device) train_dataset = TensorDataset(X_train_t, Y_train_t) self.models = [] for i in range(self.n_models): torch.manual_seed(42 + i) model = TunableMLP( input_dim=self.input_dim, output_dim=self.output_dim, hidden_dims=self.config.hidden_dims, dropout=self.config.dropout, activation=self.config.activation, use_batch_norm=self.config.use_batch_norm ).to(self.device) optimizer = torch.optim.AdamW( model.parameters(), lr=self.config.learning_rate, weight_decay=self.config.weight_decay ) scheduler = torch.optim.lr_scheduler.CosineAnnealingLR( optimizer, T_max=self.config.max_epochs ) train_loader = DataLoader( train_dataset, batch_size=self.config.batch_size, shuffle=True ) early_stopping = EarlyStopping(patience=self.config.early_stopping_patience) for epoch in range(self.config.max_epochs): model.train() for X_batch, Y_batch in train_loader: optimizer.zero_grad() pred = model(X_batch) loss = nn.functional.mse_loss(pred, Y_batch) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() scheduler.step() model.eval() with torch.no_grad(): val_pred = model(X_val_t) val_loss = nn.functional.mse_loss(val_pred, Y_val_t).item() if early_stopping(val_loss, model): break early_stopping.restore_best(model) model.eval() self.models.append(model) logger.info(f" Model {i+1}/{self.n_models}: val_loss = {early_stopping.best_loss:.6f}") self.trained = True logger.info("Ensemble training complete") def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """ Predict with uncertainty estimation. Args: X: Input features [n_samples, input_dim] Returns: Tuple of (mean_predictions, std_predictions) """ if not self.trained: raise RuntimeError("Ensemble not trained. Call train() first.") # Normalize input X_norm = (X - self.config.input_mean) / self.config.input_std X_t = torch.tensor(X_norm, dtype=torch.float32, device=self.device) # Collect predictions from all models predictions = [] for model in self.models: model.eval() with torch.no_grad(): pred = model(X_t).cpu().numpy() # Denormalize pred = pred * self.config.output_std + self.config.output_mean predictions.append(pred) predictions = np.array(predictions) # [n_models, n_samples, output_dim] mean_pred = predictions.mean(axis=0) std_pred = predictions.std(axis=0) return mean_pred, std_pred def predict_single(self, params: Dict[str, float], var_names: List[str]) -> Tuple[Dict[str, float], float]: """ Predict for a single point with uncertainty. Args: params: Dictionary of input parameters var_names: List of variable names in order Returns: Tuple of (predictions dict, total uncertainty) """ X = np.array([[params[name] for name in var_names]]) mean, std = self.predict(X) pred_dict = {f'output_{i}': mean[0, i] for i in range(self.output_dim)} uncertainty = float(np.sum(std[0])) return pred_dict, uncertainty def save(self, path: Path): """Save ensemble to disk.""" state = { 'config': { 'hidden_dims': self.config.hidden_dims, 'dropout': self.config.dropout, 'activation': self.config.activation, 'use_batch_norm': self.config.use_batch_norm, 'learning_rate': self.config.learning_rate, 'weight_decay': self.config.weight_decay, 'batch_size': self.config.batch_size, 'input_mean': self.config.input_mean.tolist(), 'input_std': self.config.input_std.tolist(), 'output_mean': self.config.output_mean.tolist(), 'output_std': self.config.output_std.tolist(), }, 'n_models': self.n_models, 'model_states': [m.state_dict() for m in self.models] } torch.save(state, path) logger.info(f"Saved ensemble to {path}") def load(self, path: Path): """Load ensemble from disk.""" state = torch.load(path, map_location=self.device) # Restore config cfg = state['config'] self.config = SurrogateConfig( hidden_dims=cfg['hidden_dims'], dropout=cfg['dropout'], activation=cfg['activation'], use_batch_norm=cfg['use_batch_norm'], learning_rate=cfg['learning_rate'], weight_decay=cfg['weight_decay'], batch_size=cfg['batch_size'], input_mean=np.array(cfg['input_mean']), input_std=np.array(cfg['input_std']), output_mean=np.array(cfg['output_mean']), output_std=np.array(cfg['output_std']) ) self.n_models = state['n_models'] self.models = [] for model_state in state['model_states']: model = TunableMLP( input_dim=self.input_dim, output_dim=self.output_dim, hidden_dims=self.config.hidden_dims, dropout=self.config.dropout, activation=self.config.activation, use_batch_norm=self.config.use_batch_norm ).to(self.device) model.load_state_dict(model_state) model.eval() self.models.append(model) self.trained = True logger.info(f"Loaded ensemble with {self.n_models} models from {path}") def tune_surrogate_for_study( fea_data: List[Dict], design_var_names: List[str], objective_names: List[str], n_tuning_trials: int = 50, n_ensemble_models: int = 5 ) -> TunedEnsembleSurrogate: """ Convenience function to tune and create ensemble surrogate. Args: fea_data: List of FEA results with 'params' and 'objectives' keys design_var_names: List of design variable names objective_names: List of objective names n_tuning_trials: Number of Optuna trials n_ensemble_models: Number of models in ensemble Returns: Trained TunedEnsembleSurrogate """ # Prepare data X = np.array([[d['params'][name] for name in design_var_names] for d in fea_data]) Y = np.array([[d['objectives'][name] for name in objective_names] for d in fea_data]) logger.info(f"Tuning surrogate on {len(X)} samples...") logger.info(f"Input: {len(design_var_names)} design variables") logger.info(f"Output: {len(objective_names)} objectives") # Tune hyperparameters tuner = SurrogateHyperparameterTuner( input_dim=len(design_var_names), output_dim=len(objective_names), n_trials=n_tuning_trials, n_cv_folds=5 ) best_config = tuner.tune(X, Y, output_names=objective_names) # Create and train ensemble ensemble = TunedEnsembleSurrogate( config=best_config, input_dim=len(design_var_names), output_dim=len(objective_names), n_models=n_ensemble_models ) ensemble.train(X, Y) return ensemble