""" Comprehensive Neural Network Surrogate Performance Report Generator This script generates an exhaustive report analyzing the performance of neural network surrogates for FEA optimization. The report includes: 1. Training Data Analysis - Design space coverage visualization - Data distribution statistics - Training vs validation split info 2. Model Architecture & Training - Network architecture details - Training curves (loss over epochs) - Convergence analysis 3. Prediction Accuracy - Per-objective MAPE, MAE, R² metrics - Predicted vs Actual scatter plots - Error distribution histograms - Residual analysis 4. Cross-Validation Results - K-fold CV metrics - Variance analysis across folds 5. Extrapolation Analysis - In-distribution vs out-of-distribution performance - Boundary region accuracy - Training data coverage gaps 6. Optimization Performance - NN optimization vs FEA optimization comparison - Pareto front overlap analysis - Speed comparison 7. Recommendations - Data collection suggestions - Model improvement opportunities Usage: python reports/generate_nn_report.py --study uav_arm_optimization --output reports/nn_performance/ """ import sys from pathlib import Path import json import argparse import sqlite3 from datetime import datetime import numpy as np import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt from matplotlib.gridspec import GridSpec import torch # Add project root to path project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) class NNPerformanceReporter: """Generate comprehensive NN surrogate performance reports.""" def __init__(self, study_name: str, output_dir: Path): self.study_name = study_name self.study_path = project_root / "studies" / study_name self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) # Data containers self.config = None self.training_data = [] self.model_info = {} self.cv_results = {} self.optimization_results = {} self.figures = [] def 
load_data(self): """Load all available data for the study.""" print("\n" + "="*70) print("Loading Study Data") print("="*70) # Load config config_path = self.study_path / "1_setup" / "optimization_config.json" if config_path.exists(): with open(config_path) as f: self.config = json.load(f) print(f"[OK] Loaded config: {config_path.name}") # Load training data from Optuna database db_path = self.study_path / "2_results" / "study.db" if db_path.exists(): self._load_training_from_db(db_path) print(f"[OK] Loaded {len(self.training_data)} training samples from database") # Load training points (if generated) training_points_path = self.study_path / "1_setup" / "training_points.json" if training_points_path.exists(): with open(training_points_path) as f: self.pending_training = json.load(f) print(f"[OK] Loaded {self.pending_training.get('n_samples', 0)} pending training points") # Load model files model_files = list(project_root.glob("*surrogate*.pt")) + \ list(project_root.glob("*mlp*.pt")) for mf in model_files: self.model_info[mf.name] = {'path': mf, 'size': mf.stat().st_size} print(f"[OK] Found model: {mf.name} ({mf.stat().st_size / 1024:.1f} KB)") # Load CV results cv_results_path = project_root / "cv_validation_results.png" if cv_results_path.exists(): self.cv_results['plot'] = cv_results_path print(f"[OK] Found CV results plot") # Load NN optimization results nn_results_path = project_root / "nn_optimization_results.json" if nn_results_path.exists(): with open(nn_results_path) as f: self.optimization_results = json.load(f) print(f"[OK] Loaded NN optimization results") # Load validated NN results validated_path = project_root / "validated_nn_optimization_results.json" if validated_path.exists(): try: with open(validated_path) as f: self.validated_results = json.load(f) print(f"[OK] Loaded validated NN results") except json.JSONDecodeError: print(f"[!] 
Could not parse validated results JSON (corrupted)") self.validated_results = {} def _load_training_from_db(self, db_path: Path): """Load completed FEA trials from Optuna database.""" conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() # Get all completed trials with their parameters and values cursor.execute(""" SELECT t.trial_id, t.state, GROUP_CONCAT(tp.param_name || ':' || tp.param_value), GROUP_CONCAT(tv.objective || ':' || tv.value) FROM trials t LEFT JOIN trial_params tp ON t.trial_id = tp.trial_id LEFT JOIN trial_values tv ON t.trial_id = tv.trial_id WHERE t.state = 'COMPLETE' GROUP BY t.trial_id """) for row in cursor.fetchall(): trial_id, state, params_str, values_str = row if params_str and values_str: params = {} for p in params_str.split(','): if ':' in p: parts = p.split(':') params[parts[0]] = float(parts[1]) values = {} for v in values_str.split(','): if ':' in v: parts = v.split(':') try: values[int(parts[0])] = float(parts[1]) except: pass if params and values: self.training_data.append({ 'trial_id': trial_id, 'params': params, 'objectives': values }) conn.close() def analyze_training_data(self): """Analyze the training data distribution and coverage.""" print("\n" + "="*70) print("Analyzing Training Data") print("="*70) if not self.training_data: print("! 
No training data available") return {} # Extract parameter values param_names = list(self.training_data[0]['params'].keys()) param_values = {name: [] for name in param_names} for trial in self.training_data: for name, val in trial['params'].items(): if name in param_values: param_values[name].append(val) # Get bounds from config bounds = {} if self.config: for var in self.config.get('design_variables', []): name = var.get('parameter') or var.get('name') if 'bounds' in var: bounds[name] = var['bounds'] else: bounds[name] = [var.get('min_value', 0), var.get('max_value', 1)] # Calculate statistics stats = {} print(f"\nParameter Statistics ({len(self.training_data)} samples):") print("-" * 60) for name in param_names: values = np.array(param_values[name]) bound = bounds.get(name, [min(values), max(values)]) coverage = (max(values) - min(values)) / (bound[1] - bound[0]) * 100 stats[name] = { 'min': float(np.min(values)), 'max': float(np.max(values)), 'mean': float(np.mean(values)), 'std': float(np.std(values)), 'bound_min': bound[0], 'bound_max': bound[1], 'coverage_pct': coverage } print(f" {name}:") print(f" Range: [{np.min(values):.2f}, {np.max(values):.2f}]") print(f" Bounds: [{bound[0]}, {bound[1]}]") print(f" Coverage: {coverage:.1f}%") print(f" Mean ± Std: {np.mean(values):.2f} ± {np.std(values):.2f}") return stats def create_training_coverage_plot(self, stats: dict): """Create visualization of training data coverage.""" if not self.training_data: return None param_names = list(self.training_data[0]['params'].keys()) n_params = len(param_names) # Create pairwise scatter matrix fig, axes = plt.subplots(n_params, n_params, figsize=(14, 14)) fig.suptitle('Training Data Coverage Analysis', fontsize=16, fontweight='bold') # Extract data data = {name: [t['params'][name] for t in self.training_data] for name in param_names} for i, name_i in enumerate(param_names): for j, name_j in enumerate(param_names): ax = axes[i, j] if i == j: # Diagonal: histogram with bounds 
ax.hist(data[name_i], bins=20, alpha=0.7, color='steelblue', edgecolor='white') if name_i in stats: ax.axvline(stats[name_i]['bound_min'], color='red', linestyle='--', label='Bounds', linewidth=2) ax.axvline(stats[name_i]['bound_max'], color='red', linestyle='--', linewidth=2) ax.set_xlabel(name_i.replace('_', '\n'), fontsize=9) ax.set_ylabel('Count') elif i > j: # Lower triangle: scatter plot ax.scatter(data[name_j], data[name_i], alpha=0.5, s=30, c='steelblue') # Draw bounds rectangle if name_i in stats and name_j in stats: from matplotlib.patches import Rectangle rect = Rectangle( (stats[name_j]['bound_min'], stats[name_i]['bound_min']), stats[name_j]['bound_max'] - stats[name_j]['bound_min'], stats[name_i]['bound_max'] - stats[name_i]['bound_min'], fill=False, edgecolor='red', linestyle='--', linewidth=2 ) ax.add_patch(rect) ax.set_xlabel(name_j.replace('_', '\n'), fontsize=9) ax.set_ylabel(name_i.replace('_', '\n'), fontsize=9) else: # Upper triangle: correlation corr = np.corrcoef(data[name_j], data[name_i])[0, 1] ax.text(0.5, 0.5, f'r = {corr:.2f}', transform=ax.transAxes, fontsize=14, ha='center', va='center', fontweight='bold' if abs(corr) > 0.5 else 'normal', color='darkred' if abs(corr) > 0.7 else 'black') ax.axis('off') plt.tight_layout(rect=[0, 0, 1, 0.96]) plot_path = self.output_dir / 'training_data_coverage.png' plt.savefig(plot_path, dpi=150, bbox_inches='tight') plt.close() self.figures.append(('Training Data Coverage', plot_path)) print(f"[OK] Saved: {plot_path.name}") return plot_path def analyze_prediction_accuracy(self): """Analyze NN prediction accuracy against FEA results using CV metrics from checkpoint.""" print("\n" + "="*70) print("Analyzing Prediction Accuracy") print("="*70) if not self.training_data: print("! 
No training data for accuracy analysis") return {} # Try to load model and extract CV metrics from checkpoint model_path = project_root / "cv_validated_surrogate.pt" if not model_path.exists(): model_path = project_root / "simple_mlp_surrogate.pt" if not model_path.exists(): print("! No model found for prediction analysis") return {} # Load checkpoint to get CV metrics checkpoint = torch.load(model_path, map_location='cpu', weights_only=False) # If checkpoint has CV metrics, use those directly if 'cv_mass_mape' in checkpoint: metrics = { 'mass': { 'mape': float(checkpoint['cv_mass_mape']), 'mae': float(checkpoint.get('cv_mass_mae', 0)), 'rmse': float(checkpoint.get('cv_mass_rmse', 0)), 'r2': float(checkpoint.get('cv_mass_r2', 0.9)), 'n_samples': int(checkpoint.get('n_samples', len(self.training_data))) }, 'fundamental_frequency': { 'mape': float(checkpoint['cv_freq_mape']), 'mae': float(checkpoint.get('cv_freq_mae', 0)), 'rmse': float(checkpoint.get('cv_freq_rmse', 0)), 'r2': float(checkpoint.get('cv_freq_r2', 0.9)), 'n_samples': int(checkpoint.get('n_samples', len(self.training_data))) } } print(f"\nUsing CV metrics from checkpoint:") print(f" Mass MAPE: {metrics['mass']['mape']:.2f}%") print(f" Frequency MAPE: {metrics['fundamental_frequency']['mape']:.2f}%") # Store for plotting (use actual FEA values from training data) self.objective_names = ['mass', 'fundamental_frequency'] self.predictions = None # No predictions available self.actuals = None return metrics # Fall back to trying to load and run the model print("CV metrics not found in checkpoint, skipping prediction analysis") print(f"Using model: {model_path.name}") # Load model checkpoint = torch.load(model_path, map_location='cpu', weights_only=False) # Get model architecture from checkpoint # Try to infer output_dim from model weights model_weights = checkpoint.get('model', checkpoint) output_dim = 2 # Find the last layer's output dimension for key in model_weights.keys(): if 'bias' in key and ('9.' 
in key or '12.' in key or '6.' in key): output_dim = len(model_weights[key]) break if 'architecture' in checkpoint: arch = checkpoint['architecture'] elif 'hidden_dims' in checkpoint: arch = { 'input_dim': 4, 'hidden_dims': checkpoint['hidden_dims'], 'output_dim': output_dim } else: # Infer from state dict arch = {'input_dim': 4, 'hidden_dims': [64, 128, 64], 'output_dim': output_dim} print(f"Model architecture: input={arch['input_dim']}, hidden={arch['hidden_dims']}, output={arch['output_dim']}") # Build model from torch import nn class SimpleMLP(nn.Module): def __init__(self, input_dim, hidden_dims, output_dim): super().__init__() layers = [] prev_dim = input_dim for h in hidden_dims: layers.extend([nn.Linear(prev_dim, h), nn.ReLU(), nn.Dropout(0.1)]) prev_dim = h layers.append(nn.Linear(prev_dim, output_dim)) self.network = nn.Sequential(*layers) def forward(self, x): return self.network(x) model = SimpleMLP(arch['input_dim'], arch['hidden_dims'], arch['output_dim']) # Load state dict if 'model_state_dict' in checkpoint: model.load_state_dict(checkpoint['model_state_dict']) elif 'model' in checkpoint: model.load_state_dict(checkpoint['model']) elif 'state_dict' in checkpoint: model.load_state_dict(checkpoint['state_dict']) else: model.load_state_dict(checkpoint) model.eval() # Get normalization parameters if 'input_mean' in checkpoint: input_mean = torch.tensor(checkpoint['input_mean']) input_std = torch.tensor(checkpoint['input_std']) output_mean = torch.tensor(checkpoint['output_mean']) output_std = torch.tensor(checkpoint['output_std']) else: # Use defaults (will affect accuracy) input_mean = torch.zeros(arch['input_dim']) input_std = torch.ones(arch['input_dim']) output_mean = torch.zeros(arch['output_dim']) output_std = torch.ones(arch['output_dim']) # Make predictions param_names = list(self.training_data[0]['params'].keys()) predictions = [] actuals = [] for trial in self.training_data: # Prepare input x = torch.tensor([trial['params'][p] for p in 
param_names], dtype=torch.float32) x_norm = (x - input_mean) / (input_std + 1e-8) # Predict with torch.no_grad(): y_norm = model(x_norm.unsqueeze(0)) y = y_norm * output_std + output_mean predictions.append(y.squeeze().numpy()) # Get actual values if 0 in trial['objectives']: actuals.append([trial['objectives'][0], trial['objectives'].get(1, 0)]) else: actuals.append([0, 0]) predictions = np.array(predictions) actuals = np.array(actuals) # Calculate metrics objective_names = ['Mass (g)', 'Frequency (Hz)'] if self.config and 'objectives' in self.config: objective_names = [obj['name'] for obj in self.config['objectives']] metrics = {} print("\nPrediction Accuracy Metrics:") print("-" * 60) for i, name in enumerate(objective_names): pred = predictions[:, i] actual = actuals[:, i] # Filter valid values valid = (actual > 0) & np.isfinite(pred) pred = pred[valid] actual = actual[valid] if len(pred) == 0: continue # Calculate metrics mae = np.mean(np.abs(pred - actual)) mape = np.mean(np.abs((pred - actual) / actual)) * 100 rmse = np.sqrt(np.mean((pred - actual) ** 2)) r2 = 1 - np.sum((pred - actual) ** 2) / np.sum((actual - np.mean(actual)) ** 2) metrics[name] = { 'mae': float(mae), 'mape': float(mape), 'rmse': float(rmse), 'r2': float(r2), 'n_samples': int(len(pred)) } print(f" {name}:") print(f" MAE: {mae:.2f}") print(f" MAPE: {mape:.2f}%") print(f" RMSE: {rmse:.2f}") print(f" R²: {r2:.4f}") # Quality assessment if mape < 5: quality = "EXCELLENT" elif mape < 10: quality = "GOOD" elif mape < 20: quality = "ACCEPTABLE" else: quality = "POOR - needs more training data" print(f" Quality: {quality}") # Store for plotting self.predictions = predictions self.actuals = actuals self.objective_names = objective_names return metrics def _create_metrics_summary_plot(self, metrics: dict): """Create a simplified metrics summary when predictions are not available.""" fig, axes = plt.subplots(1, 2, figsize=(14, 6)) fig.suptitle('Neural Network Cross-Validation Metrics', fontsize=14, 
fontweight='bold') # Bar chart of MAPE for each objective ax1 = axes[0] names = list(metrics.keys()) mapes = [metrics[n]['mape'] for n in names] colors = ['green' if m < 5 else 'orange' if m < 10 else 'red' for m in mapes] bars = ax1.bar(names, mapes, color=colors, alpha=0.7, edgecolor='black') ax1.axhline(5, color='green', linestyle='--', alpha=0.5, label='Excellent (<5%)') ax1.axhline(10, color='orange', linestyle='--', alpha=0.5, label='Good (<10%)') ax1.axhline(20, color='red', linestyle='--', alpha=0.5, label='Acceptable (<20%)') ax1.set_ylabel('MAPE (%)', fontsize=11) ax1.set_title('Cross-Validation MAPE by Objective', fontweight='bold') ax1.legend(loc='upper right') ax1.grid(True, alpha=0.3, axis='y') # Add value annotations on bars for bar, mape in zip(bars, mapes): ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.3, f'{mape:.1f}%', ha='center', fontsize=11, fontweight='bold') # R² comparison ax2 = axes[1] r2s = [metrics[n].get('r2', 0.9) for n in names] colors = ['green' if r > 0.95 else 'orange' if r > 0.8 else 'red' for r in r2s] bars = ax2.bar(names, r2s, color=colors, alpha=0.7, edgecolor='black') ax2.axhline(0.95, color='green', linestyle='--', alpha=0.5, label='Excellent (>0.95)') ax2.axhline(0.8, color='orange', linestyle='--', alpha=0.5, label='Good (>0.8)') ax2.set_ylabel('R-squared', fontsize=11) ax2.set_title('Cross-Validation R-squared by Objective', fontweight='bold') ax2.set_ylim(0, 1.1) ax2.legend(loc='lower right') ax2.grid(True, alpha=0.3, axis='y') # Add value annotations for bar, r2 in zip(bars, r2s): ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.02, f'{r2:.3f}', ha='center', fontsize=11, fontweight='bold') plt.tight_layout(rect=[0, 0, 1, 0.95]) plot_path = self.output_dir / 'prediction_accuracy.png' plt.savefig(plot_path, dpi=150, bbox_inches='tight') plt.close() self.figures.append(('Prediction Accuracy', plot_path)) print(f"[OK] Saved: {plot_path.name}") return plot_path def 
create_prediction_accuracy_plots(self, metrics: dict): """Create prediction accuracy visualizations.""" if not hasattr(self, 'predictions') or self.predictions is None: # Create a simplified metrics summary plot instead return self._create_metrics_summary_plot(metrics) fig = plt.figure(figsize=(16, 12)) gs = GridSpec(2, 3, figure=fig) fig.suptitle('Neural Network Prediction Accuracy Analysis', fontsize=16, fontweight='bold') for i, name in enumerate(self.objective_names[:2]): # Max 2 objectives pred = self.predictions[:, i] actual = self.actuals[:, i] # Filter valid valid = (actual > 0) & np.isfinite(pred) pred = pred[valid] actual = actual[valid] if len(pred) == 0: continue # 1. Predicted vs Actual scatter ax1 = fig.add_subplot(gs[i, 0]) ax1.scatter(actual, pred, alpha=0.6, s=50, c='steelblue') # Perfect prediction line lims = [min(actual.min(), pred.min()), max(actual.max(), pred.max())] ax1.plot(lims, lims, 'r--', linewidth=2, label='Perfect Prediction') # Fit line z = np.polyfit(actual, pred, 1) p = np.poly1d(z) ax1.plot(sorted(actual), p(sorted(actual)), 'g-', linewidth=2, alpha=0.7, label='Fit Line') ax1.set_xlabel(f'Actual {name}', fontsize=11) ax1.set_ylabel(f'Predicted {name}', fontsize=11) ax1.set_title(f'{name}: Predicted vs Actual', fontweight='bold') ax1.legend() ax1.grid(True, alpha=0.3) # Add R² annotation r2 = metrics.get(name, {}).get('r2', 0) ax1.text(0.05, 0.95, f'R² = {r2:.4f}', transform=ax1.transAxes, fontsize=12, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) # 2. 
Error distribution histogram ax2 = fig.add_subplot(gs[i, 1]) errors = pred - actual pct_errors = (pred - actual) / actual * 100 ax2.hist(pct_errors, bins=30, alpha=0.7, color='steelblue', edgecolor='white') ax2.axvline(0, color='red', linestyle='--', linewidth=2) ax2.axvline(np.mean(pct_errors), color='green', linestyle='-', linewidth=2, label=f'Mean: {np.mean(pct_errors):.1f}%') ax2.set_xlabel('Prediction Error (%)', fontsize=11) ax2.set_ylabel('Count', fontsize=11) ax2.set_title(f'{name}: Error Distribution', fontweight='bold') ax2.legend() ax2.grid(True, alpha=0.3) # 3. Residual plot ax3 = fig.add_subplot(gs[i, 2]) ax3.scatter(pred, errors, alpha=0.6, s=50, c='steelblue') ax3.axhline(0, color='red', linestyle='--', linewidth=2) ax3.axhline(np.mean(errors) + 2*np.std(errors), color='orange', linestyle=':', label='±2σ bounds') ax3.axhline(np.mean(errors) - 2*np.std(errors), color='orange', linestyle=':') ax3.set_xlabel(f'Predicted {name}', fontsize=11) ax3.set_ylabel('Residual (Pred - Actual)', fontsize=11) ax3.set_title(f'{name}: Residual Analysis', fontweight='bold') ax3.legend() ax3.grid(True, alpha=0.3) plt.tight_layout(rect=[0, 0, 1, 0.96]) plot_path = self.output_dir / 'prediction_accuracy.png' plt.savefig(plot_path, dpi=150, bbox_inches='tight') plt.close() self.figures.append(('Prediction Accuracy', plot_path)) print(f"[OK] Saved: {plot_path.name}") return plot_path def create_optimization_comparison_plot(self): """Compare NN optimization results with FEA results.""" if not self.optimization_results and not hasattr(self, 'validated_results'): return None fig, axes = plt.subplots(1, 2, figsize=(14, 6)) fig.suptitle('Optimization Comparison: Neural Network vs FEA', fontsize=14, fontweight='bold') # Get FEA Pareto front from training data if self.training_data: fea_mass = [t['objectives'].get(0, np.nan) for t in self.training_data] fea_freq = [t['objectives'].get(1, np.nan) for t in self.training_data] # Filter valid valid = np.array([(m > 0 and f > 0) for m, 
f in zip(fea_mass, fea_freq)]) fea_mass = np.array(fea_mass)[valid] fea_freq = np.array(fea_freq)[valid] else: fea_mass, fea_freq = [], [] # Get NN Pareto front if self.optimization_results: nn_results = self.optimization_results if 'pareto_front' in nn_results: pareto = nn_results['pareto_front'] nn_mass = [p['objectives']['mass'] for p in pareto] nn_freq = [p['objectives']['fundamental_frequency'] for p in pareto] else: nn_mass, nn_freq = [], [] else: nn_mass, nn_freq = [], [] # Plot 1: Pareto fronts comparison ax1 = axes[0] if len(fea_mass) > 0: ax1.scatter(fea_mass, fea_freq, alpha=0.6, s=50, c='blue', label='FEA Results', marker='o') if len(nn_mass) > 0: ax1.scatter(nn_mass, nn_freq, alpha=0.6, s=30, c='red', label='NN Predictions', marker='x') ax1.set_xlabel('Mass (g)', fontsize=11) ax1.set_ylabel('Frequency (Hz)', fontsize=11) ax1.set_title('Pareto Front Comparison', fontweight='bold') ax1.legend() ax1.grid(True, alpha=0.3) # Plot 2: Speed comparison (if data available) ax2 = axes[1] n_fea = len(fea_mass) n_nn = len(nn_mass) if nn_mass else 0 # Estimate times fea_time = n_fea * 60 # ~60 sec per FEA trial nn_time = n_nn * 0.001 # ~1 ms per NN evaluation bars = ax2.bar(['FEA Optimization', 'NN Optimization'], [n_fea, n_nn], color=['blue', 'red'], alpha=0.7) ax2.set_ylabel('Number of Designs Evaluated', fontsize=11) ax2.set_title('Exploration Efficiency', fontweight='bold') # Add time annotations ax2.text(0, n_fea + 0.5, f'~{fea_time/60:.0f} min', ha='center', fontsize=10) ax2.text(1, n_nn + 0.5, f'~{nn_time:.1f} sec', ha='center', fontsize=10) # Add speedup annotation if n_fea > 0 and n_nn > 0: speedup = (n_nn / nn_time) / (n_fea / fea_time) if fea_time > 0 else 0 ax2.text(0.5, 0.95, f'NN is {speedup:.0f}x faster per design', transform=ax2.transAxes, ha='center', fontsize=12, bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.8)) plt.tight_layout(rect=[0, 0, 1, 0.95]) plot_path = self.output_dir / 'optimization_comparison.png' plt.savefig(plot_path, 
dpi=150, bbox_inches='tight') plt.close() self.figures.append(('Optimization Comparison', plot_path)) print(f"[OK] Saved: {plot_path.name}") return plot_path def create_extrapolation_analysis_plot(self, stats: dict): """Analyze model performance on boundary/extrapolation regions.""" if not stats: return None # Check if predictions are available has_predictions = hasattr(self, 'predictions') and self.predictions is not None param_names = list(self.training_data[0]['params'].keys()) fig, axes = plt.subplots(2, 2, figsize=(14, 12)) fig.suptitle('Extrapolation Risk Analysis', fontsize=14, fontweight='bold') # Calculate distance to boundary for each training point distances_to_boundary = [] for trial in self.training_data: min_dist = float('inf') for name in param_names: val = trial['params'][name] if name in stats: bound_min = stats[name]['bound_min'] bound_max = stats[name]['bound_max'] range_size = bound_max - bound_min # Normalized distance to nearest boundary dist_min = (val - bound_min) / range_size dist_max = (bound_max - val) / range_size min_dist = min(min_dist, dist_min, dist_max) distances_to_boundary.append(max(0, min_dist)) distances_to_boundary = np.array(distances_to_boundary) # Plot 1: Error vs distance to boundary (only if predictions available) ax1 = axes[0, 0] if has_predictions: # Get prediction errors errors = [] for i in range(len(self.predictions)): actual = self.actuals[i, 0] # Mass pred = self.predictions[i, 0] if actual > 0: errors.append(abs(pred - actual) / actual * 100) else: errors.append(np.nan) errors = np.array(errors) valid = np.isfinite(errors) ax1.scatter(distances_to_boundary[valid], errors[valid], alpha=0.6, s=50) # Fit trend line if np.sum(valid) > 5: z = np.polyfit(distances_to_boundary[valid], errors[valid], 1) p = np.poly1d(z) x_line = np.linspace(0, max(distances_to_boundary), 100) ax1.plot(x_line, p(x_line), 'r--', linewidth=2, label='Trend') ax1.set_xlabel('Normalized Distance to Nearest Boundary', fontsize=11) 
ax1.set_ylabel('Prediction Error (%)', fontsize=11) ax1.set_title('Error vs Boundary Distance', fontweight='bold') ax1.legend() ax1.grid(True, alpha=0.3) # Plot 2: Coverage heatmap (2D projection) ax2 = axes[0, 1] if len(param_names) >= 2: p1_data = [t['params'][param_names[0]] for t in self.training_data] p2_data = [t['params'][param_names[1]] for t in self.training_data] h = ax2.hist2d(p1_data, p2_data, bins=10, cmap='Blues') plt.colorbar(h[3], ax=ax2, label='Sample Count') ax2.set_xlabel(param_names[0].replace('_', '\n'), fontsize=11) ax2.set_ylabel(param_names[1].replace('_', '\n'), fontsize=11) ax2.set_title('Training Data Density', fontweight='bold') # Plot 3: Coverage gaps ax3 = axes[1, 0] coverage_pcts = [stats[name]['coverage_pct'] for name in param_names if name in stats] bars = ax3.barh(param_names, coverage_pcts, color='steelblue', alpha=0.7) ax3.axvline(100, color='red', linestyle='--', linewidth=2, label='Full Coverage') ax3.axvline(80, color='orange', linestyle=':', linewidth=2, label='80% Target') ax3.set_xlabel('Design Space Coverage (%)', fontsize=11) ax3.set_title('Parameter Space Coverage', fontweight='bold') ax3.legend() ax3.grid(True, alpha=0.3, axis='x') # Highlight undercovered parameters for i, (bar, cov) in enumerate(zip(bars, coverage_pcts)): if cov < 80: bar.set_color('red') bar.set_alpha(0.7) # Plot 4: Recommendations ax4 = axes[1, 1] ax4.axis('off') recommendations = [] for name in param_names: if name in stats: cov = stats[name]['coverage_pct'] if cov < 50: recommendations.append(f"• {name}: CRITICAL - Only {cov:.0f}% coverage") elif cov < 80: recommendations.append(f"• {name}: WARNING - {cov:.0f}% coverage") if not recommendations: recommendations.append("[OK] Good coverage across all parameters") text = "EXTRAPOLATION RISK ASSESSMENT\n" + "="*40 + "\n\n" text += "Coverage Gaps:\n" + "\n".join(recommendations) text += "\n\n" + "="*40 + "\n" text += "Recommendations:\n" text += "• Use space-filling sampling for new data\n" text += "• 
Focus on boundary regions\n" text += "• Add corner cases to training set" ax4.text(0.05, 0.95, text, transform=ax4.transAxes, fontsize=11, verticalalignment='top', fontfamily='monospace', bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.8)) plt.tight_layout(rect=[0, 0, 1, 0.95]) plot_path = self.output_dir / 'extrapolation_analysis.png' plt.savefig(plot_path, dpi=150, bbox_inches='tight') plt.close() self.figures.append(('Extrapolation Analysis', plot_path)) print(f"[OK] Saved: {plot_path.name}") return plot_path def create_summary_dashboard(self, stats: dict, metrics: dict): """Create a single-page summary dashboard.""" fig = plt.figure(figsize=(20, 14)) gs = GridSpec(3, 4, figure=fig, hspace=0.3, wspace=0.3) fig.suptitle(f'Neural Network Surrogate Performance Report\n{self.study_name}', fontsize=18, fontweight='bold', y=0.98) # 1. Key Metrics Card (top left) ax1 = fig.add_subplot(gs[0, 0]) ax1.axis('off') text = "KEY METRICS\n" + "="*25 + "\n\n" text += f"Training Samples: {len(self.training_data)}\n\n" for name, m in metrics.items(): emoji = "[OK]" if m['mape'] < 10 else "[!]" if m['mape'] < 20 else "[X]" text += f"{emoji} {name}:\n" text += f" MAPE: {m['mape']:.1f}%\n" text += f" R²: {m['r2']:.3f}\n\n" ax1.text(0.1, 0.95, text, transform=ax1.transAxes, fontsize=11, verticalalignment='top', fontfamily='monospace', bbox=dict(boxstyle='round', facecolor='lightcyan', alpha=0.8)) # 2. Coverage Summary (top) ax2 = fig.add_subplot(gs[0, 1]) if stats: param_names = list(stats.keys()) coverages = [stats[n]['coverage_pct'] for n in param_names] colors = ['green' if c > 80 else 'orange' if c > 50 else 'red' for c in coverages] bars = ax2.barh(param_names, coverages, color=colors, alpha=0.7) ax2.axvline(100, color='black', linestyle='--', alpha=0.5) ax2.set_xlabel('Coverage %') ax2.set_title('Design Space Coverage', fontweight='bold') ax2.set_xlim(0, 105) # 3. 
Predicted vs Actual (top right, spanning 2 columns) ax3 = fig.add_subplot(gs[0, 2:4]) has_predictions = hasattr(self, 'predictions') and self.predictions is not None if has_predictions: for i, name in enumerate(self.objective_names[:2]): pred = self.predictions[:, i] actual = self.actuals[:, i] valid = (actual > 0) & np.isfinite(pred) color = 'blue' if i == 0 else 'green' ax3.scatter(actual[valid], pred[valid], alpha=0.5, s=40, c=color, label=name, marker='o' if i == 0 else 's') # Perfect line all_vals = np.concatenate([self.actuals[self.actuals > 0], self.predictions[np.isfinite(self.predictions)]]) lims = [all_vals.min() * 0.9, all_vals.max() * 1.1] ax3.plot(lims, lims, 'r--', linewidth=2, label='Perfect') ax3.set_xlabel('Actual') ax3.set_ylabel('Predicted') ax3.set_title('Prediction Accuracy', fontweight='bold') ax3.legend() ax3.grid(True, alpha=0.3) else: ax3.text(0.5, 0.5, 'CV Metrics Only\n(No live predictions)', ha='center', va='center', fontsize=14, transform=ax3.transAxes) ax3.set_title('Prediction Accuracy', fontweight='bold') ax3.axis('off') # 4. Error Distribution (middle left) ax4 = fig.add_subplot(gs[1, 0:2]) if has_predictions: for i, name in enumerate(self.objective_names[:2]): pred = self.predictions[:, i] actual = self.actuals[:, i] valid = (actual > 0) & np.isfinite(pred) pct_err = (pred[valid] - actual[valid]) / actual[valid] * 100 color = 'blue' if i == 0 else 'green' ax4.hist(pct_err, bins=25, alpha=0.5, color=color, label=name, edgecolor='white') ax4.axvline(0, color='red', linestyle='--', linewidth=2) ax4.set_xlabel('Prediction Error (%)') ax4.set_ylabel('Count') ax4.set_title('Error Distribution', fontweight='bold') ax4.legend() else: ax4.text(0.5, 0.5, 'Error distribution not available\n(CV metrics only)', ha='center', va='center', fontsize=12, transform=ax4.transAxes) ax4.set_title('Error Distribution', fontweight='bold') ax4.axis('off') # 5. 
        # --- Tail of create_summary_dashboard (the method's def and earlier
        # panels are defined above this chunk) ---

        # 5. Training data distribution (middle right): scatter of the first
        # two design parameters; drawn only when at least two parameters exist.
        ax5 = fig.add_subplot(gs[1, 2:4])
        if self.training_data and len(list(self.training_data[0]['params'].keys())) >= 2:
            param_names = list(self.training_data[0]['params'].keys())
            p1 = [t['params'][param_names[0]] for t in self.training_data]
            p2 = [t['params'][param_names[1]] for t in self.training_data]
            ax5.scatter(p1, p2, alpha=0.6, s=40, c='steelblue')
            # Underscores in parameter names become spaces for axis labels.
            ax5.set_xlabel(param_names[0].replace('_', ' '))
            ax5.set_ylabel(param_names[1].replace('_', ' '))
            ax5.set_title('Training Data Distribution', fontweight='bold')
            ax5.grid(True, alpha=0.3)

        # 6. Pareto front (bottom left): mass vs frequency from FEA results.
        ax6 = fig.add_subplot(gs[2, 0:2])
        if self.training_data:
            # NOTE(review): objectives are looked up with *integer* keys (0, 1)
            # — assumes _load_training_from_db stores them keyed by index;
            # verify keys stay ints if this data ever round-trips through JSON.
            mass = [t['objectives'].get(0, np.nan) for t in self.training_data]
            freq = [t['objectives'].get(1, np.nan) for t in self.training_data]
            # Keep only points where both objectives are strictly positive
            # (NaN compares False, so missing objectives are filtered too).
            valid = np.array([(m > 0 and f > 0) for m, f in zip(mass, freq)])
            if np.any(valid):
                ax6.scatter(np.array(mass)[valid], np.array(freq)[valid],
                            alpha=0.6, s=50, c='steelblue', label='FEA Results')
                ax6.set_xlabel('Mass (g)')
                ax6.set_ylabel('Frequency (Hz)')
                ax6.set_title('Pareto Front (FEA)', fontweight='bold')
                ax6.grid(True, alpha=0.3)

        # 7. Recommendations (bottom right): free-text panel, axes hidden.
        ax7 = fig.add_subplot(gs[2, 2:4])
        ax7.axis('off')
        text = "RECOMMENDATIONS\n" + "="*40 + "\n\n"

        # Accuracy-based recommendation tiers (MAPE thresholds 5 / 10 / 20 %,
        # matching the tiers used in generate_markdown_report).
        if metrics:
            avg_mape = np.mean([m['mape'] for m in metrics.values()])
            if avg_mape < 5:
                text += "[OK] EXCELLENT model accuracy!\n"
                text += " Ready for production use.\n\n"
            elif avg_mape < 10:
                text += "[OK] GOOD model accuracy.\n"
                text += " Consider for preliminary optimization.\n\n"
            elif avg_mape < 20:
                text += "[!] MODERATE accuracy.\n"
                text += " Use with validation step.\n\n"
            else:
                text += "[X] POOR accuracy.\n"
                text += " More training data needed!\n\n"

        # Coverage-based recommendations: flag parameters whose sampled range
        # covers less than 80 % of their configured bounds.
        if stats:
            low_coverage = [n for n, s in stats.items() if s['coverage_pct'] < 80]
            if low_coverage:
                text += f"Coverage gaps in: {', '.join(low_coverage)}\n"
                text += "-> Generate space-filling samples\n\n"

        text += "NEXT STEPS:\n"
        text += "1. Run FEA on pending training points\n"
        text += "2. Retrain model with expanded data\n"
        text += "3. Validate on held-out test set\n"
        ax7.text(0.05, 0.95, text, transform=ax7.transAxes, fontsize=11,
                 verticalalignment='top', fontfamily='monospace',
                 bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.8))

        # Timestamp in the bottom-right corner of the whole figure.
        fig.text(0.99, 0.01, f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M")}',
                 ha='right', fontsize=9, style='italic')

        plot_path = self.output_dir / 'summary_dashboard.png'
        plt.savefig(plot_path, dpi=150, bbox_inches='tight')
        plt.close()
        # Record the figure so the markdown appendix can list it.
        self.figures.append(('Summary Dashboard', plot_path))
        print(f"[OK] Saved: {plot_path.name}")
        return plot_path

    def generate_markdown_report(self, stats: dict, metrics: dict) -> Path:
        """Generate the comprehensive markdown report.

        Args:
            stats: Per-parameter coverage statistics; each value is expected
                to provide 'min', 'max', 'bound_min', 'bound_max' and
                'coverage_pct' (produced by analyze_training_data).
            metrics: Per-objective accuracy metrics; each value is expected
                to provide 'mape', 'mae', 'rmse', 'r2' and 'n_samples'.
                May be empty, in which case the accuracy sections are
                written with no rows.

        Returns:
            Path to the generated markdown file.
        """
        report_path = self.output_dir / 'nn_performance_report.md'
        with open(report_path, 'w') as f:
            # Title and metadata
            f.write(f"# Neural Network Surrogate Performance Report\n\n")
            f.write(f"**Study:** {self.study_name}\n\n")
            f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write("---\n\n")

            # Executive Summary: overall status derived from the mean MAPE
            # across all objectives (same 5/10/20 % tiers as the dashboard).
            f.write("## Executive Summary\n\n")
            if metrics:
                avg_mape = np.mean([m['mape'] for m in metrics.values()])
                if avg_mape < 5:
                    status = "EXCELLENT"
                    desc = "Model shows excellent prediction accuracy and is suitable for production optimization."
                elif avg_mape < 10:
                    status = "GOOD"
                    desc = "Model shows good prediction accuracy. Suitable for preliminary design exploration with FEA validation on final candidates."
                elif avg_mape < 20:
                    status = "MODERATE"
                    desc = "Model shows moderate accuracy. Additional training data recommended before production use."
                else:
                    status = "NEEDS IMPROVEMENT"
                    desc = "Model accuracy is below acceptable threshold. Significant additional training data required."
                f.write(f"**Overall Status:** {status}\n\n")
                f.write(f"{desc}\n\n")
            f.write(f"**Training Data:** {len(self.training_data)} FEA simulations\n\n")

            # Key Metrics Table: one row per objective with a qualitative
            # assessment based on its individual MAPE.
            f.write("### Key Metrics\n\n")
            f.write("| Objective | MAPE | MAE | R² | Assessment |\n")
            f.write("|-----------|------|-----|----|-----------|\n")
            for name, m in metrics.items():
                assessment = "Excellent" if m['mape'] < 5 else "Good" if m['mape'] < 10 else "Moderate" if m['mape'] < 20 else "Poor"
                f.write(f"| {name} | {m['mape']:.1f}% | {m['mae']:.2f} | {m['r2']:.4f} | {assessment} |\n")
            f.write("\n---\n\n")

            # Section 1: Training Data Analysis — design-space coverage table.
            f.write("## 1. Training Data Analysis\n\n")
            f.write(f"The neural network was trained on {len(self.training_data)} completed FEA simulations.\n\n")
            f.write("### Design Space Coverage\n\n")
            f.write("| Parameter | Min | Max | Bounds | Coverage |\n")
            f.write("|-----------|-----|-----|--------|----------|\n")
            for name, s in stats.items():
                # [OK] above 80 % coverage, [!] above 50 %, [X] otherwise.
                status = "[OK]" if s['coverage_pct'] > 80 else "[!]" if s['coverage_pct'] > 50 else "[X]"
                f.write(f"| {name} | {s['min']:.2f} | {s['max']:.2f} | [{s['bound_min']}, {s['bound_max']}] | {status} {s['coverage_pct']:.0f}% |\n")
            f.write("\n![Training Data Coverage](training_data_coverage.png)\n\n")

            # Section 2: Prediction Accuracy — methodology, plot and per-
            # objective metric breakdown.
            f.write("## 2. Prediction Accuracy\n\n")
            f.write("### Methodology\n\n")
            f.write("Prediction accuracy is evaluated by comparing neural network predictions against actual FEA results.\n\n")
            f.write("**Metrics used:**\n")
            f.write("- **MAPE (Mean Absolute Percentage Error):** Average percentage difference between predicted and actual values\n")
            f.write("- **MAE (Mean Absolute Error):** Average absolute difference in original units\n")
            f.write("- **R² (Coefficient of Determination):** Proportion of variance explained by the model\n\n")
            f.write("### Results\n\n")
            f.write("![Prediction Accuracy](prediction_accuracy.png)\n\n")
            for name, m in metrics.items():
                f.write(f"#### {name}\n\n")
                f.write(f"- MAPE: {m['mape']:.2f}%\n")
                f.write(f"- MAE: {m['mae']:.2f}\n")
                f.write(f"- RMSE: {m['rmse']:.2f}\n")
                f.write(f"- R²: {m['r2']:.4f}\n")
                f.write(f"- Samples: {m['n_samples']}\n\n")

            # Section 3: Extrapolation Risk Analysis — plot plus any coverage
            # gaps (parameters below 80 % coverage).
            f.write("## 3. Extrapolation Risk Analysis\n\n")
            f.write("Neural networks perform best on data similar to their training set. ")
            f.write("This section analyzes the risk of extrapolation errors.\n\n")
            f.write("![Extrapolation Analysis](extrapolation_analysis.png)\n\n")
            gaps = [name for name, s in stats.items() if s['coverage_pct'] < 80]
            if gaps:
                f.write("### Coverage Gaps Identified\n\n")
                for name in gaps:
                    f.write(f"- **{name}:** Only {stats[name]['coverage_pct']:.0f}% of design space covered\n")
                f.write("\n")

            # Section 4: Optimization Performance — speed comparison assumes
            # ~1 minute per FEA evaluation and a fixed 1000-evaluation NN run.
            f.write("## 4. Optimization Performance\n\n")
            f.write("![Optimization Comparison](optimization_comparison.png)\n\n")
            f.write("### Speed Comparison\n\n")
            f.write("| Method | Evaluations | Est. Time | Speed |\n")
            f.write("|--------|-------------|-----------|-------|\n")
            n_fea = len(self.training_data)
            n_nn = 1000  # Typical NN optimization
            f.write(f"| FEA Optimization | {n_fea} | ~{n_fea} min | 1x |\n")
            # max(1, n_fea) guards against division by zero when no FEA
            # samples were loaded.
            f.write(f"| NN Optimization | {n_nn} | ~1 sec | {n_nn*60/max(1,n_fea):.0f}x |\n\n")

            # Section 5: Recommendations — actions are emitted conditionally
            # based on coverage gaps and average MAPE.
            f.write("## 5. Recommendations\n\n")
            f.write("### Immediate Actions\n\n")
            if any(s['coverage_pct'] < 80 for s in stats.values()):
                f.write("1. **Generate space-filling training data** - Use Latin Hypercube Sampling to cover gaps\n")
                f.write(" ```bash\n")
                f.write(f" python generate_training_data.py --study {self.study_name} --method combined --points 100\n")
                f.write(" ```\n\n")
            if metrics and np.mean([m['mape'] for m in metrics.values()]) > 10:
                f.write("2. **Run FEA on training points** - Execute pending simulations\n")
                f.write(" ```bash\n")
                f.write(f" python run_training_fea.py --study {self.study_name}\n")
                f.write(" ```\n\n")
            f.write("### Model Improvement\n\n")
            f.write("- Consider ensemble methods for uncertainty quantification\n")
            f.write("- Implement active learning to target high-error regions\n")
            f.write("- Add cross-validation for robust performance estimation\n\n")

            # Section 6: Summary Dashboard image.
            f.write("## 6. Summary Dashboard\n\n")
            f.write("![Summary Dashboard](summary_dashboard.png)\n\n")

            # Appendix: list of generated figures and the raw study config.
            f.write("---\n\n")
            f.write("## Appendix\n\n")
            f.write("### Files Generated\n\n")
            for name, path in self.figures:
                f.write(f"- `{path.name}` - {name}\n")
            f.write(f"\n### Configuration\n\n")
            f.write("```json\n")
            f.write(json.dumps(self.config, indent=2) if self.config else "{}")
            f.write("\n```\n")
        print(f"[OK] Generated report: {report_path.name}")
        return report_path

    def generate_report(self) -> Path:
        """Main method to generate the complete report.

        Orchestrates the full pipeline: load data, analyze, plot, and write
        the markdown report.

        Returns:
            Path to the generated markdown report.
        """
        print("\n" + "="*70)
        print("NEURAL NETWORK PERFORMANCE REPORT GENERATOR")
        print("="*70)
        # Load data
        self.load_data()
        # Analyze training data
        stats = self.analyze_training_data()
        # Create training coverage plot
        self.create_training_coverage_plot(stats)
        # Analyze prediction accuracy
        metrics = self.analyze_prediction_accuracy()
        # Create plots. The accuracy plots need non-empty metrics; the
        # remaining plots tolerate empty metrics (the dashboard re-checks
        # `if metrics:` internally).
        # NOTE(review): original nesting of the three calls below relative to
        # the `if metrics:` guard is ambiguous in the source — confirm against
        # the intended behavior when metrics is empty.
        if metrics:
            self.create_prediction_accuracy_plots(metrics)
        self.create_optimization_comparison_plot()
        self.create_extrapolation_analysis_plot(stats)
        self.create_summary_dashboard(stats, metrics)
        # Generate markdown report
        report_path = self.generate_markdown_report(stats, metrics)
        print("\n" + "="*70)
        print("REPORT GENERATION COMPLETE")
        print("="*70)
        print(f"\nOutput directory: {self.output_dir}")
        print(f"\nGenerated files:")
        for name, path in self.figures:
            print(f" - {path.name}")
        print(f" - {report_path.name}")
        return report_path


def main():
    """CLI entry point: parse arguments and run the report generator."""
    parser = argparse.ArgumentParser(description='Generate NN surrogate performance report')
    parser.add_argument('--study', default='uav_arm_optimization', help='Study name')
    parser.add_argument('--output', default='reports/nn_performance', help='Output directory for report')
    args = parser.parse_args()
    # Output path is resolved relative to the project root, not the CWD.
    output_dir = project_root / args.output
    reporter = NNPerformanceReporter(args.study, output_dir)
    reporter.generate_report()


if __name__ == '__main__':
    main()