649 lines
20 KiB
Python
649 lines
20 KiB
Python
|
|
"""
|
||
|
|
Simple MLP Surrogate for Fast Optimization
|
||
|
|
|
||
|
|
This module provides a lightweight neural network surrogate that:
|
||
|
|
1. Trains directly from Optuna database (no mesh parsing needed)
|
||
|
|
2. Uses simple MLP: design_params -> [mass, frequency, max_disp, max_stress]
|
||
|
|
3. Provides millisecond predictions for optimization
|
||
|
|
|
||
|
|
This is much simpler than the GNN-based approach and works well when:
|
||
|
|
- You have enough FEA data in the database
|
||
|
|
- You only need scalar objective predictions (no field data)
|
||
|
|
- You want quick setup without mesh parsing pipeline
|
||
|
|
|
||
|
|
Usage:
|
||
|
|
from optimization_engine.simple_mlp_surrogate import SimpleSurrogate, train_from_database
|
||
|
|
|
||
|
|
# Train from database
|
||
|
|
surrogate = train_from_database(
|
||
|
|
db_path="studies/uav_arm_atomizerfield_test/2_results/study.db",
|
||
|
|
study_name="uav_arm_atomizerfield_test"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Predict
|
||
|
|
results = surrogate.predict({
|
||
|
|
'beam_half_core_thickness': 3.0,
|
||
|
|
'beam_face_thickness': 1.5,
|
||
|
|
'holes_diameter': 8.0,
|
||
|
|
'hole_count': 4
|
||
|
|
})
|
||
|
|
"""
|
||
|
|
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
import time
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Dict, Any, Optional, List, Tuple
|
||
|
|
import numpy as np
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
try:
|
||
|
|
import torch
|
||
|
|
import torch.nn as nn
|
||
|
|
import torch.nn.functional as F
|
||
|
|
from torch.utils.data import Dataset, DataLoader, random_split
|
||
|
|
TORCH_AVAILABLE = True
|
||
|
|
except ImportError:
|
||
|
|
TORCH_AVAILABLE = False
|
||
|
|
logger.warning("PyTorch not installed. SimpleSurrogate will be limited.")
|
||
|
|
|
||
|
|
try:
|
||
|
|
import optuna
|
||
|
|
OPTUNA_AVAILABLE = True
|
||
|
|
except ImportError:
|
||
|
|
OPTUNA_AVAILABLE = False
|
||
|
|
|
||
|
|
|
||
|
|
class MLPModel(nn.Module):
    """Simple MLP for design parameter -> objective prediction.

    Architecture: [Linear -> LayerNorm -> ReLU -> Dropout] for each hidden
    layer, followed by a final Linear projection (no activation: regression).
    """

    def __init__(
        self,
        n_inputs: int = 4,
        n_outputs: int = 4,
        hidden_dims: Optional[List[int]] = None,
        dropout: float = 0.1
    ):
        """
        Args:
            n_inputs: Number of design parameters (input features).
            n_outputs: Number of predicted objective values.
            hidden_dims: Hidden layer widths; defaults to [128, 256, 128, 64].
                A None sentinel is used instead of a mutable default list,
                which would be shared across all instances.
            dropout: Dropout probability applied after each hidden layer.
        """
        super().__init__()

        # BUG FIX: the original used a mutable list as the default argument.
        if hidden_dims is None:
            hidden_dims = [128, 256, 128, 64]

        layers = []
        prev_dim = n_inputs

        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.LayerNorm(hidden_dim),
                nn.ReLU(),
                nn.Dropout(dropout)
            ])
            prev_dim = hidden_dim

        # Final projection to objective space; no activation for regression.
        layers.append(nn.Linear(prev_dim, n_outputs))

        self.network = nn.Sequential(*layers)

        # Initialize weights
        self._init_weights()

    def _init_weights(self):
        """Apply Kaiming (He) init, appropriate for ReLU activations."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Map a (batch, n_inputs) tensor to (batch, n_outputs) predictions."""
        return self.network(x)
