Phase 1 - Session Bootstrap: - Add .claude/ATOMIZER_CONTEXT.md as single entry point for new sessions - Add study state detection and task routing Phase 2 - Code Deduplication: - Add optimization_engine/base_runner.py (ConfigDrivenRunner) - Add optimization_engine/generic_surrogate.py (ConfigDrivenSurrogate) - Add optimization_engine/study_state.py for study detection - Add optimization_engine/templates/ with registry and templates - Studies now require ~50 lines instead of ~300 Phase 3 - Skill Consolidation: - Add YAML frontmatter metadata to all skills (versioning, dependencies) - Consolidate create-study.md into core/study-creation-core.md - Update 00_BOOTSTRAP.md, 01_CHEATSHEET.md, 02_CONTEXT_LOADER.md Phase 4 - Self-Expanding Knowledge: - Add optimization_engine/auto_doc.py for auto-generating documentation - Generate docs/generated/EXTRACTORS.md (27 extractors documented) - Generate docs/generated/TEMPLATES.md (6 templates) - Generate docs/generated/EXTRACTOR_CHEATSHEET.md Phase 5 - Subagent Implementation: - Add .claude/commands/study-builder.md (create studies) - Add .claude/commands/nx-expert.md (NX Open API) - Add .claude/commands/protocol-auditor.md (config validation) - Add .claude/commands/results-analyzer.md (results analysis) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
835 lines
30 KiB
Python
835 lines
30 KiB
Python
"""
|
|
GenericSurrogate - Config-driven neural network surrogate for optimization.
|
|
|
|
This module eliminates ~2,800 lines of duplicated code across study run_nn_optimization.py files
|
|
by providing a fully config-driven neural surrogate system.
|
|
|
|
Usage:
|
|
# In study's run_nn_optimization.py (now ~30 lines instead of ~600):
|
|
from optimization_engine.generic_surrogate import ConfigDrivenSurrogate
|
|
|
|
surrogate = ConfigDrivenSurrogate(__file__)
|
|
surrogate.run() # Handles --train, --turbo, --all flags automatically
|
|
"""
|
|
|
|
from pathlib import Path
|
|
import sys
|
|
import json
|
|
import argparse
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional, List, Tuple
|
|
import time
|
|
|
|
import numpy as np
|
|
|
|
# Conditional PyTorch import
|
|
try:
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
from torch.utils.data import DataLoader, random_split, TensorDataset
|
|
TORCH_AVAILABLE = True
|
|
except ImportError:
|
|
TORCH_AVAILABLE = False
|
|
|
|
import optuna
|
|
from optuna.samplers import NSGAIISampler
|
|
|
|
|
|
class MLPSurrogate(nn.Module):
    """
    Generic MLP architecture for surrogate modeling.

    Layout: Input -> [Linear -> LayerNorm -> ReLU -> Dropout] * N -> Output
    """

    def __init__(self, n_inputs: int, n_outputs: int,
                 hidden_dims: List[int] = None, dropout: float = 0.1):
        super().__init__()

        # Fall back to a mid-sized default stack when no widths are given.
        if hidden_dims is None:
            # Default architecture scales with problem size
            hidden_dims = [64, 128, 128, 64]

        blocks = []
        width_in = n_inputs
        for width_out in hidden_dims:
            blocks.append(nn.Linear(width_in, width_out))
            blocks.append(nn.LayerNorm(width_out))
            blocks.append(nn.ReLU())
            blocks.append(nn.Dropout(dropout))
            width_in = width_out
        # Final projection to the objective space (no activation).
        blocks.append(nn.Linear(width_in, n_outputs))
        self.network = nn.Sequential(*blocks)

        # Kaiming-normal weights and zero biases for every linear layer.
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.kaiming_normal_(module.weight)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)

    def forward(self, x):
        return self.network(x)
