""" L-BFGS Gradient-Based Optimizer for Surrogate Models This module exploits the differentiability of trained neural network surrogates to perform gradient-based optimization using L-BFGS and other second-order methods. Key Advantages over Derivative-Free Methods: - 100-1000x faster convergence for local refinement - Exploits the smooth landscape learned by the surrogate - Can find precise local optima that sampling-based methods miss Usage: from optimization_engine.gradient_optimizer import GradientOptimizer from optimization_engine.generic_surrogate import GenericSurrogate # Load trained surrogate surrogate = GenericSurrogate(config) surrogate.load("surrogate_best.pt") # Create gradient optimizer optimizer = GradientOptimizer(surrogate, objective_weights=[5.0, 5.0, 1.0]) # Find optimal designs from multiple starting points results = optimizer.optimize( starting_points=top_10_candidates, method='lbfgs', n_iterations=100 ) Reference: - "Physics-informed deep learning for simultaneous surrogate modeling and PDE-constrained optimization" - ScienceDirect 2023 - "Neural Network Surrogate and Projected Gradient Descent for Fast and Reliable Finite Element Model Calibration" - arXiv 2024 """ from pathlib import Path from typing import Dict, List, Optional, Tuple, Union, Callable from dataclasses import dataclass, field import json import time import numpy as np try: import torch import torch.nn as nn from torch.optim import LBFGS, Adam, SGD TORCH_AVAILABLE = True except ImportError: TORCH_AVAILABLE = False @dataclass class OptimizationResult: """Result from gradient-based optimization.""" # Final solution params: Dict[str, float] objectives: Dict[str, float] weighted_sum: float # Convergence info converged: bool n_iterations: int n_function_evals: int final_gradient_norm: float # Trajectory (optional) history: List[Dict] = field(default_factory=list) # Starting point info starting_params: Dict[str, float] = field(default_factory=dict) starting_weighted_sum: float = 0.0 improvement: float = 0.0 def to_dict(self) -> dict: return { 'params': self.params, 'objectives': self.objectives, 'weighted_sum': self.weighted_sum, 'converged': self.converged, 'n_iterations': self.n_iterations, 'n_function_evals': self.n_function_evals, 'final_gradient_norm': self.final_gradient_norm, 'starting_weighted_sum': self.starting_weighted_sum, 'improvement': self.improvement } class GradientOptimizer: """ Gradient-based optimizer exploiting differentiable surrogates. Supports: - L-BFGS (recommended for surrogate optimization) - Adam (good for noisy landscapes) - Projected gradient descent (for bound constraints) The key insight is that once we have a trained neural network surrogate, we can compute exact gradients via backpropagation, enabling much faster convergence than derivative-free methods like TPE or CMA-ES. """ def __init__( self, surrogate, objective_weights: Optional[List[float]] = None, objective_directions: Optional[List[str]] = None, constraints: Optional[List[Dict]] = None, device: str = 'auto' ): """ Initialize gradient optimizer. Args: surrogate: Trained GenericSurrogate or compatible model objective_weights: Weight for each objective in weighted sum objective_directions: 'minimize' or 'maximize' for each objective constraints: List of constraint dicts with 'name', 'type', 'threshold' device: 'auto', 'cuda', or 'cpu' """ if not TORCH_AVAILABLE: raise ImportError("PyTorch required for gradient optimization") self.surrogate = surrogate self.model = surrogate.model self.normalization = surrogate.normalization self.var_names = surrogate.design_var_names self.obj_names = surrogate.objective_names self.bounds = surrogate.design_var_bounds self.n_vars = len(self.var_names) self.n_objs = len(self.obj_names) # Device setup if device == 'auto': self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') else: self.device = torch.device(device) # Move model to device self.model = self.model.to(self.device) self.model.eval() # Objective configuration if objective_weights is None: objective_weights = [1.0] * self.n_objs self.weights = torch.tensor(objective_weights, dtype=torch.float32, device=self.device) if objective_directions is None: objective_directions = ['minimize'] * self.n_objs # Convert directions to signs: minimize=+1, maximize=-1 self.signs = torch.tensor( [1.0 if d == 'minimize' else -1.0 for d in objective_directions], dtype=torch.float32, device=self.device ) self.constraints = constraints or [] # Precompute bound tensors for projection self.bounds_low = torch.tensor( [self.bounds[name][0] for name in self.var_names], dtype=torch.float32, device=self.device ) self.bounds_high = torch.tensor( [self.bounds[name][1] for name in self.var_names], dtype=torch.float32, device=self.device ) # Normalization tensors self.design_mean = torch.tensor( self.normalization['design_mean'], dtype=torch.float32, device=self.device ) self.design_std = torch.tensor( self.normalization['design_std'], dtype=torch.float32, device=self.device ) self.obj_mean = torch.tensor( self.normalization['objective_mean'], dtype=torch.float32, device=self.device ) self.obj_std = torch.tensor( self.normalization['objective_std'], dtype=torch.float32, device=self.device ) def _normalize_params(self, x: torch.Tensor) -> torch.Tensor: """Normalize parameters for model input.""" return (x - self.design_mean) / self.design_std def _denormalize_objectives(self, y_norm: torch.Tensor) -> torch.Tensor: """Denormalize model output to actual objective values.""" return y_norm * self.obj_std + self.obj_mean def _project_to_bounds(self, x: torch.Tensor) -> torch.Tensor: """Project parameters to feasible bounds.""" return torch.clamp(x, self.bounds_low, self.bounds_high) def _compute_objective(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: """ Compute weighted objective from parameters. Args: x: Parameter tensor (raw, not normalized) Returns: (weighted_sum, objectives): Scalar loss and objective vector """ # Project to bounds x_bounded = self._project_to_bounds(x) # Normalize and predict x_norm = self._normalize_params(x_bounded) y_norm = self.model(x_norm.unsqueeze(0)).squeeze(0) # Denormalize objectives objectives = self._denormalize_objectives(y_norm) # Weighted sum with direction signs weighted_sum = (objectives * self.signs * self.weights).sum() return weighted_sum, objectives def _tensor_to_params(self, x: torch.Tensor) -> Dict[str, float]: """Convert parameter tensor to dictionary.""" x_np = x.detach().cpu().numpy() return {name: float(x_np[i]) for i, name in enumerate(self.var_names)} def _params_to_tensor(self, params: Dict[str, float]) -> torch.Tensor: """Convert parameter dictionary to tensor.""" values = [params.get(name, (self.bounds[name][0] + self.bounds[name][1]) / 2) for name in self.var_names] return torch.tensor(values, dtype=torch.float32, device=self.device) def _objectives_to_dict(self, objectives: torch.Tensor) -> Dict[str, float]: """Convert objectives tensor to dictionary.""" obj_np = objectives.detach().cpu().numpy() return {name: float(obj_np[i]) for i, name in enumerate(self.obj_names)} def optimize_single( self, starting_point: Dict[str, float], method: str = 'lbfgs', n_iterations: int = 100, lr: float = 0.1, tolerance: float = 1e-7, record_history: bool = False, verbose: bool = False ) -> OptimizationResult: """ Optimize from a single starting point. Args: starting_point: Initial parameter values method: 'lbfgs', 'adam', or 'sgd' n_iterations: Maximum iterations lr: Learning rate (for Adam/SGD) or line search step (for L-BFGS) tolerance: Convergence tolerance for gradient norm record_history: Store optimization trajectory verbose: Print progress Returns: OptimizationResult with optimized parameters """ # Initialize parameter tensor with gradient tracking x = self._params_to_tensor(starting_point).clone().requires_grad_(True) # Get starting objective with torch.no_grad(): start_ws, start_obj = self._compute_objective(x) start_ws_val = start_ws.item() # Setup optimizer if method.lower() == 'lbfgs': optimizer = LBFGS( [x], lr=lr, max_iter=20, max_eval=25, tolerance_grad=tolerance, tolerance_change=1e-9, history_size=10, line_search_fn='strong_wolfe' ) elif method.lower() == 'adam': optimizer = Adam([x], lr=lr) elif method.lower() == 'sgd': optimizer = SGD([x], lr=lr, momentum=0.9) else: raise ValueError(f"Unknown method: {method}. Use 'lbfgs', 'adam', or 'sgd'") history = [] n_evals = 0 converged = False final_grad_norm = float('inf') def closure(): nonlocal n_evals optimizer.zero_grad() loss, _ = self._compute_objective(x) loss.backward() n_evals += 1 return loss # Optimization loop for iteration in range(n_iterations): if method.lower() == 'lbfgs': # L-BFGS does multiple function evals per step loss = optimizer.step(closure) else: # Adam/SGD: single step optimizer.zero_grad() loss, objectives = self._compute_objective(x) loss.backward() optimizer.step() n_evals += 1 # Project to bounds after step with torch.no_grad(): x.data = self._project_to_bounds(x.data) # Check gradient norm if x.grad is not None: grad_norm = x.grad.norm().item() else: grad_norm = float('inf') final_grad_norm = grad_norm # Record history if record_history: with torch.no_grad(): ws, obj = self._compute_objective(x) history.append({ 'iteration': iteration, 'weighted_sum': ws.item(), 'objectives': self._objectives_to_dict(obj), 'params': self._tensor_to_params(x), 'grad_norm': grad_norm }) if verbose and iteration % 10 == 0: with torch.no_grad(): ws, _ = self._compute_objective(x) print(f" Iter {iteration:3d}: WS={ws.item():.4f}, |grad|={grad_norm:.2e}") # Convergence check if grad_norm < tolerance: converged = True if verbose: print(f" Converged at iteration {iteration} (grad_norm={grad_norm:.2e})") break # Final evaluation with torch.no_grad(): x_final = self._project_to_bounds(x) final_ws, final_obj = self._compute_objective(x_final) final_params = self._tensor_to_params(x_final) final_objectives = self._objectives_to_dict(final_obj) final_ws_val = final_ws.item() improvement = start_ws_val - final_ws_val return OptimizationResult( params=final_params, objectives=final_objectives, weighted_sum=final_ws_val, converged=converged, n_iterations=iteration + 1, n_function_evals=n_evals, final_gradient_norm=final_grad_norm, history=history, starting_params=starting_point, starting_weighted_sum=start_ws_val, improvement=improvement ) def optimize( self, starting_points: Union[List[Dict[str, float]], int] = 10, method: str = 'lbfgs', n_iterations: int = 100, lr: float = 0.1, tolerance: float = 1e-7, n_random_restarts: int = 0, return_all: bool = False, verbose: bool = True ) -> Union[OptimizationResult, List[OptimizationResult]]: """ Optimize from multiple starting points and return best result. Args: starting_points: List of starting dicts, or int for random starts method: Optimization method ('lbfgs', 'adam', 'sgd') n_iterations: Max iterations per start lr: Learning rate tolerance: Convergence tolerance n_random_restarts: Additional random starting points return_all: Return all results, not just best verbose: Print progress Returns: Best OptimizationResult, or list of all results if return_all=True """ # Build starting points list if isinstance(starting_points, int): points = [self._random_starting_point() for _ in range(starting_points)] else: points = list(starting_points) # Add random restarts for _ in range(n_random_restarts): points.append(self._random_starting_point()) if verbose: print(f"\n{'='*60}") print(f"L-BFGS GRADIENT OPTIMIZATION") print(f"{'='*60}") print(f"Method: {method.upper()}") print(f"Starting points: {len(points)}") print(f"Max iterations per start: {n_iterations}") print(f"Tolerance: {tolerance:.0e}") results = [] start_time = time.time() for i, start in enumerate(points): if verbose: var_preview = ", ".join(f"{k}={v:.2f}" for k, v in list(start.items())[:3]) print(f"\n[{i+1}/{len(points)}] Starting from: {var_preview}...") result = self.optimize_single( starting_point=start, method=method, n_iterations=n_iterations, lr=lr, tolerance=tolerance, record_history=False, verbose=False ) results.append(result) if verbose: status = "CONVERGED" if result.converged else f"iter={result.n_iterations}" print(f" Result: WS={result.weighted_sum:.4f} ({status})") print(f" Improvement: {result.improvement:.4f} ({result.improvement/max(result.starting_weighted_sum, 1e-6)*100:.1f}%)") # Sort by weighted sum (ascending = better) results.sort(key=lambda r: r.weighted_sum) elapsed = time.time() - start_time if verbose: print(f"\n{'-'*60}") print(f"OPTIMIZATION COMPLETE") print(f"{'-'*60}") print(f"Time: {elapsed:.1f}s") print(f"Function evaluations: {sum(r.n_function_evals for r in results)}") print(f"\nBest result:") best = results[0] for name, val in best.params.items(): bounds = self.bounds[name] pct = (val - bounds[0]) / (bounds[1] - bounds[0]) * 100 print(f" {name}: {val:.4f} ({pct:.0f}% of range)") print(f"\nObjectives:") for name, val in best.objectives.items(): print(f" {name}: {val:.4f}") print(f"\nWeighted sum: {best.weighted_sum:.4f}") print(f"Total improvement: {best.starting_weighted_sum - best.weighted_sum:.4f}") if return_all: return results return results[0] def _random_starting_point(self) -> Dict[str, float]: """Generate random starting point within bounds.""" params = {} for name in self.var_names: low, high = self.bounds[name] params[name] = np.random.uniform(low, high) return params def grid_search_then_gradient( self, n_grid_samples: int = 100, n_top_for_gradient: int = 10, method: str = 'lbfgs', n_iterations: int = 100, verbose: bool = True ) -> List[OptimizationResult]: """ Two-phase optimization: random grid search then gradient refinement. This is often more effective than pure gradient descent because it identifies multiple promising basins before local refinement. Args: n_grid_samples: Number of random samples for initial search n_top_for_gradient: Number of top candidates to refine with gradient method: Gradient optimization method n_iterations: Gradient iterations per candidate verbose: Print progress Returns: List of results sorted by weighted sum """ if verbose: print(f"\n{'='*60}") print("HYBRID GRID + GRADIENT OPTIMIZATION") print(f"{'='*60}") print(f"Phase 1: {n_grid_samples} random samples") print(f"Phase 2: L-BFGS refinement of top {n_top_for_gradient}") # Phase 1: Random grid search candidates = [] for i in range(n_grid_samples): params = self._random_starting_point() x = self._params_to_tensor(params) with torch.no_grad(): ws, obj = self._compute_objective(x) candidates.append({ 'params': params, 'weighted_sum': ws.item(), 'objectives': self._objectives_to_dict(obj) }) if verbose and (i + 1) % (n_grid_samples // 5) == 0: print(f" Grid search: {i+1}/{n_grid_samples}") # Sort by weighted sum candidates.sort(key=lambda c: c['weighted_sum']) if verbose: print(f"\nTop {n_top_for_gradient} from grid search:") for i, c in enumerate(candidates[:n_top_for_gradient]): print(f" {i+1}. WS={c['weighted_sum']:.4f}") # Phase 2: Gradient refinement top_starts = [c['params'] for c in candidates[:n_top_for_gradient]] results = self.optimize( starting_points=top_starts, method=method, n_iterations=n_iterations, verbose=verbose ) # Return as list if isinstance(results, OptimizationResult): return [results] return results class MultiStartLBFGS: """ Convenience class for multi-start L-BFGS optimization. Provides a simple interface for the common use case of: 1. Load trained surrogate 2. Run L-BFGS from multiple starting points 3. Get best result """ def __init__(self, surrogate_path: Path, config: Dict): """ Initialize from saved surrogate. Args: surrogate_path: Path to surrogate_best.pt config: Optimization config dict """ from optimization_engine.generic_surrogate import GenericSurrogate self.surrogate = GenericSurrogate(config) self.surrogate.load(surrogate_path) # Extract weights from config weights = [obj.get('weight', 1.0) for obj in config.get('objectives', [])] directions = [obj.get('direction', 'minimize') for obj in config.get('objectives', [])] self.optimizer = GradientOptimizer( surrogate=self.surrogate, objective_weights=weights, objective_directions=directions ) def find_optimum( self, n_starts: int = 20, n_iterations: int = 100, top_candidates: Optional[List[Dict[str, float]]] = None ) -> OptimizationResult: """ Find optimal design. Args: n_starts: Number of random starting points n_iterations: L-BFGS iterations per start top_candidates: Optional list of good starting candidates Returns: Best OptimizationResult """ if top_candidates: return self.optimizer.optimize( starting_points=top_candidates, n_random_restarts=n_starts, n_iterations=n_iterations ) else: return self.optimizer.optimize( starting_points=n_starts, n_iterations=n_iterations ) def run_lbfgs_polish( study_dir: Path, n_starts: int = 20, use_top_fea: bool = True, n_iterations: int = 100, verbose: bool = True ) -> List[OptimizationResult]: """ Run L-BFGS polish on a study with trained surrogate. This is the recommended way to use gradient optimization: 1. Load study config and trained surrogate 2. Optionally use top FEA results as starting points 3. Run L-BFGS refinement 4. Return optimized candidates for FEA validation Args: study_dir: Path to study directory n_starts: Number of starting points (random if use_top_fea=False) use_top_fea: Use top FEA results from study.db as starting points n_iterations: L-BFGS iterations verbose: Print progress Returns: List of OptimizationResults sorted by weighted sum """ import optuna study_dir = Path(study_dir) # Find config config_path = study_dir / "1_setup" / "optimization_config.json" if not config_path.exists(): config_path = study_dir / "optimization_config.json" with open(config_path) as f: config = json.load(f) # Normalize config (simplified) if 'design_variables' in config: for var in config['design_variables']: if 'name' not in var and 'parameter' in var: var['name'] = var['parameter'] if 'min' not in var and 'bounds' in var: var['min'], var['max'] = var['bounds'] # Load surrogate results_dir = study_dir / "3_results" if not results_dir.exists(): results_dir = study_dir / "2_results" surrogate_path = results_dir / "surrogate_best.pt" if not surrogate_path.exists(): raise FileNotFoundError(f"No trained surrogate found at {surrogate_path}") # Get starting points from FEA database starting_points = [] if use_top_fea: db_path = results_dir / "study.db" if db_path.exists(): storage = f"sqlite:///{db_path}" study_name = config.get('study_name', 'unnamed_study') try: study = optuna.load_study(study_name=study_name, storage=storage) completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE] # Sort by first objective (or weighted sum if available) completed.sort(key=lambda t: sum(t.values) if t.values else float('inf')) for trial in completed[:n_starts]: starting_points.append(dict(trial.params)) if verbose: print(f"Loaded {len(starting_points)} starting points from FEA database") except Exception as e: if verbose: print(f"Warning: Could not load from database: {e}") # Create optimizer weights = [obj.get('weight', 1.0) for obj in config.get('objectives', [])] directions = [obj.get('direction', 'minimize') for obj in config.get('objectives', [])] from optimization_engine.generic_surrogate import GenericSurrogate surrogate = GenericSurrogate(config) surrogate.load(surrogate_path) optimizer = GradientOptimizer( surrogate=surrogate, objective_weights=weights, objective_directions=directions ) # Run optimization if starting_points: results = optimizer.optimize( starting_points=starting_points, n_random_restarts=max(0, n_starts - len(starting_points)), n_iterations=n_iterations, verbose=verbose, return_all=True ) else: results = optimizer.optimize( starting_points=n_starts, n_iterations=n_iterations, verbose=verbose, return_all=True ) # Save results output_path = results_dir / "lbfgs_results.json" with open(output_path, 'w') as f: json.dump([r.to_dict() for r in results], f, indent=2) if verbose: print(f"\nResults saved to {output_path}") return results if __name__ == "__main__": """CLI interface for L-BFGS optimization.""" import sys import argparse parser = argparse.ArgumentParser(description="L-BFGS Gradient Optimization on Surrogate") parser.add_argument("study_dir", type=str, help="Path to study directory") parser.add_argument("--n-starts", type=int, default=20, help="Number of starting points") parser.add_argument("--n-iterations", type=int, default=100, help="L-BFGS iterations per start") parser.add_argument("--use-top-fea", action="store_true", default=True, help="Use top FEA results as starting points") parser.add_argument("--random-only", action="store_true", help="Use only random starting points") args = parser.parse_args() results = run_lbfgs_polish( study_dir=Path(args.study_dir), n_starts=args.n_starts, use_top_fea=not args.random_only, n_iterations=args.n_iterations, verbose=True ) print(f"\nTop 5 L-BFGS results:") for i, r in enumerate(results[:5]): print(f"\n{i+1}. WS={r.weighted_sum:.4f}") for name, val in r.params.items(): print(f" {name}: {val:.4f}")