|
||
|
|
class FEADataset(Dataset):
    """Torch dataset of (design parameters, FEA objectives) sample pairs."""

    def __init__(self, design_params: np.ndarray, objectives: np.ndarray):
        # Convert to float32 tensors once up front so indexing is cheap.
        self.design_params = torch.tensor(design_params, dtype=torch.float32)
        self.objectives = torch.tensor(objectives, dtype=torch.float32)

    def __len__(self):
        """Number of samples in the dataset."""
        return self.design_params.shape[0]

    def __getitem__(self, idx):
        """Return the (inputs, targets) tensor pair at position *idx*."""
        inputs = self.design_params[idx]
        targets = self.objectives[idx]
        return inputs, targets
|
||
|
|
class SimpleSurrogate:
    """
    Simple MLP-based surrogate for FEA prediction.

    This is a lightweight alternative to the GNN-based approach that:
    - Doesn't require mesh parsing
    - Trains directly from database
    - Provides fast scalar predictions
    """

    def __init__(
        self,
        model: Optional[nn.Module] = None,
        design_var_names: Optional[List[str]] = None,
        objective_names: Optional[List[str]] = None,
        normalization: Optional[Dict[str, Any]] = None,
        device: str = 'auto'
    ):
        """
        Args:
            model: Trained MLP mapping normalized design params to normalized
                objectives. May be None; predict() then raises RuntimeError.
            design_var_names: Ordered design-variable names defining the
                input-vector layout the model expects.
            objective_names: Ordered output names; defaults to the four
                standard FEA objectives.
            normalization: Dict with 'design_mean'/'design_std'/
                'objective_mean'/'objective_std' arrays. Defaults to identity
                normalization (zero mean, unit std).
            device: 'auto' picks CUDA when available, otherwise a torch
                device string such as 'cpu'.

        Raises:
            ImportError: If PyTorch is not installed.
        """
        if not TORCH_AVAILABLE:
            raise ImportError("PyTorch required. Install: pip install torch")

        # Set device
        if device == 'auto':
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = torch.device(device)

        self.model = model
        if model is not None:
            self.model = model.to(self.device)
            self.model.eval()  # inference-only: disables dropout

        self.design_var_names = design_var_names or []
        self.objective_names = objective_names or ['mass', 'frequency', 'max_displacement', 'max_stress']

        # Normalization stats (identity normalization by default)
        self.normalization = normalization or {
            'design_mean': np.zeros(len(self.design_var_names)),
            'design_std': np.ones(len(self.design_var_names)),
            'objective_mean': np.zeros(len(self.objective_names)),
            'objective_std': np.ones(len(self.objective_names))
        }

        # Performance tracking
        self.stats = {
            'predictions': 0,
            'total_time_ms': 0.0
        }

        logger.info(f"SimpleSurrogate initialized on {self.device}")

    def predict(self, design_params: Dict[str, float]) -> Dict[str, Any]:
        """
        Predict FEA objectives from design parameters.

        Args:
            design_params: Dict of design variable values. Variables missing
                from the dict default to 0.0.

        Returns:
            Dict with mass, frequency, max_displacement, max_stress, inference_time_ms

        Raises:
            RuntimeError: If no model has been set.
        """
        # ROBUSTNESS: fail loudly instead of the opaque TypeError raised by
        # calling None below.
        if self.model is None:
            raise RuntimeError("SimpleSurrogate has no model; train or load one first")

        start_time = time.time()

        # Build input tensor in the canonical design-variable order
        param_values = [design_params.get(name, 0.0) for name in self.design_var_names]
        x = np.array(param_values, dtype=np.float32)

        # Normalize (epsilon guards zero-variance features)
        x_norm = (x - self.normalization['design_mean']) / (self.normalization['design_std'] + 1e-8)
        x_tensor = torch.tensor(x_norm, dtype=torch.float32, device=self.device).unsqueeze(0)

        # Predict
        with torch.no_grad():
            y_norm = self.model(x_tensor).cpu().numpy()[0]

        # Denormalize back to physical units
        y = y_norm * self.normalization['objective_std'] + self.normalization['objective_mean']

        inference_time = (time.time() - start_time) * 1000

        results = {
            self.objective_names[i]: float(y[i]) for i in range(len(self.objective_names))
        }
        results['inference_time_ms'] = inference_time

        # Update stats
        self.stats['predictions'] += 1
        self.stats['total_time_ms'] += inference_time

        return results

    def get_statistics(self) -> Dict[str, Any]:
        """Get prediction statistics (counts, timing, configuration)."""
        avg_time = self.stats['total_time_ms'] / self.stats['predictions'] \
            if self.stats['predictions'] > 0 else 0

        return {
            'total_predictions': self.stats['predictions'],
            'total_time_ms': self.stats['total_time_ms'],
            'average_time_ms': avg_time,
            'device': str(self.device),
            'design_var_names': self.design_var_names,
            'objective_names': self.objective_names
        }

    def save(self, path: Path):
        """Save model weights, metadata and normalization stats to *path*.

        Raises:
            RuntimeError: If there is no model to save.
        """
        if self.model is None:
            raise RuntimeError("SimpleSurrogate has no model to save")
        path = Path(path)
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'design_var_names': self.design_var_names,
            'objective_names': self.objective_names,
            'normalization': self.normalization,
            'model_config': {
                'n_inputs': len(self.design_var_names),
                'n_outputs': len(self.objective_names)
            }
        }, path)
        logger.info(f"Surrogate saved to {path}")

    @classmethod
    def load(cls, path: Path, device: str = 'auto') -> 'SimpleSurrogate':
        """Load a surrogate previously written by save()."""
        path = Path(path)
        # BUG FIX: the checkpoint contains numpy arrays (normalization), so on
        # torch >= 2.6 (where weights_only defaults to True) the plain load
        # fails. Pass weights_only=False; fall back for older torch versions
        # that do not accept the keyword.
        try:
            checkpoint = torch.load(path, map_location='cpu', weights_only=False)
        except TypeError:
            checkpoint = torch.load(path, map_location='cpu')

        # Rebuild the architecture from the saved config, then restore weights
        model_config = checkpoint['model_config']
        model = MLPModel(
            n_inputs=model_config['n_inputs'],
            n_outputs=model_config['n_outputs']
        )
        model.load_state_dict(checkpoint['model_state_dict'])

        return cls(
            model=model,
            design_var_names=checkpoint['design_var_names'],
            objective_names=checkpoint['objective_names'],
            normalization=checkpoint['normalization'],
            device=device
        )