|
|
|
|
|
|
class GenericSurrogate:
    """
    Config-driven neural surrogate for FEA optimization.

    Automatically adapts to any number of design variables and objectives
    based on the optimization_config.json file (normalized form: each design
    variable carries 'name', 'min', 'max' and optional 'type'; each objective
    carries 'name').
    """

    def __init__(self, config: Dict, device: str = 'auto'):
        """
        Initialize surrogate from config.

        Args:
            config: Normalized config dictionary
            device: 'auto' (pick CUDA when available), 'cuda', or 'cpu'

        Raises:
            ImportError: If PyTorch is not installed.
        """
        if not TORCH_AVAILABLE:
            raise ImportError("PyTorch required for neural surrogate")

        self.config = config
        # BUGFIX: the previous expression
        #   'cuda' if torch.cuda.is_available() and device == 'auto' else 'cpu'
        # silently mapped an explicit device='cuda' request to CPU. Now an
        # explicit device string is honored; only 'auto' is resolved here.
        if device == 'auto':
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = torch.device(device)

        # Extract variable and objective info from config
        self.design_var_names = [v['name'] for v in config['design_variables']]
        self.design_var_bounds = {
            v['name']: (v['min'], v['max'])
            for v in config['design_variables']
        }
        self.design_var_types = {
            v['name']: v.get('type', 'continuous')
            for v in config['design_variables']
        }

        self.objective_names = [o['name'] for o in config['objectives']]
        self.n_inputs = len(self.design_var_names)
        self.n_outputs = len(self.objective_names)

        self.model = None          # MLPSurrogate, set by train_from_database()/load()
        self.normalization = None  # z-score stats dict, set alongside self.model

    def _get_hidden_dims(self) -> List[int]:
        """Calculate hidden layer dimensions based on problem size."""
        n = self.n_inputs

        if n <= 3:
            return [32, 64, 32]
        elif n <= 6:
            return [64, 128, 128, 64]
        elif n <= 10:
            return [128, 256, 256, 128]
        else:
            return [256, 512, 512, 256]

    def train_from_database(self, db_path: Path, study_name: str,
                            epochs: int = 300, validation_split: float = 0.2,
                            batch_size: int = 16, learning_rate: float = 0.001,
                            save_path: Path = None, verbose: bool = True):
        """
        Train surrogate from Optuna database.

        Args:
            db_path: Path to study.db
            study_name: Name of the Optuna study
            epochs: Number of training epochs
            validation_split: Fraction of data for validation
            batch_size: Training batch size
            learning_rate: Initial learning rate
            save_path: Where to save the trained model
            verbose: Print training progress

        Returns:
            self, so calls can be chained.

        Raises:
            ValueError: If fewer than 10 completed trials are available.
        """
        if verbose:
            print(f"\n{'='*60}")
            print(f"Training Generic Surrogate ({self.n_inputs} inputs -> {self.n_outputs} outputs)")
            print(f"{'='*60}")
            print(f"Device: {self.device}")
            print(f"Database: {db_path}")

        # Load data from Optuna
        storage = optuna.storages.RDBStorage(f"sqlite:///{db_path}")
        study = optuna.load_study(study_name=study_name, storage=storage)

        completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]

        if verbose:
            print(f"Found {len(completed)} completed trials")

        if len(completed) < 10:
            raise ValueError(f"Need at least 10 trials for training, got {len(completed)}")

        # Extract training data
        design_params = []
        objectives = []

        for trial in completed:
            # Skip trials whose objectives contain inf or NaN
            # (v != v is the NaN test; NaN is the only value unequal to itself).
            if any(v == float('inf') or v != v for v in trial.values):
                continue

            # Missing parameters default to 0 — assumes all trials share the
            # same parameter set; verify against the study's sampler setup.
            params = [trial.params.get(name, 0) for name in self.design_var_names]
            objs = list(trial.values)

            design_params.append(params)
            objectives.append(objs)

        design_params = np.array(design_params, dtype=np.float32)
        objectives = np.array(objectives, dtype=np.float32)

        if verbose:
            print(f"Valid samples: {len(design_params)}")
            print(f"\nDesign variable ranges:")
            for i, name in enumerate(self.design_var_names):
                print(f" {name}: {design_params[:, i].min():.2f} - {design_params[:, i].max():.2f}")
            print(f"\nObjective ranges:")
            for i, name in enumerate(self.objective_names):
                print(f" {name}: {objectives[:, i].min():.4f} - {objectives[:, i].max():.4f}")

        # Compute z-score normalization parameters; the epsilon keeps the
        # division well-defined when a column is constant.
        design_mean = design_params.mean(axis=0)
        design_std = design_params.std(axis=0) + 1e-8
        objective_mean = objectives.mean(axis=0)
        objective_std = objectives.std(axis=0) + 1e-8

        self.normalization = {
            'design_mean': design_mean,
            'design_std': design_std,
            'objective_mean': objective_mean,
            'objective_std': objective_std
        }

        # Normalize data
        X = (design_params - design_mean) / design_std
        Y = (objectives - objective_mean) / objective_std

        X_tensor = torch.tensor(X, dtype=torch.float32)
        Y_tensor = torch.tensor(Y, dtype=torch.float32)

        # Create train/validation datasets (at least one validation sample)
        dataset = TensorDataset(X_tensor, Y_tensor)
        n_val = max(1, int(len(dataset) * validation_split))
        n_train = len(dataset) - n_val
        train_ds, val_ds = random_split(dataset, [n_train, n_val])

        train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_ds, batch_size=batch_size)

        if verbose:
            print(f"\nTraining: {n_train} samples, Validation: {n_val} samples")

        # Build model sized to the problem
        hidden_dims = self._get_hidden_dims()
        self.model = MLPSurrogate(
            n_inputs=self.n_inputs,
            n_outputs=self.n_outputs,
            hidden_dims=hidden_dims
        ).to(self.device)

        n_params = sum(p.numel() for p in self.model.parameters())
        if verbose:
            print(f"Model architecture: {self.n_inputs} -> {hidden_dims} -> {self.n_outputs}")
            print(f"Total parameters: {n_params:,}")

        # Training setup: AdamW with mild weight decay, cosine LR schedule
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=learning_rate, weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs)

        best_val_loss = float('inf')
        best_state = None

        if verbose:
            print(f"\nTraining for {epochs} epochs...")

        for epoch in range(epochs):
            # Training pass
            self.model.train()
            train_loss = 0.0
            for x, y in train_loader:
                x, y = x.to(self.device), y.to(self.device)
                optimizer.zero_grad()
                pred = self.model(x)
                loss = F.mse_loss(pred, y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            train_loss /= len(train_loader)

            # Validation pass
            self.model.eval()
            val_loss = 0.0
            with torch.no_grad():
                for x, y in val_loader:
                    x, y = x.to(self.device), y.to(self.device)
                    pred = self.model(x)
                    val_loss += F.mse_loss(pred, y).item()
            val_loss /= len(val_loader)

            scheduler.step()

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                # BUGFIX: state_dict().copy() is a *shallow* dict copy — the
                # tensors it references keep being updated in place by the
                # optimizer, so the "best" snapshot silently became the final
                # epoch's weights. Clone each tensor to freeze the snapshot.
                best_state = {k: v.detach().clone()
                              for k, v in self.model.state_dict().items()}

            if verbose and ((epoch + 1) % 50 == 0 or epoch == 0):
                print(f" Epoch {epoch+1:3d}: train={train_loss:.6f}, val={val_loss:.6f}")

        # Restore the best snapshot (guard: best_state is None when epochs == 0)
        if best_state is not None:
            self.model.load_state_dict(best_state)

        if verbose:
            print(f"\nBest validation loss: {best_val_loss:.6f}")

        # Final evaluation
        self._print_validation_metrics(val_loader)

        # Save model
        if save_path:
            self.save(save_path)

        return self

    def _print_validation_metrics(self, val_loader):
        """Print per-objective MAE/MAPE on the validation set (denormalized units)."""
        self.model.eval()
        all_preds = []
        all_targets = []

        with torch.no_grad():
            for x, y in val_loader:
                x = x.to(self.device)
                pred = self.model(x).cpu().numpy()
                all_preds.append(pred)
                all_targets.append(y.numpy())

        all_preds = np.concatenate(all_preds)
        all_targets = np.concatenate(all_targets)

        # Denormalize back to physical objective units
        preds_denorm = all_preds * self.normalization['objective_std'] + self.normalization['objective_mean']
        targets_denorm = all_targets * self.normalization['objective_std'] + self.normalization['objective_mean']

        print(f"\nValidation accuracy:")
        for i, name in enumerate(self.objective_names):
            mae = np.abs(preds_denorm[:, i] - targets_denorm[:, i]).mean()
            # Epsilon protects the MAPE denominator against zero targets
            mape = (np.abs(preds_denorm[:, i] - targets_denorm[:, i]) /
                    (np.abs(targets_denorm[:, i]) + 1e-8)).mean() * 100
            print(f" {name}: MAE={mae:.4f}, MAPE={mape:.1f}%")

    def predict(self, design_params: Dict[str, float]) -> Dict[str, float]:
        """
        Predict objectives from design parameters.

        Args:
            design_params: Dictionary of design variable values
                (missing variables default to 0)

        Returns:
            Dictionary of predicted objective values

        Raises:
            ValueError: If the model has not been trained or loaded.
        """
        if self.model is None:
            raise ValueError("Model not trained. Call train_from_database first.")

        # Build input array in the canonical variable order, then z-score it
        x = np.array([design_params.get(name, 0) for name in self.design_var_names], dtype=np.float32)
        x_norm = (x - self.normalization['design_mean']) / self.normalization['design_std']
        x_tensor = torch.tensor(x_norm, dtype=torch.float32, device=self.device).unsqueeze(0)

        # Predict
        self.model.eval()
        with torch.no_grad():
            y_norm = self.model(x_tensor).cpu().numpy()[0]

        # Denormalize to physical objective units
        y = y_norm * self.normalization['objective_std'] + self.normalization['objective_mean']

        return {name: float(y[i]) for i, name in enumerate(self.objective_names)}

    def sample_random_design(self) -> Dict[str, float]:
        """Sample a random point in the design space (uniform within bounds)."""
        params = {}
        for name in self.design_var_names:
            low, high = self.design_var_bounds[name]
            if self.design_var_types[name] == 'integer':
                # randint's upper bound is exclusive, hence the +1
                params[name] = float(np.random.randint(int(low), int(high) + 1))
            else:
                params[name] = np.random.uniform(low, high)
        return params

    def save(self, path: Path):
        """Save model weights plus normalization/metadata to *path* (torch format)."""
        path = Path(path)
        torch.save({
            'model_state_dict': self.model.state_dict(),
            # Stored as plain lists so the checkpoint is portable
            'normalization': {
                'design_mean': self.normalization['design_mean'].tolist(),
                'design_std': self.normalization['design_std'].tolist(),
                'objective_mean': self.normalization['objective_mean'].tolist(),
                'objective_std': self.normalization['objective_std'].tolist()
            },
            'design_var_names': self.design_var_names,
            'objective_names': self.objective_names,
            'n_inputs': self.n_inputs,
            'n_outputs': self.n_outputs,
            'hidden_dims': self._get_hidden_dims()
        }, path)
        print(f"Model saved to {path}")

    def load(self, path: Path):
        """Load model weights and normalization from a checkpoint written by save()."""
        path = Path(path)
        checkpoint = torch.load(path, map_location=self.device)

        # Older checkpoints may lack 'hidden_dims'; recompute from problem size
        hidden_dims = checkpoint.get('hidden_dims', self._get_hidden_dims())
        self.model = MLPSurrogate(
            n_inputs=checkpoint['n_inputs'],
            n_outputs=checkpoint['n_outputs'],
            hidden_dims=hidden_dims
        ).to(self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model.eval()

        norm = checkpoint['normalization']
        self.normalization = {
            'design_mean': np.array(norm['design_mean']),
            'design_std': np.array(norm['design_std']),
            'objective_mean': np.array(norm['objective_mean']),
            'objective_std': np.array(norm['objective_std'])
        }

        # Prefer names stored in the checkpoint; fall back to config-derived ones
        self.design_var_names = checkpoint.get('design_var_names', self.design_var_names)
        self.objective_names = checkpoint.get('objective_names', self.objective_names)
        print(f"Model loaded from {path}")
|
|
|
|
|
|
class ConfigDrivenSurrogate:
    """
    Fully config-driven neural surrogate system.

    Provides complete --train, --turbo, --all workflow based on optimization_config.json.
    Handles FEA validation, surrogate retraining, and result reporting automatically.
    """

    def __init__(self, script_path: str, config_path: Optional[str] = None,
                 element_type: str = 'auto'):
        """
        Initialize config-driven surrogate.

        Args:
            script_path: Path to study's run_nn_optimization.py (__file__)
            config_path: Optional explicit path to config
            element_type: Element type for stress extraction ('auto' detects from DAT file)

        Raises:
            FileNotFoundError: If no optimization_config.json can be located.
        """
        # Directory layout is derived from the calling script's location:
        # <study_dir>/1_setup/model holds the FE model, <study_dir>/2_results
        # holds the Optuna DB, trained surrogate and reports.
        self.study_dir = Path(script_path).parent
        self.config_path = Path(config_path) if config_path else self._find_config()
        self.model_dir = self.study_dir / "1_setup" / "model"
        self.results_dir = self.study_dir / "2_results"

        # Load config
        with open(self.config_path, 'r') as f:
            self.raw_config = json.load(f)

        # Normalize config (reuse from base_runner)
        self.config = self._normalize_config(self.raw_config)

        self.study_name = self.config['study_name']
        self.element_type = element_type

        # Populated lazily: surrogate by train(), logger/solver by _setup()
        self.surrogate = None
        self.logger = None
        self.nx_solver = None

    def _find_config(self) -> Path:
        """Find the optimization config file (study root first, then 1_setup/)."""
        candidates = [
            self.study_dir / "optimization_config.json",
            self.study_dir / "1_setup" / "optimization_config.json",
        ]
        for path in candidates:
            if path.exists():
                return path
        raise FileNotFoundError(f"No optimization_config.json found in {self.study_dir}")

    def _normalize_config(self, config: Dict) -> Dict:
        """Normalize config format variations into one canonical dict shape."""
        # This mirrors ConfigNormalizer from base_runner.py
        normalized = {
            'study_name': config.get('study_name', 'unnamed_study'),
            'description': config.get('description', ''),
            'design_variables': [],
            'objectives': [],
            'constraints': [],
            'simulation': {},
            'neural_acceleration': config.get('neural_acceleration', {}),
        }

        # Normalize design variables: accept 'parameter' or 'name' as the key,
        # and bounds either as a 'bounds' [lo, hi] pair or separate min/max.
        for var in config.get('design_variables', []):
            normalized['design_variables'].append({
                'name': var.get('parameter') or var.get('name'),
                'type': var.get('type', 'continuous'),
                'min': var.get('bounds', [var.get('min', 0), var.get('max', 1)])[0] if 'bounds' in var else var.get('min', 0),
                'max': var.get('bounds', [var.get('min', 0), var.get('max', 1)])[1] if 'bounds' in var else var.get('max', 1),
            })

        # Normalize objectives: 'goal' and 'direction' are treated as synonyms.
        for obj in config.get('objectives', []):
            normalized['objectives'].append({
                'name': obj.get('name'),
                'direction': obj.get('goal') or obj.get('direction', 'minimize'),
            })

        # Normalize simulation file references (paths relative to model_dir)
        sim = config.get('simulation', {})
        normalized['simulation'] = {
            'sim_file': sim.get('sim_file', ''),
            'dat_file': sim.get('dat_file', ''),
            'solution_name': sim.get('solution_name', 'Solution 1'),
        }

        return normalized

    def _setup(self):
        """Initialize solver and logger (imports deferred until the path is set up)."""
        # Assumes the study lives two levels below the project root
        # (<root>/studies/<study>/) — TODO confirm against repo layout.
        project_root = self.study_dir.parents[1]
        if str(project_root) not in sys.path:
            sys.path.insert(0, str(project_root))

        from optimization_engine.nx_solver import NXSolver
        from optimization_engine.logger import get_logger

        self.results_dir.mkdir(exist_ok=True)
        self.logger = get_logger(self.study_name, study_dir=self.results_dir)
        self.nx_solver = NXSolver(nastran_version="2506")

    def _detect_element_type(self, dat_file: Path) -> str:
        """Auto-detect element type from DAT file.

        Scans the first 50 kB of the Nastran deck for element card names;
        defaults to 'ctetra' when nothing matches or the file is unreadable.
        """
        if self.element_type != 'auto':
            # Explicit type from the constructor wins over detection
            return self.element_type

        try:
            with open(dat_file, 'r') as f:
                content = f.read(50000)

            if 'CTETRA' in content:
                return 'ctetra'
            elif 'CHEXA' in content:
                return 'chexa'
            elif 'CQUAD4' in content:
                return 'cquad4'
            else:
                return 'ctetra'
        except Exception:
            # Best-effort detection: fall back to the most common solid type
            return 'ctetra'

    def train(self, epochs: int = 300) -> GenericSurrogate:
        """Train surrogate model from FEA database and save it under 2_results/."""
        print(f"\n{'='*60}")
        print("PHASE: Train Surrogate Model")
        print(f"{'='*60}")

        self.surrogate = GenericSurrogate(self.config, device='auto')
        self.surrogate.train_from_database(
            db_path=self.results_dir / "study.db",
            study_name=self.study_name,
            epochs=epochs,
            save_path=self.results_dir / "surrogate_best.pt"
        )

        return self.surrogate

    def turbo(self, total_nn_trials: int = 5000, batch_size: int = 100,
              retrain_every: int = 10, epochs: int = 150):
        """
        Run TURBO mode: NN exploration + FEA validation + surrogate retraining.

        Each iteration samples `batch_size` random designs, scores them with the
        surrogate, FEA-validates only the best candidate, records it in the
        Optuna study, and periodically retrains the surrogate on the grown
        database. A JSON report is written to 2_results/turbo_report.json.

        Args:
            total_nn_trials: Total NN trials to run
            batch_size: NN trials per batch before FEA validation
            retrain_every: Retrain surrogate every N FEA validations
            epochs: Training epochs for surrogate
        """
        # Deferred imports: extractors live in the project package, which is
        # only guaranteed to be on sys.path after _setup() has run.
        from optimization_engine.extractors.bdf_mass_extractor import extract_mass_from_bdf
        from optimization_engine.extractors.extract_displacement import extract_displacement
        from optimization_engine.extractors.extract_von_mises_stress import extract_solid_stress

        print(f"\n{'#'*60}")
        print(f"# TURBO MODE: {self.study_name}")
        print(f"{'#'*60}")
        print(f"Design variables: {len(self.config['design_variables'])}")
        print(f"Objectives: {len(self.config['objectives'])}")
        print(f"Total NN budget: {total_nn_trials:,} trials")
        print(f"NN batch size: {batch_size}")
        print(f"Expected FEA validations: ~{total_nn_trials // batch_size}")

        # Initial training
        print(f"\n[INIT] Training initial surrogate...")
        self.train(epochs=epochs)

        sim_file = self.model_dir / self.config['simulation']['sim_file']
        dat_file = self.model_dir / self.config['simulation']['dat_file']
        element_type = self._detect_element_type(dat_file)

        fea_count = 0
        nn_count = 0
        best_solutions = []
        iteration = 0
        start_time = time.time()

        # Get objective info
        obj_names = [o['name'] for o in self.config['objectives']]
        obj_directions = [o['direction'] for o in self.config['objectives']]

        while nn_count < total_nn_trials:
            iteration += 1
            # Last batch may be smaller than batch_size
            batch_trials = min(batch_size, total_nn_trials - nn_count)

            print(f"\n{'─'*50}")
            print(f"Iteration {iteration}: NN trials {nn_count+1}-{nn_count+batch_trials}")

            # Find best candidate via NN: random-sample the design space and
            # keep the design with the lowest scalarized score.
            best_candidate = None
            best_score = float('inf')

            for _ in range(batch_trials):
                params = self.surrogate.sample_random_design()
                pred = self.surrogate.predict(params)

                # Compute score (simple unweighted sum - lower is better;
                # maximized objectives contribute with flipped sign)
                score = sum(pred[name] if obj_directions[i] == 'minimize' else -pred[name]
                            for i, name in enumerate(obj_names))

                if score < best_score:
                    best_score = score
                    best_candidate = {'params': params, 'nn_pred': pred}

            nn_count += batch_trials

            params = best_candidate['params']
            nn_pred = best_candidate['nn_pred']

            # Log NN prediction (only first 3 variables shown to keep lines short)
            var_str = ", ".join(f"{k}={v:.2f}" for k, v in list(params.items())[:3])
            print(f" Best NN: {var_str}...")
            pred_str = ", ".join(f"{k}={v:.2f}" for k, v in nn_pred.items())
            print(f" NN pred: {pred_str}")

            # Run FEA validation on the chosen candidate only
            result = self.nx_solver.run_simulation(
                sim_file=sim_file,
                working_dir=self.model_dir,
                expression_updates=params,
                solution_name=self.config['simulation'].get('solution_name'),
                cleanup=True
            )

            if not result['success']:
                # NN budget for this batch is already spent; move on
                print(f" FEA FAILED - skipping")
                continue

            # Extract FEA results
            op2_file = result['op2_file']
            fea_results = self._extract_fea_results(op2_file, dat_file, element_type,
                                                    extract_mass_from_bdf, extract_displacement,
                                                    extract_solid_stress)

            fea_str = ", ".join(f"{k}={v:.2f}" for k, v in fea_results.items())
            print(f" FEA: {fea_str}")

            # Compute NN-vs-FEA relative errors (percent) per objective
            errors = {}
            for name in obj_names:
                if name in fea_results and name in nn_pred and fea_results[name] != 0:
                    errors[name] = abs(fea_results[name] - nn_pred[name]) / abs(fea_results[name]) * 100

            if errors:
                err_str = ", ".join(f"{k}={v:.1f}%" for k, v in errors.items())
                print(f" Error: {err_str}")

            fea_count += 1

            # Add to main study database so future retraining sees this point
            self._add_to_study(params, fea_results, iteration)

            best_solutions.append({
                'iteration': iteration,
                'params': {k: float(v) for k, v in params.items()},
                'fea': [fea_results.get(name, 0) for name in obj_names],
                'nn_error': [errors.get(name, 0) for name in obj_names[:2]]  # First 2 errors
            })

            # Retrain periodically on the grown FEA database
            if fea_count % retrain_every == 0:
                print(f"\n [RETRAIN] Retraining surrogate...")
                self.train(epochs=epochs)

            # Progress estimate based on average NN-trial throughput so far
            elapsed = time.time() - start_time
            rate = nn_count / elapsed if elapsed > 0 else 0
            remaining = (total_nn_trials - nn_count) / rate if rate > 0 else 0
            print(f" Progress: {nn_count:,}/{total_nn_trials:,} NN | {fea_count} FEA | {elapsed/60:.1f}min | ~{remaining/60:.1f}min left")

        # Final summary
        print(f"\n{'#'*60}")
        print("# TURBO MODE COMPLETE")
        print(f"{'#'*60}")
        print(f"NN trials: {nn_count:,}")
        print(f"FEA validations: {fea_count}")
        print(f"Time: {(time.time() - start_time)/60:.1f} minutes")

        # Save report (only the 20 most recent validated solutions are kept)
        turbo_report = {
            'mode': 'turbo',
            'total_nn_trials': nn_count,
            'fea_validations': fea_count,
            'time_minutes': (time.time() - start_time) / 60,
            'best_solutions': best_solutions[-20:]
        }

        report_path = self.results_dir / "turbo_report.json"
        with open(report_path, 'w') as f:
            json.dump(turbo_report, f, indent=2)

        print(f"\nReport saved to {report_path}")

    def _extract_fea_results(self, op2_file: Path, dat_file: Path, element_type: str,
                             extract_mass_from_bdf, extract_displacement, extract_solid_stress) -> Dict[str, float]:
        """Extract FEA results for all objectives.

        Objectives are dispatched on substrings of their (lowercased) names:
        'mass', 'stress', 'displacement', 'stiffness'. A failed extraction
        records float('inf') so the trial is treated as infeasible downstream.
        The extractor callables are passed in by turbo() to avoid re-importing.
        """
        results = {}

        for obj in self.config['objectives']:
            name = obj['name'].lower()

            try:
                if 'mass' in name:
                    results[obj['name']] = extract_mass_from_bdf(str(dat_file))

                elif 'stress' in name:
                    stress_result = extract_solid_stress(op2_file, subcase=1, element_type=element_type)
                    # Division by 1000 is a unit conversion — presumably
                    # Pa -> kPa or kPa -> MPa; verify against extractor docs.
                    results[obj['name']] = stress_result.get('max_von_mises', float('inf')) / 1000.0

                elif 'displacement' in name:
                    disp_result = extract_displacement(op2_file, subcase=1)
                    results[obj['name']] = disp_result['max_displacement']

                elif 'stiffness' in name:
                    disp_result = extract_displacement(op2_file, subcase=1)
                    max_disp = disp_result['max_displacement']
                    # Negative for minimization in multi-objective
                    # (higher stiffness = lower displacement = more negative score);
                    # the raw displacement is stored under a fixed extra key.
                    results[obj['name']] = -1000.0 / max(abs(max_disp), 1e-6)
                    results['displacement'] = max_disp

            except Exception as e:
                print(f" Warning: Failed to extract {name}: {e}")
                results[obj['name']] = float('inf')

        return results

    def _add_to_study(self, params: Dict, fea_results: Dict, iteration: int):
        """Add FEA result to main Optuna study.

        Uses the ask/tell interface with degenerate (value, value) ranges so
        the recorded trial reproduces the exact validated design. Failures are
        logged but never abort the turbo loop (best-effort bookkeeping).
        """
        try:
            storage = f"sqlite:///{self.results_dir / 'study.db'}"
            study = optuna.load_study(
                study_name=self.study_name,
                storage=storage,
                sampler=NSGAIISampler(population_size=20, seed=42)
            )

            trial = study.ask()

            # Suggest each variable with lo == hi == value to pin the design
            for var in self.config['design_variables']:
                name = var['name']
                value = params[name]
                if var['type'] == 'integer':
                    trial.suggest_int(name, int(value), int(value))
                else:
                    trial.suggest_float(name, value, value)

            # Get objective values in config order; missing ones become inf
            obj_values = [fea_results.get(o['name'], float('inf')) for o in self.config['objectives']]
            study.tell(trial, obj_values)

            # NOTE(review): these attrs are set after study.tell() — confirm
            # they still persist to storage for an already-told trial.
            trial.set_user_attr('source', 'turbo_mode')
            trial.set_user_attr('iteration', iteration)

        except Exception as e:
            print(f" Warning: couldn't add to study: {e}")

    def run(self, args=None):
        """
        Main entry point with argument parsing.

        Handles --train, --turbo, --all flags.

        Args:
            args: Optional pre-parsed argparse.Namespace; parsed from
                sys.argv when omitted.

        Returns:
            0 on completion (process exit code).
        """
        if args is None:
            args = self.parse_args()

        self._setup()

        print(f"\n{'#'*60}")
        print(f"# {self.study_name} - Hybrid NN Optimization")
        print(f"{'#'*60}")

        if args.all or args.train:
            self.train(epochs=args.epochs)

        if args.all or args.turbo:
            self.turbo(
                total_nn_trials=args.nn_trials,
                batch_size=args.batch_size,
                retrain_every=args.retrain_every,
                epochs=args.epochs
            )

        print(f"\n{'#'*60}")
        print("# Workflow Complete!")
        print(f"{'#'*60}\n")

        return 0

    def parse_args(self) -> argparse.Namespace:
        """Parse command line arguments.

        Defaults for --epochs and --nn-trials come from the config's
        'neural_acceleration' section when present. Exits with code 1
        (after printing usage hints) when no phase flag is given.
        """
        parser = argparse.ArgumentParser(description=f'{self.study_name} - Hybrid NN Optimization')

        parser.add_argument('--train', action='store_true', help='Train surrogate only')
        parser.add_argument('--turbo', action='store_true', help='TURBO mode (recommended)')
        parser.add_argument('--all', action='store_true', help='Train then run turbo')

        nn_config = self.config.get('neural_acceleration', {})
        parser.add_argument('--epochs', type=int, default=nn_config.get('epochs', 200), help='Training epochs')
        parser.add_argument('--nn-trials', type=int, default=nn_config.get('nn_trials', 5000), help='Total NN trials')
        parser.add_argument('--batch-size', type=int, default=100, help='NN batch size')
        parser.add_argument('--retrain-every', type=int, default=10, help='Retrain every N FEA')

        args = parser.parse_args()

        if not any([args.train, args.turbo, args.all]):
            print("No phase specified. Use --train, --turbo, or --all")
            print("\nRecommended workflow:")
            print(f" python run_nn_optimization.py --turbo --nn-trials {nn_config.get('nn_trials', 5000)}")
            sys.exit(1)

        return args
|
|
|
|
|
|
def create_surrogate(script_path: str, element_type: str = 'auto') -> ConfigDrivenSurrogate:
    """
    Factory that builds a ConfigDrivenSurrogate for the study owning *script_path*.

    Args:
        script_path: Path to study's run_nn_optimization.py (__file__)
        element_type: Element type for stress extraction ('auto' detects it
            from the DAT file)

    Returns:
        Configured surrogate ready to run
    """
    surrogate = ConfigDrivenSurrogate(script_path, element_type=element_type)
    return surrogate
|