BREAKING CHANGE: Module paths have been reorganized for better maintainability. Backwards compatibility aliases with deprecation warnings are provided. New Structure: - core/ - Optimization runners (runner, intelligent_optimizer, etc.) - processors/ - Data processing - surrogates/ - Neural network surrogates - nx/ - NX/Nastran integration (solver, updater, session_manager) - study/ - Study management (creator, wizard, state, reset) - reporting/ - Reports and analysis (visualizer, report_generator) - config/ - Configuration management (manager, builder) - utils/ - Utilities (logger, auto_doc, etc.) - future/ - Research/experimental code Migration: - ~200 import changes across 125 files - All __init__.py files use lazy loading to avoid circular imports - Backwards compatibility layer supports old import paths with warnings - All existing functionality preserved To migrate existing code: OLD: from optimization_engine.nx_solver import NXSolver NEW: from optimization_engine.nx.solver import NXSolver OLD: from optimization_engine.runner import OptimizationRunner NEW: from optimization_engine.core.runner import OptimizationRunner 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
592 lines
22 KiB
Python
592 lines
22 KiB
Python
"""
|
|
Active Learning Surrogate with Uncertainty Estimation
|
|
|
|
This module implements an ensemble-based neural network surrogate that:
|
|
1. Provides uncertainty estimates via ensemble disagreement
|
|
2. Supports active learning for strategic FEA validation
|
|
3. Tracks confidence and knows when predictions are reliable
|
|
|
|
Key Concept:
|
|
- Train multiple NNs (ensemble) on slightly different data (bootstrap)
|
|
- Uncertainty = disagreement between ensemble members
|
|
- High uncertainty regions need FEA validation
|
|
- Low uncertainty + good accuracy = ready for optimization
|
|
"""
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.optim as optim
|
|
import numpy as np
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple, Optional
|
|
import json
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EnsembleMLP(nn.Module):
|
|
"""Single MLP member of the ensemble."""
|
|
|
|
def __init__(self, input_dim: int, output_dim: int, hidden_dims: List[int] = [64, 64, 32]):
|
|
super().__init__()
|
|
|
|
layers = []
|
|
prev_dim = input_dim
|
|
for hidden_dim in hidden_dims:
|
|
layers.extend([
|
|
nn.Linear(prev_dim, hidden_dim),
|
|
nn.ReLU(),
|
|
nn.Dropout(0.1)
|
|
])
|
|
prev_dim = hidden_dim
|
|
layers.append(nn.Linear(prev_dim, output_dim))
|
|
|
|
self.network = nn.Sequential(*layers)
|
|
|
|
def forward(self, x):
|
|
return self.network(x)
|
|
|
|
|
|
class ActiveLearningSurrogate:
|
|
"""
|
|
Ensemble-based surrogate with uncertainty estimation for active learning.
|
|
|
|
Strategy:
|
|
1. Use ensemble of 5-10 neural networks
|
|
2. Each trained on bootstrap sample of data
|
|
3. Uncertainty = std dev of predictions across ensemble
|
|
4. Select high-uncertainty designs for FEA validation
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
n_ensemble: int = 5,
|
|
hidden_dims: List[int] = [64, 64, 32],
|
|
device: str = 'cpu'
|
|
):
|
|
self.n_ensemble = n_ensemble
|
|
self.hidden_dims = hidden_dims
|
|
self.device = device
|
|
|
|
self.models: List[EnsembleMLP] = []
|
|
self.design_var_names: List[str] = []
|
|
self.objective_names: List[str] = ['mass', 'frequency', 'max_displacement', 'max_stress']
|
|
|
|
# Normalization parameters
|
|
self.input_mean = None
|
|
self.input_std = None
|
|
self.output_mean = None
|
|
self.output_std = None
|
|
|
|
# Training history for each ensemble member
|
|
self.training_history = []
|
|
|
|
# Confidence tracking
|
|
self.validation_errors = [] # Track FEA validation errors
|
|
self.confidence_score = 0.0
|
|
|
|
def _normalize_input(self, x: np.ndarray) -> torch.Tensor:
|
|
"""Normalize input features."""
|
|
x_norm = (x - self.input_mean) / (self.input_std + 1e-8)
|
|
return torch.FloatTensor(x_norm).to(self.device)
|
|
|
|
def _denormalize_output(self, y: torch.Tensor) -> np.ndarray:
|
|
"""Denormalize output predictions."""
|
|
y_np = y.cpu().numpy()
|
|
return y_np * (self.output_std + 1e-8) + self.output_mean
|
|
|
|
def train(
|
|
self,
|
|
design_params: np.ndarray,
|
|
objectives: np.ndarray,
|
|
design_var_names: List[str],
|
|
epochs: int = 200,
|
|
lr: float = 0.001,
|
|
batch_size: int = 32,
|
|
val_split: float = 0.2
|
|
):
|
|
"""
|
|
Train ensemble on the data with bootstrap sampling.
|
|
|
|
Args:
|
|
design_params: (N, D) array of design parameters
|
|
objectives: (N, O) array of objective values
|
|
design_var_names: Names of design variables
|
|
epochs: Training epochs per ensemble member
|
|
lr: Learning rate
|
|
batch_size: Batch size
|
|
val_split: Validation split ratio
|
|
"""
|
|
self.design_var_names = design_var_names
|
|
n_samples = len(design_params)
|
|
input_dim = design_params.shape[1]
|
|
output_dim = objectives.shape[1]
|
|
|
|
# Compute normalization parameters from full dataset
|
|
self.input_mean = design_params.mean(axis=0)
|
|
self.input_std = design_params.std(axis=0)
|
|
self.output_mean = objectives.mean(axis=0)
|
|
self.output_std = objectives.std(axis=0)
|
|
|
|
# Train each ensemble member on bootstrap sample
|
|
self.models = []
|
|
self.training_history = []
|
|
|
|
for i in range(self.n_ensemble):
|
|
logger.info(f"Training ensemble member {i+1}/{self.n_ensemble}")
|
|
|
|
# Bootstrap sampling (sample with replacement)
|
|
bootstrap_idx = np.random.choice(n_samples, size=n_samples, replace=True)
|
|
X_boot = design_params[bootstrap_idx]
|
|
y_boot = objectives[bootstrap_idx]
|
|
|
|
# Split into train/val
|
|
n_val = int(len(X_boot) * val_split)
|
|
indices = np.random.permutation(len(X_boot))
|
|
train_idx, val_idx = indices[n_val:], indices[:n_val]
|
|
|
|
X_train = self._normalize_input(X_boot[train_idx])
|
|
y_train = torch.FloatTensor((y_boot[train_idx] - self.output_mean) / (self.output_std + 1e-8)).to(self.device)
|
|
X_val = self._normalize_input(X_boot[val_idx])
|
|
y_val = torch.FloatTensor((y_boot[val_idx] - self.output_mean) / (self.output_std + 1e-8)).to(self.device)
|
|
|
|
# Create and train model
|
|
model = EnsembleMLP(input_dim, output_dim, self.hidden_dims).to(self.device)
|
|
optimizer = optim.Adam(model.parameters(), lr=lr)
|
|
criterion = nn.MSELoss()
|
|
|
|
best_val_loss = float('inf')
|
|
patience_counter = 0
|
|
best_state = None
|
|
|
|
for epoch in range(epochs):
|
|
model.train()
|
|
|
|
# Mini-batch training
|
|
perm = torch.randperm(len(X_train))
|
|
epoch_loss = 0.0
|
|
n_batches = 0
|
|
|
|
for j in range(0, len(X_train), batch_size):
|
|
batch_idx = perm[j:j+batch_size]
|
|
X_batch = X_train[batch_idx]
|
|
y_batch = y_train[batch_idx]
|
|
|
|
optimizer.zero_grad()
|
|
pred = model(X_batch)
|
|
loss = criterion(pred, y_batch)
|
|
loss.backward()
|
|
optimizer.step()
|
|
|
|
epoch_loss += loss.item()
|
|
n_batches += 1
|
|
|
|
# Validation
|
|
model.eval()
|
|
with torch.no_grad():
|
|
val_pred = model(X_val)
|
|
val_loss = criterion(val_pred, y_val).item()
|
|
|
|
# Early stopping
|
|
if val_loss < best_val_loss:
|
|
best_val_loss = val_loss
|
|
best_state = model.state_dict().copy()
|
|
patience_counter = 0
|
|
else:
|
|
patience_counter += 1
|
|
if patience_counter >= 20:
|
|
break
|
|
|
|
# Restore best model
|
|
if best_state is not None:
|
|
model.load_state_dict(best_state)
|
|
|
|
self.models.append(model)
|
|
self.training_history.append({
|
|
'member': i,
|
|
'best_val_loss': best_val_loss,
|
|
'epochs_trained': epoch + 1
|
|
})
|
|
|
|
logger.info(f" Member {i+1}: val_loss={best_val_loss:.6f}, epochs={epoch+1}")
|
|
|
|
def predict(self, params: Dict[str, float]) -> Dict[str, float]:
|
|
"""
|
|
Predict objectives for a single design.
|
|
|
|
Returns dict with predictions and uncertainty estimates.
|
|
"""
|
|
# Convert to array
|
|
x = np.array([[params.get(name, 0.0) for name in self.design_var_names]], dtype=np.float32)
|
|
|
|
# Get predictions from all ensemble members
|
|
predictions = []
|
|
for model in self.models:
|
|
model.eval()
|
|
with torch.no_grad():
|
|
x_norm = self._normalize_input(x)
|
|
pred_norm = model(x_norm)
|
|
pred = self._denormalize_output(pred_norm)
|
|
predictions.append(pred[0])
|
|
|
|
predictions = np.array(predictions) # (n_ensemble, n_objectives)
|
|
|
|
# Mean prediction and uncertainty (std dev)
|
|
mean_pred = predictions.mean(axis=0)
|
|
std_pred = predictions.std(axis=0)
|
|
|
|
result = {}
|
|
for i, name in enumerate(self.objective_names):
|
|
result[name] = float(mean_pred[i])
|
|
result[f'{name}_uncertainty'] = float(std_pred[i])
|
|
|
|
# Overall uncertainty score (normalized)
|
|
result['total_uncertainty'] = float(np.mean(std_pred / (self.output_std + 1e-8)))
|
|
|
|
return result
|
|
|
|
def predict_batch(self, params_list: List[Dict[str, float]]) -> List[Dict[str, float]]:
|
|
"""Predict for multiple designs efficiently."""
|
|
return [self.predict(p) for p in params_list]
|
|
|
|
def select_designs_for_validation(
|
|
self,
|
|
candidate_designs: List[Dict[str, float]],
|
|
n_select: int = 5,
|
|
strategy: str = 'uncertainty'
|
|
) -> List[Tuple[int, Dict[str, float], float]]:
|
|
"""
|
|
Select designs that should be validated with FEA.
|
|
|
|
Strategies:
|
|
- 'uncertainty': Select highest uncertainty designs
|
|
- 'pareto_uncertainty': Select from Pareto front with high uncertainty
|
|
- 'diverse': Select diverse designs with moderate uncertainty
|
|
|
|
Returns: List of (index, params, uncertainty_score)
|
|
"""
|
|
# Get predictions with uncertainty
|
|
predictions = self.predict_batch(candidate_designs)
|
|
|
|
# Score each design
|
|
scored = []
|
|
for i, (design, pred) in enumerate(zip(candidate_designs, predictions)):
|
|
uncertainty = pred['total_uncertainty']
|
|
scored.append((i, design, pred, uncertainty))
|
|
|
|
if strategy == 'uncertainty':
|
|
# Simply select highest uncertainty
|
|
scored.sort(key=lambda x: x[3], reverse=True)
|
|
|
|
elif strategy == 'pareto_uncertainty':
|
|
# Prefer Pareto-optimal designs with uncertainty
|
|
# Simple proxy: designs with low mass and high frequency predictions
|
|
for item in scored:
|
|
pred = item[2]
|
|
# Bonus for potentially good designs
|
|
pareto_score = -pred['mass'] / 1000 + pred['frequency'] / 10
|
|
# Combined score: uncertainty * pareto_potential
|
|
item = (item[0], item[1], item[2], item[3] * (1 + 0.5 * pareto_score))
|
|
scored.sort(key=lambda x: x[3], reverse=True)
|
|
|
|
elif strategy == 'diverse':
|
|
# Select diverse designs using simple greedy selection
|
|
selected = []
|
|
remaining = scored.copy()
|
|
|
|
# First, select highest uncertainty
|
|
remaining.sort(key=lambda x: x[3], reverse=True)
|
|
selected.append(remaining.pop(0))
|
|
|
|
while len(selected) < n_select and remaining:
|
|
# Find design most different from selected ones
|
|
best_idx = 0
|
|
best_min_dist = 0
|
|
|
|
for i, item in enumerate(remaining):
|
|
design = item[1]
|
|
min_dist = float('inf')
|
|
for sel_item in selected:
|
|
sel_design = sel_item[1]
|
|
dist = sum((design.get(k, 0) - sel_design.get(k, 0))**2
|
|
for k in self.design_var_names)
|
|
min_dist = min(min_dist, dist)
|
|
|
|
# Weight by uncertainty too
|
|
weighted_dist = min_dist * (1 + item[3])
|
|
if weighted_dist > best_min_dist:
|
|
best_min_dist = weighted_dist
|
|
best_idx = i
|
|
|
|
selected.append(remaining.pop(best_idx))
|
|
|
|
return [(s[0], s[1], s[3]) for s in selected]
|
|
|
|
return [(s[0], s[1], s[3]) for s in scored[:n_select]]
|
|
|
|
def update_with_validation(
|
|
self,
|
|
validated_designs: List[Dict[str, float]],
|
|
fea_results: List[Dict[str, float]]
|
|
):
|
|
"""
|
|
Update validation error tracking with new FEA results.
|
|
|
|
This doesn't retrain the model, just tracks prediction accuracy.
|
|
"""
|
|
for design, fea_result in zip(validated_designs, fea_results):
|
|
pred = self.predict(design)
|
|
|
|
errors = {}
|
|
for name in ['mass', 'frequency']:
|
|
if name in fea_result:
|
|
pred_val = pred[name]
|
|
actual_val = fea_result[name]
|
|
error = abs(pred_val - actual_val) / (abs(actual_val) + 1e-8)
|
|
errors[name] = error
|
|
|
|
self.validation_errors.append({
|
|
'design': design,
|
|
'predicted': {k: pred[k] for k in self.objective_names},
|
|
'actual': fea_result,
|
|
'errors': errors,
|
|
'uncertainty': pred['total_uncertainty']
|
|
})
|
|
|
|
# Update confidence score
|
|
self._update_confidence()
|
|
|
|
def _update_confidence(self):
|
|
"""Calculate overall confidence score based on validation history."""
|
|
if not self.validation_errors:
|
|
self.confidence_score = 0.0
|
|
return
|
|
|
|
recent_errors = self.validation_errors[-20:] # Last 20 validations
|
|
|
|
mass_errors = [e['errors'].get('mass', 1.0) for e in recent_errors]
|
|
freq_errors = [e['errors'].get('frequency', 1.0) for e in recent_errors]
|
|
|
|
# Confidence based on MAPE < 10%
|
|
mass_conf = sum(1 for e in mass_errors if e < 0.10) / len(mass_errors)
|
|
freq_conf = sum(1 for e in freq_errors if e < 0.10) / len(freq_errors)
|
|
|
|
# Combined confidence (frequency is harder, weight less)
|
|
self.confidence_score = 0.6 * mass_conf + 0.4 * freq_conf
|
|
|
|
def get_confidence_report(self) -> Dict:
|
|
"""Get detailed confidence metrics."""
|
|
if not self.validation_errors:
|
|
return {
|
|
'confidence_score': 0.0,
|
|
'n_validations': 0,
|
|
'status': 'NO_DATA',
|
|
'recommendation': 'Need FEA validation data'
|
|
}
|
|
|
|
recent = self.validation_errors[-20:]
|
|
|
|
mass_mape = np.mean([e['errors'].get('mass', 1.0) for e in recent]) * 100
|
|
freq_mape = np.mean([e['errors'].get('frequency', 1.0) for e in recent]) * 100
|
|
|
|
# Correlation between uncertainty and error
|
|
uncertainties = [e['uncertainty'] for e in recent]
|
|
total_errors = [np.mean(list(e['errors'].values())) for e in recent]
|
|
|
|
if len(set(uncertainties)) > 1 and len(set(total_errors)) > 1:
|
|
correlation = np.corrcoef(uncertainties, total_errors)[0, 1]
|
|
else:
|
|
correlation = 0.0
|
|
|
|
# Determine status
|
|
if self.confidence_score >= 0.8 and mass_mape < 5 and freq_mape < 15:
|
|
status = 'HIGH_CONFIDENCE'
|
|
recommendation = 'NN ready for optimization'
|
|
elif self.confidence_score >= 0.5:
|
|
status = 'MEDIUM_CONFIDENCE'
|
|
recommendation = 'Continue targeted FEA validation in high-uncertainty regions'
|
|
else:
|
|
status = 'LOW_CONFIDENCE'
|
|
recommendation = 'Need more FEA training data, especially in unexplored regions'
|
|
|
|
return {
|
|
'confidence_score': self.confidence_score,
|
|
'n_validations': len(self.validation_errors),
|
|
'mass_mape': mass_mape,
|
|
'freq_mape': freq_mape,
|
|
'uncertainty_error_correlation': correlation,
|
|
'status': status,
|
|
'recommendation': recommendation
|
|
}
|
|
|
|
def is_ready_for_optimization(self, threshold: float = 0.7) -> bool:
|
|
"""Check if NN is confident enough for optimization."""
|
|
return self.confidence_score >= threshold
|
|
|
|
def save(self, path: str):
|
|
"""Save the ensemble model."""
|
|
path = Path(path)
|
|
|
|
state = {
|
|
'n_ensemble': self.n_ensemble,
|
|
'hidden_dims': self.hidden_dims,
|
|
'design_var_names': self.design_var_names,
|
|
'objective_names': self.objective_names,
|
|
'input_mean': self.input_mean.tolist() if self.input_mean is not None else None,
|
|
'input_std': self.input_std.tolist() if self.input_std is not None else None,
|
|
'output_mean': self.output_mean.tolist() if self.output_mean is not None else None,
|
|
'output_std': self.output_std.tolist() if self.output_std is not None else None,
|
|
'validation_errors': self.validation_errors,
|
|
'confidence_score': self.confidence_score,
|
|
'training_history': self.training_history,
|
|
'models': [m.state_dict() for m in self.models]
|
|
}
|
|
|
|
torch.save(state, path)
|
|
logger.info(f"Saved ensemble surrogate to {path}")
|
|
|
|
@classmethod
|
|
def load(cls, path: str) -> 'ActiveLearningSurrogate':
|
|
"""Load the ensemble model."""
|
|
path = Path(path)
|
|
if not path.exists():
|
|
raise FileNotFoundError(f"Model not found: {path}")
|
|
|
|
state = torch.load(path, map_location='cpu')
|
|
|
|
surrogate = cls(
|
|
n_ensemble=state['n_ensemble'],
|
|
hidden_dims=state['hidden_dims']
|
|
)
|
|
surrogate.design_var_names = state['design_var_names']
|
|
surrogate.objective_names = state['objective_names']
|
|
surrogate.input_mean = np.array(state['input_mean']) if state['input_mean'] else None
|
|
surrogate.input_std = np.array(state['input_std']) if state['input_std'] else None
|
|
surrogate.output_mean = np.array(state['output_mean']) if state['output_mean'] else None
|
|
surrogate.output_std = np.array(state['output_std']) if state['output_std'] else None
|
|
surrogate.validation_errors = state.get('validation_errors', [])
|
|
surrogate.confidence_score = state.get('confidence_score', 0.0)
|
|
surrogate.training_history = state.get('training_history', [])
|
|
|
|
# Reconstruct models
|
|
input_dim = len(surrogate.design_var_names)
|
|
output_dim = len(surrogate.objective_names)
|
|
|
|
for model_state in state['models']:
|
|
model = EnsembleMLP(input_dim, output_dim, surrogate.hidden_dims)
|
|
model.load_state_dict(model_state)
|
|
surrogate.models.append(model)
|
|
|
|
logger.info(f"Loaded ensemble surrogate from {path}")
|
|
return surrogate
|
|
|
|
|
|
def extract_training_data_from_study(db_path: str, study_name: str):
|
|
"""Extract training data from Optuna study database."""
|
|
import optuna
|
|
|
|
storage = optuna.storages.RDBStorage(f"sqlite:///{db_path}")
|
|
study = optuna.load_study(study_name=study_name, storage=storage)
|
|
|
|
completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
|
|
|
|
if not completed_trials:
|
|
raise ValueError("No completed trials found")
|
|
|
|
# Infer design variable names
|
|
design_var_names = list(completed_trials[0].params.keys())
|
|
|
|
design_params_list = []
|
|
objectives_list = []
|
|
|
|
for trial in completed_trials:
|
|
if len(trial.values) < 2:
|
|
continue
|
|
|
|
mass = trial.values[0]
|
|
raw_freq = trial.values[1]
|
|
frequency = -raw_freq if raw_freq < 0 else raw_freq
|
|
|
|
max_disp = trial.user_attrs.get('max_displacement', 0.0)
|
|
max_stress = trial.user_attrs.get('max_stress', 0.0)
|
|
|
|
# Skip invalid
|
|
if any(np.isinf(v) or np.isnan(v) for v in [mass, frequency]):
|
|
continue
|
|
if frequency <= 0:
|
|
continue
|
|
|
|
params = [trial.params.get(name, 0.0) for name in design_var_names]
|
|
design_params_list.append(params)
|
|
objectives_list.append([mass, frequency, max_disp, max_stress])
|
|
|
|
return (
|
|
np.array(design_params_list, dtype=np.float32),
|
|
np.array(objectives_list, dtype=np.float32),
|
|
design_var_names
|
|
)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
import sys
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
project_root = Path(__file__).parent.parent
|
|
|
|
# Find database
|
|
db_path = project_root / "studies/uav_arm_optimization/2_results/study.db"
|
|
study_name = "uav_arm_optimization"
|
|
|
|
if not db_path.exists():
|
|
db_path = project_root / "studies/uav_arm_atomizerfield_test/2_results/study.db"
|
|
study_name = "uav_arm_atomizerfield_test"
|
|
|
|
print("="*60)
|
|
print("Training Active Learning Surrogate (Ensemble)")
|
|
print("="*60)
|
|
|
|
# Extract data
|
|
print(f"\nLoading data from {db_path}")
|
|
design_params, objectives, design_var_names = extract_training_data_from_study(
|
|
str(db_path), study_name
|
|
)
|
|
print(f"Loaded {len(design_params)} samples")
|
|
print(f"Design variables: {design_var_names}")
|
|
|
|
# Train ensemble
|
|
print("\nTraining 5-member ensemble...")
|
|
surrogate = ActiveLearningSurrogate(n_ensemble=5)
|
|
surrogate.train(design_params, objectives, design_var_names, epochs=200)
|
|
|
|
# Test predictions with uncertainty
|
|
print("\n" + "="*60)
|
|
print("Testing Predictions with Uncertainty")
|
|
print("="*60)
|
|
|
|
# Test on a few samples
|
|
test_designs = [
|
|
{'beam_half_core_thickness': 2.0, 'beam_face_thickness': 1.0, 'holes_diameter': 5.0, 'hole_count': 10},
|
|
{'beam_half_core_thickness': 5.0, 'beam_face_thickness': 2.0, 'holes_diameter': 20.0, 'hole_count': 8},
|
|
{'beam_half_core_thickness': 1.0, 'beam_face_thickness': 0.5, 'holes_diameter': 2.0, 'hole_count': 6}, # Low data region
|
|
]
|
|
|
|
for i, design in enumerate(test_designs):
|
|
pred = surrogate.predict(design)
|
|
print(f"\nDesign {i+1}: {design}")
|
|
print(f" Mass: {pred['mass']:.1f}g +/- {pred['mass_uncertainty']:.1f}g")
|
|
print(f" Freq: {pred['frequency']:.1f}Hz +/- {pred['frequency_uncertainty']:.1f}Hz")
|
|
print(f" Total Uncertainty: {pred['total_uncertainty']:.3f}")
|
|
|
|
# Save model
|
|
save_path = project_root / "active_learning_surrogate.pt"
|
|
surrogate.save(str(save_path))
|
|
print(f"\nSaved to {save_path}")
|
|
|
|
# Get confidence report
|
|
print("\n" + "="*60)
|
|
print("Confidence Report")
|
|
print("="*60)
|
|
report = surrogate.get_confidence_report()
|
|
for k, v in report.items():
|
|
print(f" {k}: {v}")
|