|
||
|
|
def extract_data_from_database(
    db_path: str,
    study_name: str
) -> Tuple[np.ndarray, np.ndarray, List[str], List[str]]:
    """
    Extract training data from Optuna database.

    Args:
        db_path: Path to SQLite database
        study_name: Name of Optuna study

    Returns:
        Tuple of (design_params, objectives, design_var_names, objective_names)

    Raises:
        ImportError: If Optuna is not installed.
        ValueError: If the study has no completed trials, no objective
            values, or no valid samples remain after filtering.
    """
    if not OPTUNA_AVAILABLE:
        raise ImportError("Optuna required. Install: pip install optuna")

    storage = optuna.storages.RDBStorage(f"sqlite:///{db_path}")
    study = optuna.load_study(study_name=study_name, storage=storage)

    # Get completed trials
    completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]

    if not completed_trials:
        raise ValueError(f"No completed trials in study {study_name}")

    logger.info(f"Found {len(completed_trials)} completed trials")

    # Get design variable names from first trial; this fixes the column order
    # of the design-parameter matrix for all samples.
    design_var_names = list(completed_trials[0].params.keys())

    # Determine objective structure
    first_values = completed_trials[0].values
    if first_values is None:
        raise ValueError("Trials have no objective values")

    # For multi-objective, values are [mass, frequency, ...]
    # We also need user_attrs for constraints

    # Collect data - filter out invalid samples
    design_params_list = []
    objectives_list = []
    skipped = 0

    for trial in completed_trials:
        # ROBUSTNESS: individual trials can lack values even when the first
        # one has them; skip them instead of crashing on len(None).
        if trial.values is None:
            skipped += 1
            continue

        # Objectives - need mass, frequency, max_disp, max_stress
        mass = trial.values[0] if len(trial.values) > 0 else 0.0
        frequency = trial.values[1] if len(trial.values) > 1 else 0.0

        # Constraint quantities are stored in user_attrs, not trial.values
        max_disp = trial.user_attrs.get('max_displacement', 0.0)
        max_stress = trial.user_attrs.get('max_stress', 0.0)

        # Note: frequency is stored as -freq for minimization, so convert back
        # Also filter out inf values
        objectives = [mass, -frequency, max_disp, max_stress]

        # Skip invalid samples (inf, nan, or extreme values)
        if any(np.isinf(v) or np.isnan(v) or v > 1e10 for v in objectives):
            skipped += 1
            continue

        # Skip if recovered frequency is non-positive (indicates error)
        if -frequency <= 0:
            skipped += 1
            continue

        # Design parameters, in the fixed design_var_names order
        params = [trial.params.get(name, 0.0) for name in design_var_names]
        design_params_list.append(params)
        objectives_list.append(objectives)

    if skipped > 0:
        logger.info(f"Skipped {skipped} invalid samples")

    if not design_params_list:
        raise ValueError("No valid samples found after filtering")

    design_params = np.array(design_params_list, dtype=np.float32)
    objectives = np.array(objectives_list, dtype=np.float32)

    objective_names = ['mass', 'frequency', 'max_displacement', 'max_stress']

    logger.info(f"Extracted {len(design_params)} valid samples")
    logger.info(f"Design vars: {design_var_names}")
    logger.info(f"Objectives: {objective_names}")

    return design_params, objectives, design_var_names, objective_names
|
||
|
|
def train_from_database(
    db_path: str,
    study_name: str,
    epochs: int = 200,
    batch_size: int = 32,
    learning_rate: float = 0.001,
    val_split: float = 0.2,
    save_path: Optional[str] = None,
    device: str = 'auto'
) -> SimpleSurrogate:
    """
    Train SimpleSurrogate from Optuna database.

    Args:
        db_path: Path to SQLite database
        study_name: Name of Optuna study
        epochs: Training epochs
        batch_size: Batch size
        learning_rate: Learning rate
        val_split: Validation split ratio
        save_path: Optional path to save trained model
        device: Computing device

    Returns:
        Trained SimpleSurrogate

    Raises:
        ImportError: If PyTorch is not installed.
        ValueError: If the database yields too few samples to populate both
            the training and the validation split.
    """
    if not TORCH_AVAILABLE:
        raise ImportError("PyTorch required")

    # Set device
    if device == 'auto':
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    else:
        device = torch.device(device)

    print(f"\n{'='*60}")
    print("Training Simple MLP Surrogate from Database")
    print(f"{'='*60}")
    print(f"Device: {device}")

    # Extract data
    print(f"\n[1] Loading data from {db_path}")
    design_params, objectives, design_var_names, objective_names = extract_data_from_database(
        db_path, study_name
    )

    print(f" Samples: {len(design_params)}")
    print(f" Design vars: {design_var_names}")
    print(f" Objectives: {objective_names}")

    # Compute normalization stats
    design_mean = design_params.mean(axis=0)
    design_std = design_params.std(axis=0)
    objective_mean = objectives.mean(axis=0)
    objective_std = objectives.std(axis=0)

    print(f"\n Objective ranges:")
    for i, name in enumerate(objective_names):
        print(f" {name}: {objectives[:, i].min():.2f} - {objectives[:, i].max():.2f}")

    # Normalize data (epsilon guards zero-variance columns)
    design_params_norm = (design_params - design_mean) / (design_std + 1e-8)
    objectives_norm = (objectives - objective_mean) / (objective_std + 1e-8)

    # Create dataset
    dataset = FEADataset(design_params_norm, objectives_norm)

    # Split into train/val
    n_val = int(len(dataset) * val_split)
    n_train = len(dataset) - n_val
    # ROBUSTNESS: an empty split would make the per-epoch loss averaging
    # below divide by zero (len(val_loader) == 0). Fail with a clear message.
    if n_train == 0 or n_val == 0:
        raise ValueError(
            f"Not enough samples ({len(dataset)}) for val_split={val_split}; "
            "need at least one sample in both the train and validation sets"
        )
    train_dataset, val_dataset = random_split(dataset, [n_train, n_val])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    print(f"\n[2] Creating model")
    print(f" Train samples: {n_train}")
    print(f" Val samples: {n_val}")

    # Create model
    model = MLPModel(
        n_inputs=len(design_var_names),
        n_outputs=len(objective_names),
        hidden_dims=[128, 256, 128, 64]
    ).to(device)

    n_params = sum(p.numel() for p in model.parameters())
    print(f" Model params: {n_params:,}")

    # Training
    print(f"\n[3] Training for {epochs} epochs")

    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=1e-5)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs)

    best_val_loss = float('inf')
    best_state = None

    for epoch in range(epochs):
        # Train
        model.train()
        train_loss = 0.0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)

            optimizer.zero_grad()
            pred = model(x)
            loss = F.mse_loss(pred, y)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)

        # Validate
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for x, y in val_loader:
                x, y = x.to(device), y.to(device)
                pred = model(x)
                val_loss += F.mse_loss(pred, y).item()

        val_loss /= len(val_loader)
        scheduler.step()

        # Track best
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # BUG FIX: state_dict() returns references to the live parameter
            # tensors, so a shallow dict.copy() is silently overwritten by
            # later optimizer steps and the "best" snapshot degenerates to the
            # final weights. Clone every tensor to take a real snapshot.
            best_state = {k: v.detach().clone() for k, v in model.state_dict().items()}

        # Log progress
        if (epoch + 1) % 20 == 0 or epoch == 0:
            print(f" Epoch {epoch+1:3d}: train_loss={train_loss:.6f}, val_loss={val_loss:.6f}")

    # Load best model (best_state is None only when epochs == 0)
    if best_state is not None:
        model.load_state_dict(best_state)
    print(f"\n Best val_loss: {best_val_loss:.6f}")

    # Create surrogate
    normalization = {
        'design_mean': design_mean,
        'design_std': design_std,
        'objective_mean': objective_mean,
        'objective_std': objective_std
    }

    surrogate = SimpleSurrogate(
        model=model,
        design_var_names=design_var_names,
        objective_names=objective_names,
        normalization=normalization,
        device=str(device)
    )

    # Evaluate accuracy
    print(f"\n[4] Evaluating accuracy on validation set")
    model.eval()

    all_preds = []
    all_targets = []

    with torch.no_grad():
        for x, y in val_loader:
            x = x.to(device)
            pred = model(x).cpu().numpy()
            all_preds.append(pred)
            all_targets.append(y.numpy())

    all_preds = np.concatenate(all_preds)
    all_targets = np.concatenate(all_targets)

    # Denormalize for error calculation in physical units
    preds_denorm = all_preds * objective_std + objective_mean
    targets_denorm = all_targets * objective_std + objective_mean

    for i, name in enumerate(objective_names):
        mae = np.abs(preds_denorm[:, i] - targets_denorm[:, i]).mean()
        mape = (np.abs(preds_denorm[:, i] - targets_denorm[:, i]) / (np.abs(targets_denorm[:, i]) + 1e-8)).mean() * 100
        print(f" {name}: MAE={mae:.4f}, MAPE={mape:.1f}%")

    # Save if requested
    if save_path:
        surrogate.save(save_path)

    print(f"\n{'='*60}")
    print("Training complete!")
    print(f"{'='*60}")

    return surrogate
|
||
|
|
def create_simple_surrogate_for_study(
    db_path: str = None,
    study_name: str = None,
    model_path: str = None,
    project_root: Path = None
) -> Optional[SimpleSurrogate]:
    """
    Factory function to create or load SimpleSurrogate for UAV arm study.

    If model_path exists, loads the model. Otherwise trains from database.

    Args:
        db_path: Path to Optuna database
        study_name: Name of study
        model_path: Path to saved model (auto-detect if None)
        project_root: Project root for auto-detection

    Returns:
        SimpleSurrogate instance or None
    """
    if not TORCH_AVAILABLE:
        logger.warning("PyTorch not available")
        return None

    # Resolve the project root first; every auto-detected path hangs off it.
    root = Path(__file__).parent.parent if project_root is None else project_root

    model_file = (
        root / "simple_mlp_surrogate.pt" if model_path is None else Path(model_path)
    )

    # Prefer a previously trained model on disk.
    if model_file.exists():
        logger.info(f"Loading existing surrogate from {model_file}")
        return SimpleSurrogate.load(model_file)

    # No saved model: fall back to training from the Optuna database.
    db_file = (
        root / "studies" / "uav_arm_atomizerfield_test" / "2_results" / "study.db"
        if db_path is None else Path(db_path)
    )
    study = "uav_arm_atomizerfield_test" if study_name is None else study_name

    if not db_file.exists():
        logger.warning(f"Database not found: {db_file}")
        return None

    logger.info(f"Training surrogate from {db_file}")
    return train_from_database(
        db_path=str(db_file),
        study_name=study,
        save_path=str(model_file)
    )
|
||
|
|
if __name__ == "__main__":
    import sys

    # Default paths: the project root is assumed to be two levels above this
    # module — TODO confirm against the repository layout.
    project_root = Path(__file__).parent.parent
    db_path = project_root / "studies" / "uav_arm_atomizerfield_test" / "2_results" / "study.db"
    model_path = project_root / "simple_mlp_surrogate.pt"

    print("Simple MLP Surrogate Training")
    print("="*60)

    # Bail out with a non-zero exit code when the study database is missing.
    if not db_path.exists():
        print(f"ERROR: Database not found: {db_path}")
        sys.exit(1)

    # Train
    surrogate = train_from_database(
        db_path=str(db_path),
        study_name="uav_arm_atomizerfield_test",
        epochs=300,
        save_path=str(model_path)
    )

    # Test predictions with a representative design point
    print("\n[5] Testing predictions")
    test_params = {
        'beam_half_core_thickness': 3.0,
        'beam_face_thickness': 1.5,
        'holes_diameter': 8.0,
        'hole_count': 4
    }

    print(f" Input: {test_params}")
    results = surrogate.predict(test_params)
    print(f" Mass: {results['mass']:.2f} g")
    print(f" Frequency: {results['frequency']:.2f} Hz")
    print(f" Max Displacement: {results['max_displacement']:.6f} mm")
    print(f" Max Stress: {results['max_stress']:.2f} MPa")
    print(f" Inference time: {results['inference_time_ms']:.2f} ms")

    # Test variation: sanity-check that predictions respond to input changes
    print("\n[6] Testing variation with parameters")
    for thickness in [1.0, 3.0, 5.0]:
        params = {**test_params, 'beam_half_core_thickness': thickness}
        r = surrogate.predict(params)
        print(f" thickness={thickness}: mass={r['mass']:.0f}g, freq={r['frequency']:.2f}Hz")