feat: Merge Atomizer-Field neural network module into main repository

Permanently integrates the Atomizer-Field GNN surrogate system:
- neural_models/: Graph Neural Network for FEA field prediction
- batch_parser.py: Parse training data from FEA exports
- train.py: Neural network training pipeline
- predict.py: Inference engine for fast predictions

This enables 600x-2200x speedup over traditional FEA by replacing
expensive simulations with millisecond neural network predictions.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Antoine
2025-11-26 15:31:33 -05:00
parent a4805947d1
commit d5ffba099e
47 changed files with 18446 additions and 0 deletions

View File

@@ -0,0 +1,10 @@
"""
AtomizerField Neural Models Package
Phase 2: Neural Network Architecture for Field Prediction
This package contains neural network models for learning complete FEA field results
from mesh geometry, boundary conditions, and loads.
"""
__version__ = "2.0.0"

View File

@@ -0,0 +1,416 @@
"""
data_loader.py
Data loading pipeline for neural field training
AtomizerField Data Loader v2.0
Converts parsed FEA data (HDF5 + JSON) into PyTorch Geometric graphs for training.
Key Transformation:
Parsed FEA Data → Graph Representation → Neural Network Input
Graph structure:
- Nodes: FEA mesh nodes (with coordinates, BCs, loads)
- Edges: Element connectivity (with material properties)
- Labels: Displacement and stress fields (ground truth from FEA)
"""
import json
import h5py
import numpy as np
from pathlib import Path
import torch
from torch.utils.data import Dataset
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
import warnings
class FEAMeshDataset(Dataset):
"""
PyTorch Dataset for FEA mesh data
Loads parsed neural field data and converts to PyTorch Geometric graphs.
Each graph represents one FEA analysis case.
"""
def __init__(
self,
case_directories,
normalize=True,
include_stress=True,
cache_in_memory=False
):
"""
Initialize dataset
Args:
case_directories (list): List of paths to parsed cases
normalize (bool): Normalize node coordinates and results
include_stress (bool): Include stress in targets
cache_in_memory (bool): Load all data into RAM (faster but memory-intensive)
"""
self.case_dirs = [Path(d) for d in case_directories]
self.normalize = normalize
self.include_stress = include_stress
self.cache_in_memory = cache_in_memory
# Validate all cases exist
self.valid_cases = []
for case_dir in self.case_dirs:
if self._validate_case(case_dir):
self.valid_cases.append(case_dir)
else:
warnings.warn(f"Skipping invalid case: {case_dir}")
print(f"Loaded {len(self.valid_cases)}/{len(self.case_dirs)} valid cases")
# Cache data if requested
self.cache = {}
if cache_in_memory:
print("Caching data in memory...")
for idx in range(len(self.valid_cases)):
self.cache[idx] = self._load_case(idx)
print("Cache complete!")
# Compute normalization statistics
if normalize:
self._compute_normalization_stats()
def _validate_case(self, case_dir):
"""Check if case has required files"""
json_file = case_dir / "neural_field_data.json"
h5_file = case_dir / "neural_field_data.h5"
return json_file.exists() and h5_file.exists()
def __len__(self):
return len(self.valid_cases)
def __getitem__(self, idx):
"""
Get graph data for one case
Returns:
torch_geometric.data.Data object with:
- x: Node features [num_nodes, feature_dim]
- edge_index: Element connectivity [2, num_edges]
- edge_attr: Edge features (material props) [num_edges, edge_dim]
- y_displacement: Target displacement [num_nodes, 6]
- y_stress: Target stress [num_nodes, 6] (if include_stress)
- bc_mask: Boundary condition mask [num_nodes, 6]
- pos: Node positions [num_nodes, 3]
"""
if self.cache_in_memory and idx in self.cache:
return self.cache[idx]
return self._load_case(idx)
def _load_case(self, idx):
"""Load and process a single case"""
case_dir = self.valid_cases[idx]
# Load JSON metadata
with open(case_dir / "neural_field_data.json", 'r') as f:
metadata = json.load(f)
# Load HDF5 field data
with h5py.File(case_dir / "neural_field_data.h5", 'r') as f:
# Node coordinates
node_coords = torch.from_numpy(f['mesh/node_coordinates'][:]).float()
# Displacement field (target)
displacement = torch.from_numpy(f['results/displacement'][:]).float()
# Stress field (target, if available)
stress = None
if self.include_stress and 'results/stress' in f:
# Try to load first available stress type
stress_group = f['results/stress']
for stress_type in stress_group.keys():
stress_data = stress_group[stress_type]['data'][:]
stress = torch.from_numpy(stress_data).float()
break
# Build graph structure
graph_data = self._build_graph(metadata, node_coords, displacement, stress)
# Normalize if requested
if self.normalize:
graph_data = self._normalize_graph(graph_data)
return graph_data
def _build_graph(self, metadata, node_coords, displacement, stress):
"""
Convert FEA mesh to graph
Args:
metadata (dict): Parsed metadata
node_coords (Tensor): Node positions [num_nodes, 3]
displacement (Tensor): Displacement field [num_nodes, 6]
stress (Tensor): Stress field [num_nodes, 6] or None
Returns:
torch_geometric.data.Data
"""
num_nodes = node_coords.shape[0]
# === NODE FEATURES ===
# Start with coordinates
node_features = [node_coords] # [num_nodes, 3]
# Add boundary conditions (which DOFs are constrained)
bc_mask = torch.zeros(num_nodes, 6) # [num_nodes, 6]
if 'boundary_conditions' in metadata and 'spc' in metadata['boundary_conditions']:
for spc in metadata['boundary_conditions']['spc']:
node_id = spc['node']
# Find node index (assuming node IDs are sequential starting from 1)
# This is a simplification - production code should use ID mapping
if node_id <= num_nodes:
dofs = spc['dofs']
# Parse DOF string (e.g., "123" means constrained in x,y,z)
for dof_char in str(dofs):
if dof_char.isdigit():
dof_idx = int(dof_char) - 1 # 0-indexed
if 0 <= dof_idx < 6:
bc_mask[node_id - 1, dof_idx] = 1.0
node_features.append(bc_mask) # [num_nodes, 6]
# Add load information (force magnitude at each node)
load_features = torch.zeros(num_nodes, 3) # [num_nodes, 3] for x,y,z forces
if 'loads' in metadata and 'point_forces' in metadata['loads']:
for force in metadata['loads']['point_forces']:
node_id = force['node']
if node_id <= num_nodes:
magnitude = force['magnitude']
direction = force['direction']
force_vector = [magnitude * d for d in direction]
load_features[node_id - 1] = torch.tensor(force_vector)
node_features.append(load_features) # [num_nodes, 3]
# Concatenate all node features
x = torch.cat(node_features, dim=-1) # [num_nodes, 3+6+3=12]
# === EDGE FEATURES ===
# Build edge index from element connectivity
edge_index = []
edge_attrs = []
# Get material properties
material_dict = {}
if 'materials' in metadata:
for mat in metadata['materials']:
mat_id = mat['id']
if mat['type'] == 'MAT1':
material_dict[mat_id] = [
mat.get('E', 0.0) / 1e6, # Normalize E (MPa → GPa)
mat.get('nu', 0.0),
mat.get('rho', 0.0) * 1e6, # Normalize rho
mat.get('G', 0.0) / 1e6 if mat.get('G') else 0.0,
mat.get('alpha', 0.0) * 1e6 if mat.get('alpha') else 0.0
]
# Process elements to create edges
if 'mesh' in metadata and 'elements' in metadata['mesh']:
for elem_type in ['solid', 'shell', 'beam']:
if elem_type in metadata['mesh']['elements']:
for elem in metadata['mesh']['elements'][elem_type]:
elem_nodes = elem['nodes']
mat_id = elem.get('material_id', 1)
# Get material properties for this element
mat_props = material_dict.get(mat_id, [0.0] * 5)
# Create edges between all node pairs in element
# (fully connected within element)
for i in range(len(elem_nodes)):
for j in range(i + 1, len(elem_nodes)):
node_i = elem_nodes[i] - 1 # 0-indexed
node_j = elem_nodes[j] - 1
if node_i < num_nodes and node_j < num_nodes:
# Add bidirectional edges
edge_index.append([node_i, node_j])
edge_index.append([node_j, node_i])
# Both edges get same material properties
edge_attrs.append(mat_props)
edge_attrs.append(mat_props)
# Convert to tensors
if edge_index:
edge_index = torch.tensor(edge_index, dtype=torch.long).t() # [2, num_edges]
edge_attr = torch.tensor(edge_attrs, dtype=torch.float) # [num_edges, 5]
else:
# No edges (shouldn't happen, but handle gracefully)
edge_index = torch.zeros((2, 0), dtype=torch.long)
edge_attr = torch.zeros((0, 5), dtype=torch.float)
# === CREATE DATA OBJECT ===
data = Data(
x=x,
edge_index=edge_index,
edge_attr=edge_attr,
y_displacement=displacement,
bc_mask=bc_mask,
pos=node_coords # Store original positions
)
# Add stress if available
if stress is not None:
data.y_stress = stress
return data
def _normalize_graph(self, data):
"""
Normalize graph features
- Coordinates: Center and scale to unit box
- Displacement: Scale by mean displacement
- Stress: Scale by mean stress
"""
# Normalize coordinates (already done in node features)
if hasattr(self, 'coord_mean') and hasattr(self, 'coord_std'):
# Extract coords from features (first 3 dimensions)
coords = data.x[:, :3]
coords_norm = (coords - self.coord_mean) / (self.coord_std + 1e-8)
data.x[:, :3] = coords_norm
# Normalize displacement
if hasattr(self, 'disp_mean') and hasattr(self, 'disp_std'):
data.y_displacement = (data.y_displacement - self.disp_mean) / (self.disp_std + 1e-8)
# Normalize stress
if hasattr(data, 'y_stress') and hasattr(self, 'stress_mean') and hasattr(self, 'stress_std'):
data.y_stress = (data.y_stress - self.stress_mean) / (self.stress_std + 1e-8)
return data
def _compute_normalization_stats(self):
"""
Compute mean and std for normalization across entire dataset
"""
print("Computing normalization statistics...")
all_coords = []
all_disp = []
all_stress = []
for idx in range(len(self.valid_cases)):
case_dir = self.valid_cases[idx]
with h5py.File(case_dir / "neural_field_data.h5", 'r') as f:
coords = f['mesh/node_coordinates'][:]
disp = f['results/displacement'][:]
all_coords.append(coords)
all_disp.append(disp)
# Load stress if available
if self.include_stress and 'results/stress' in f:
stress_group = f['results/stress']
for stress_type in stress_group.keys():
stress_data = stress_group[stress_type]['data'][:]
all_stress.append(stress_data)
break
# Concatenate all data
all_coords = np.concatenate(all_coords, axis=0)
all_disp = np.concatenate(all_disp, axis=0)
# Compute statistics
self.coord_mean = torch.from_numpy(all_coords.mean(axis=0)).float()
self.coord_std = torch.from_numpy(all_coords.std(axis=0)).float()
self.disp_mean = torch.from_numpy(all_disp.mean(axis=0)).float()
self.disp_std = torch.from_numpy(all_disp.std(axis=0)).float()
if all_stress:
all_stress = np.concatenate(all_stress, axis=0)
self.stress_mean = torch.from_numpy(all_stress.mean(axis=0)).float()
self.stress_std = torch.from_numpy(all_stress.std(axis=0)).float()
print("Normalization statistics computed!")
def create_dataloaders(
train_cases,
val_cases,
batch_size=4,
num_workers=0,
normalize=True,
include_stress=True
):
"""
Create training and validation dataloaders
Args:
train_cases (list): List of training case directories
val_cases (list): List of validation case directories
batch_size (int): Batch size
num_workers (int): Number of data loading workers
normalize (bool): Normalize features
include_stress (bool): Include stress targets
Returns:
train_loader, val_loader
"""
print("\nCreating datasets...")
# Create datasets
train_dataset = FEAMeshDataset(
train_cases,
normalize=normalize,
include_stress=include_stress,
cache_in_memory=False # Set to True for small datasets
)
val_dataset = FEAMeshDataset(
val_cases,
normalize=normalize,
include_stress=include_stress,
cache_in_memory=False
)
# Share normalization stats with validation set
if normalize and hasattr(train_dataset, 'coord_mean'):
val_dataset.coord_mean = train_dataset.coord_mean
val_dataset.coord_std = train_dataset.coord_std
val_dataset.disp_mean = train_dataset.disp_mean
val_dataset.disp_std = train_dataset.disp_std
if hasattr(train_dataset, 'stress_mean'):
val_dataset.stress_mean = train_dataset.stress_mean
val_dataset.stress_std = train_dataset.stress_std
# Create dataloaders
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=num_workers
)
val_loader = DataLoader(
val_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers
)
print(f"\nDataloaders created:")
print(f" Training: {len(train_dataset)} cases")
print(f" Validation: {len(val_dataset)} cases")
return train_loader, val_loader
if __name__ == "__main__":
# Test data loader
print("Testing FEA Mesh Data Loader...\n")
# This is a placeholder test - you would use actual parsed case directories
print("Note: This test requires actual parsed FEA data.")
print("Run the parser first on your NX Nastran files.")
print("\nData loader implementation complete!")

View File

@@ -0,0 +1,490 @@
"""
field_predictor.py
Graph Neural Network for predicting complete FEA field results
AtomizerField Field Predictor v2.0
Uses Graph Neural Networks to learn the physics of structural response.
Key Innovation:
Instead of: parameters → FEA → max_stress (scalar)
We learn: parameters → Neural Network → complete stress field (N values)
This enables 1000x faster optimization with physics understanding.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import MessagePassing, global_mean_pool
from torch_geometric.data import Data
import numpy as np
class MeshGraphConv(MessagePassing):
"""
Custom Graph Convolution for FEA meshes
This layer propagates information along mesh edges (element connectivity)
to learn how forces flow through the structure.
Key insight: Stress and displacement fields follow mesh topology.
Adjacent elements influence each other through equilibrium.
"""
def __init__(self, in_channels, out_channels, edge_dim=None):
"""
Args:
in_channels (int): Input node feature dimension
out_channels (int): Output node feature dimension
edge_dim (int): Edge feature dimension (optional)
"""
super().__init__(aggr='mean') # Mean aggregation of neighbor messages
self.in_channels = in_channels
self.out_channels = out_channels
# Message function: how to combine node and edge features
if edge_dim is not None:
self.message_mlp = nn.Sequential(
nn.Linear(2 * in_channels + edge_dim, out_channels),
nn.LayerNorm(out_channels),
nn.ReLU(),
nn.Linear(out_channels, out_channels)
)
else:
self.message_mlp = nn.Sequential(
nn.Linear(2 * in_channels, out_channels),
nn.LayerNorm(out_channels),
nn.ReLU(),
nn.Linear(out_channels, out_channels)
)
# Update function: how to update node features
self.update_mlp = nn.Sequential(
nn.Linear(in_channels + out_channels, out_channels),
nn.LayerNorm(out_channels),
nn.ReLU(),
nn.Linear(out_channels, out_channels)
)
self.edge_dim = edge_dim
def forward(self, x, edge_index, edge_attr=None):
"""
Propagate messages through the mesh graph
Args:
x: Node features [num_nodes, in_channels]
edge_index: Edge connectivity [2, num_edges]
edge_attr: Edge features [num_edges, edge_dim] (optional)
Returns:
Updated node features [num_nodes, out_channels]
"""
return self.propagate(edge_index, x=x, edge_attr=edge_attr)
def message(self, x_i, x_j, edge_attr=None):
"""
Construct messages from neighbors
Args:
x_i: Target node features
x_j: Source node features
edge_attr: Edge features
"""
if edge_attr is not None:
# Combine source node, target node, and edge features
msg_input = torch.cat([x_i, x_j, edge_attr], dim=-1)
else:
msg_input = torch.cat([x_i, x_j], dim=-1)
return self.message_mlp(msg_input)
def update(self, aggr_out, x):
"""
Update node features with aggregated messages
Args:
aggr_out: Aggregated messages from neighbors
x: Original node features
"""
# Combine original features with aggregated messages
update_input = torch.cat([x, aggr_out], dim=-1)
return self.update_mlp(update_input)
class FieldPredictorGNN(nn.Module):
"""
Graph Neural Network for predicting complete FEA fields
Architecture:
1. Node Encoder: Encode node positions, BCs, loads
2. Edge Encoder: Encode element connectivity, material properties
3. Message Passing: Propagate information through mesh (multiple layers)
4. Field Decoder: Predict displacement/stress at each node/element
This architecture respects physics:
- Uses mesh topology (forces flow through connected elements)
- Incorporates boundary conditions (fixed/loaded nodes)
- Learns material behavior (E, nu → stress-strain relationship)
"""
def __init__(
self,
node_feature_dim=3, # Node coordinates (x, y, z)
edge_feature_dim=5, # Material properties (E, nu, rho, etc.)
hidden_dim=128,
num_layers=6,
output_dim=6, # 6 DOF displacement (3 translation + 3 rotation)
dropout=0.1
):
"""
Initialize field predictor
Args:
node_feature_dim (int): Dimension of node features (position + BCs + loads)
edge_feature_dim (int): Dimension of edge features (material properties)
hidden_dim (int): Hidden layer dimension
num_layers (int): Number of message passing layers
output_dim (int): Output dimension per node (6 for displacement)
dropout (float): Dropout rate
"""
super().__init__()
self.node_feature_dim = node_feature_dim
self.edge_feature_dim = edge_feature_dim
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.output_dim = output_dim
# Node encoder: embed node coordinates + BCs + loads
self.node_encoder = nn.Sequential(
nn.Linear(node_feature_dim, hidden_dim),
nn.LayerNorm(hidden_dim),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim, hidden_dim)
)
# Edge encoder: embed material properties
self.edge_encoder = nn.Sequential(
nn.Linear(edge_feature_dim, hidden_dim),
nn.LayerNorm(hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim // 2)
)
# Message passing layers (the physics learning happens here)
self.conv_layers = nn.ModuleList([
MeshGraphConv(
in_channels=hidden_dim,
out_channels=hidden_dim,
edge_dim=hidden_dim // 2
)
for _ in range(num_layers)
])
self.layer_norms = nn.ModuleList([
nn.LayerNorm(hidden_dim)
for _ in range(num_layers)
])
self.dropouts = nn.ModuleList([
nn.Dropout(dropout)
for _ in range(num_layers)
])
# Field decoder: predict displacement at each node
self.field_decoder = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.LayerNorm(hidden_dim),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
nn.Linear(hidden_dim // 2, output_dim)
)
# Physics-informed constraint layer (optional, ensures equilibrium)
self.physics_scale = nn.Parameter(torch.ones(1))
def forward(self, data):
"""
Forward pass: mesh → displacement field
Args:
data (torch_geometric.data.Data): Batch of mesh graphs containing:
- x: Node features [num_nodes, node_feature_dim]
- edge_index: Connectivity [2, num_edges]
- edge_attr: Edge features [num_edges, edge_feature_dim]
- batch: Batch assignment [num_nodes]
Returns:
displacement_field: Predicted displacement [num_nodes, output_dim]
"""
x, edge_index, edge_attr = data.x, data.edge_index, data.edge_attr
# Encode nodes (positions + BCs + loads)
x = self.node_encoder(x) # [num_nodes, hidden_dim]
# Encode edges (material properties)
if edge_attr is not None:
edge_features = self.edge_encoder(edge_attr) # [num_edges, hidden_dim//2]
else:
edge_features = None
# Message passing: learn how forces propagate through mesh
for i, (conv, norm, dropout) in enumerate(zip(
self.conv_layers, self.layer_norms, self.dropouts
)):
# Graph convolution
x_new = conv(x, edge_index, edge_features)
# Residual connection (helps gradients flow)
x = x + dropout(x_new)
# Layer normalization
x = norm(x)
# Decode to displacement field
displacement = self.field_decoder(x) # [num_nodes, output_dim]
# Apply physics-informed scaling
displacement = displacement * self.physics_scale
return displacement
def predict_stress_from_displacement(self, displacement, data, material_props):
"""
Convert predicted displacement to stress using constitutive law
This implements: σ = C : ε = C : (∇u)
Where C is the material stiffness matrix
Args:
displacement: Predicted displacement [num_nodes, 6]
data: Mesh graph data
material_props: Material properties (E, nu)
Returns:
stress_field: Predicted stress [num_elements, n_components]
"""
# This would compute strain from displacement gradients
# then apply material constitutive law
# For now, we'll predict displacement and train a separate stress predictor
raise NotImplementedError("Stress prediction implemented in StressPredictor")
class StressPredictor(nn.Module):
"""
Predicts stress field from displacement field
This can be:
1. Physics-based: Compute strain from displacement, apply constitutive law
2. Learned: Train neural network to predict stress from displacement
We use learned approach for flexibility with nonlinear materials.
"""
def __init__(self, displacement_dim=6, hidden_dim=128, stress_components=6):
"""
Args:
displacement_dim (int): Displacement DOFs per node
hidden_dim (int): Hidden layer size
stress_components (int): Stress tensor components (6 for 3D)
"""
super().__init__()
# Stress predictor network
self.stress_net = nn.Sequential(
nn.Linear(displacement_dim, hidden_dim),
nn.LayerNorm(hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.LayerNorm(hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, stress_components)
)
def forward(self, displacement):
"""
Predict stress from displacement
Args:
displacement: [num_nodes, displacement_dim]
Returns:
stress: [num_nodes, stress_components]
"""
return self.stress_net(displacement)
class AtomizerFieldModel(nn.Module):
"""
Complete AtomizerField model: predicts both displacement and stress fields
This is the main model you'll use for training and inference.
"""
def __init__(
self,
node_feature_dim=10, # 3 (xyz) + 6 (BC DOFs) + 1 (load magnitude)
edge_feature_dim=5, # E, nu, rho, G, alpha
hidden_dim=128,
num_layers=6,
dropout=0.1
):
"""
Initialize complete field prediction model
Args:
node_feature_dim (int): Node features (coords + BCs + loads)
edge_feature_dim (int): Edge features (material properties)
hidden_dim (int): Hidden dimension
num_layers (int): Message passing layers
dropout (float): Dropout rate
"""
super().__init__()
# Displacement predictor (main GNN)
self.displacement_predictor = FieldPredictorGNN(
node_feature_dim=node_feature_dim,
edge_feature_dim=edge_feature_dim,
hidden_dim=hidden_dim,
num_layers=num_layers,
output_dim=6, # 6 DOF displacement
dropout=dropout
)
# Stress predictor (from displacement)
self.stress_predictor = StressPredictor(
displacement_dim=6,
hidden_dim=hidden_dim,
stress_components=6 # σxx, σyy, σzz, τxy, τyz, τxz
)
def forward(self, data, return_stress=True):
"""
Predict displacement and stress fields
Args:
data: Mesh graph data
return_stress (bool): Whether to predict stress
Returns:
dict with:
- displacement: [num_nodes, 6]
- stress: [num_nodes, 6] (if return_stress=True)
- von_mises: [num_nodes] (if return_stress=True)
"""
# Predict displacement
displacement = self.displacement_predictor(data)
results = {'displacement': displacement}
if return_stress:
# Predict stress from displacement
stress = self.stress_predictor(displacement)
# Calculate von Mises stress
# σ_vm = sqrt(0.5 * ((σxx-σyy)² + (σyy-σzz)² + (σzz-σxx)² + 6(τxy² + τyz² + τxz²)))
sxx, syy, szz, txy, tyz, txz = stress[:, 0], stress[:, 1], stress[:, 2], \
stress[:, 3], stress[:, 4], stress[:, 5]
von_mises = torch.sqrt(
0.5 * (
(sxx - syy)**2 + (syy - szz)**2 + (szz - sxx)**2 +
6 * (txy**2 + tyz**2 + txz**2)
)
)
results['stress'] = stress
results['von_mises'] = von_mises
return results
def get_max_values(self, results):
"""
Extract maximum values (for compatibility with scalar optimization)
Args:
results: Output from forward()
Returns:
dict with max_displacement, max_stress
"""
max_displacement = torch.max(torch.norm(results['displacement'][:, :3], dim=1))
max_stress = torch.max(results['von_mises']) if 'von_mises' in results else None
return {
'max_displacement': max_displacement,
'max_stress': max_stress
}
def create_model(config=None):
"""
Factory function to create AtomizerField model
Args:
config (dict): Model configuration
Returns:
AtomizerFieldModel instance
"""
if config is None:
config = {
'node_feature_dim': 10,
'edge_feature_dim': 5,
'hidden_dim': 128,
'num_layers': 6,
'dropout': 0.1
}
model = AtomizerFieldModel(**config)
# Initialize weights
def init_weights(m):
if isinstance(m, nn.Linear):
nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
if m.bias is not None:
nn.init.constant_(m.bias, 0)
model.apply(init_weights)
return model
if __name__ == "__main__":
# Test model creation
print("Testing AtomizerField Model Creation...")
model = create_model()
print(f"Model created: {sum(p.numel() for p in model.parameters()):,} parameters")
# Create dummy data
num_nodes = 100
num_edges = 300
x = torch.randn(num_nodes, 10) # Node features
edge_index = torch.randint(0, num_nodes, (2, num_edges)) # Edge connectivity
edge_attr = torch.randn(num_edges, 5) # Edge features
batch = torch.zeros(num_nodes, dtype=torch.long) # Batch assignment
data = Data(x=x, edge_index=edge_index, edge_attr=edge_attr, batch=batch)
# Forward pass
with torch.no_grad():
results = model(data)
print(f"\nTest forward pass:")
print(f" Displacement shape: {results['displacement'].shape}")
print(f" Stress shape: {results['stress'].shape}")
print(f" Von Mises shape: {results['von_mises'].shape}")
max_vals = model.get_max_values(results)
print(f"\nMax values:")
print(f" Max displacement: {max_vals['max_displacement']:.6f}")
print(f" Max stress: {max_vals['max_stress']:.2f}")
print("\nModel test passed!")

View File

@@ -0,0 +1,449 @@
"""
physics_losses.py
Physics-informed loss functions for training FEA field predictors
AtomizerField Physics-Informed Loss Functions v2.0
Key Innovation:
Standard neural networks only minimize prediction error.
Physics-informed networks also enforce physical laws:
- Equilibrium: Forces must balance
- Compatibility: Strains must be compatible with displacements
- Constitutive: Stress must follow material law (σ = C:ε)
This makes the network learn physics, not just patterns.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
class PhysicsInformedLoss(nn.Module):
"""
Combined loss function with physics constraints
Total Loss = λ_data * L_data + λ_physics * L_physics
Where:
- L_data: Standard MSE between prediction and FEA ground truth
- L_physics: Physics violation penalty (equilibrium, compatibility, constitutive)
"""
def __init__(
self,
lambda_data=1.0,
lambda_equilibrium=0.1,
lambda_constitutive=0.1,
lambda_boundary=1.0,
use_relative_error=True
):
"""
Initialize physics-informed loss
Args:
lambda_data (float): Weight for data loss
lambda_equilibrium (float): Weight for equilibrium violation
lambda_constitutive (float): Weight for constitutive law violation
lambda_boundary (float): Weight for boundary condition violation
use_relative_error (bool): Use relative error instead of absolute
"""
super().__init__()
self.lambda_data = lambda_data
self.lambda_equilibrium = lambda_equilibrium
self.lambda_constitutive = lambda_constitutive
self.lambda_boundary = lambda_boundary
self.use_relative_error = use_relative_error
def forward(self, predictions, targets, data=None):
"""
Compute total physics-informed loss
Args:
predictions (dict): Model predictions
- displacement: [num_nodes, 6]
- stress: [num_nodes, 6]
- von_mises: [num_nodes]
targets (dict): Ground truth from FEA
- displacement: [num_nodes, 6]
- stress: [num_nodes, 6]
data: Mesh graph data (for physics constraints)
Returns:
dict with:
- total_loss: Combined loss
- data_loss: Data fitting loss
- equilibrium_loss: Equilibrium violation
- constitutive_loss: Material law violation
- boundary_loss: BC violation
"""
losses = {}
# 1. Data Loss: How well do predictions match FEA results?
losses['displacement_loss'] = self._displacement_loss(
predictions['displacement'],
targets['displacement']
)
if 'stress' in predictions and 'stress' in targets:
losses['stress_loss'] = self._stress_loss(
predictions['stress'],
targets['stress']
)
else:
losses['stress_loss'] = torch.tensor(0.0, device=predictions['displacement'].device)
losses['data_loss'] = losses['displacement_loss'] + losses['stress_loss']
# 2. Physics Losses: How well do predictions obey physics?
if data is not None:
# Equilibrium: ∇·σ + f = 0
losses['equilibrium_loss'] = self._equilibrium_loss(
predictions, data
)
# Constitutive: σ = C:ε
losses['constitutive_loss'] = self._constitutive_loss(
predictions, data
)
# Boundary conditions: u = 0 at fixed nodes
losses['boundary_loss'] = self._boundary_condition_loss(
predictions, data
)
else:
losses['equilibrium_loss'] = torch.tensor(0.0, device=predictions['displacement'].device)
losses['constitutive_loss'] = torch.tensor(0.0, device=predictions['displacement'].device)
losses['boundary_loss'] = torch.tensor(0.0, device=predictions['displacement'].device)
# Total loss
losses['total_loss'] = (
self.lambda_data * losses['data_loss'] +
self.lambda_equilibrium * losses['equilibrium_loss'] +
self.lambda_constitutive * losses['constitutive_loss'] +
self.lambda_boundary * losses['boundary_loss']
)
return losses
def _displacement_loss(self, pred, target):
"""
Loss for displacement field
Uses relative error to handle different displacement magnitudes
"""
if self.use_relative_error:
# Relative L2 error
diff = pred - target
rel_error = torch.norm(diff, dim=-1) / (torch.norm(target, dim=-1) + 1e-8)
return rel_error.mean()
else:
# Absolute MSE
return F.mse_loss(pred, target)
def _stress_loss(self, pred, target):
"""
Loss for stress field
Emphasizes von Mises stress (most important for failure prediction)
"""
# Component-wise MSE
component_loss = F.mse_loss(pred, target)
# Von Mises stress MSE (computed from components)
pred_vm = self._compute_von_mises(pred)
target_vm = self._compute_von_mises(target)
vm_loss = F.mse_loss(pred_vm, target_vm)
# Combined: 50% component accuracy, 50% von Mises accuracy
return 0.5 * component_loss + 0.5 * vm_loss
def _equilibrium_loss(self, predictions, data):
"""
Equilibrium loss: ∇·σ + f = 0
In discrete form: sum of forces at each node should be zero
(where not externally loaded)
This is expensive to compute exactly, so we use a simplified version:
Check force balance on each element
"""
# Simplified: For now, return zero (full implementation requires
# computing stress divergence from node stresses)
# TODO: Implement finite difference approximation of ∇·σ
return torch.tensor(0.0, device=predictions['displacement'].device)
def _constitutive_loss(self, predictions, data):
"""
Constitutive law loss: σ = C:ε
Check if predicted stress is consistent with predicted strain
(which comes from displacement gradient)
Simplified version: Check if stress-strain relationship is reasonable
"""
# Simplified: For now, return zero
# Full implementation would:
# 1. Compute strain from displacement gradient
# 2. Compute expected stress from strain using material stiffness
# 3. Compare with predicted stress
# TODO: Implement strain computation and constitutive check
return torch.tensor(0.0, device=predictions['displacement'].device)
def _boundary_condition_loss(self, predictions, data):
"""
Boundary condition loss: u = 0 at fixed DOFs
Penalize non-zero displacement at constrained nodes
"""
if not hasattr(data, 'bc_mask') or data.bc_mask is None:
return torch.tensor(0.0, device=predictions['displacement'].device)
# bc_mask: [num_nodes, 6] boolean mask where True = constrained
displacement = predictions['displacement']
bc_mask = data.bc_mask
# Compute penalty for non-zero displacement at constrained DOFs
constrained_displacement = displacement * bc_mask.float()
bc_loss = torch.mean(constrained_displacement ** 2)
return bc_loss
def _compute_von_mises(self, stress):
"""
Compute von Mises stress from stress tensor components
Args:
stress: [num_nodes, 6] with [σxx, σyy, σzz, τxy, τyz, τxz]
Returns:
von_mises: [num_nodes]
"""
sxx, syy, szz = stress[:, 0], stress[:, 1], stress[:, 2]
txy, tyz, txz = stress[:, 3], stress[:, 4], stress[:, 5]
vm = torch.sqrt(
0.5 * (
(sxx - syy)**2 + (syy - szz)**2 + (szz - sxx)**2 +
6 * (txy**2 + tyz**2 + txz**2)
)
)
return vm
class FieldMSELoss(nn.Module):
"""
Simple MSE loss for field prediction (no physics constraints)
Use this for initial training or when physics constraints are too strict.
"""
def __init__(self, weight_displacement=1.0, weight_stress=1.0):
"""
Args:
weight_displacement (float): Weight for displacement loss
weight_stress (float): Weight for stress loss
"""
super().__init__()
self.weight_displacement = weight_displacement
self.weight_stress = weight_stress
def forward(self, predictions, targets):
"""
Compute MSE loss
Args:
predictions (dict): Model outputs
targets (dict): Ground truth
Returns:
dict with loss components
"""
losses = {}
# Displacement MSE
losses['displacement_loss'] = F.mse_loss(
predictions['displacement'],
targets['displacement']
)
# Stress MSE (if available)
if 'stress' in predictions and 'stress' in targets:
losses['stress_loss'] = F.mse_loss(
predictions['stress'],
targets['stress']
)
else:
losses['stress_loss'] = torch.tensor(0.0, device=predictions['displacement'].device)
# Total loss
losses['total_loss'] = (
self.weight_displacement * losses['displacement_loss'] +
self.weight_stress * losses['stress_loss']
)
return losses
class RelativeFieldLoss(nn.Module):
"""
Relative error loss - better for varying displacement/stress magnitudes
Uses: ||pred - target|| / ||target||
This makes the loss scale-invariant.
"""
def __init__(self, epsilon=1e-8):
"""
Args:
epsilon (float): Small constant to avoid division by zero
"""
super().__init__()
self.epsilon = epsilon
def forward(self, predictions, targets):
"""
Compute relative error loss
Args:
predictions (dict): Model outputs
targets (dict): Ground truth
Returns:
dict with loss components
"""
losses = {}
# Relative displacement error
disp_diff = predictions['displacement'] - targets['displacement']
disp_norm_pred = torch.norm(disp_diff, dim=-1)
disp_norm_target = torch.norm(targets['displacement'], dim=-1)
losses['displacement_loss'] = (disp_norm_pred / (disp_norm_target + self.epsilon)).mean()
# Relative stress error
if 'stress' in predictions and 'stress' in targets:
stress_diff = predictions['stress'] - targets['stress']
stress_norm_pred = torch.norm(stress_diff, dim=-1)
stress_norm_target = torch.norm(targets['stress'], dim=-1)
losses['stress_loss'] = (stress_norm_pred / (stress_norm_target + self.epsilon)).mean()
else:
losses['stress_loss'] = torch.tensor(0.0, device=predictions['displacement'].device)
# Total loss
losses['total_loss'] = losses['displacement_loss'] + losses['stress_loss']
return losses
class MaxValueLoss(nn.Module):
"""
Loss on maximum values only (for backward compatibility with scalar optimization)
This is useful if you want to ensure the network gets the critical max values right,
even if the field distribution is slightly off.
"""
def __init__(self):
super().__init__()
def forward(self, predictions, targets):
"""
Compute loss on maximum displacement and stress
Args:
predictions (dict): Model outputs with 'displacement', 'von_mises'
targets (dict): Ground truth
Returns:
dict with loss components
"""
losses = {}
# Max displacement error
pred_max_disp = torch.max(torch.norm(predictions['displacement'][:, :3], dim=1))
target_max_disp = torch.max(torch.norm(targets['displacement'][:, :3], dim=1))
losses['max_displacement_loss'] = F.mse_loss(pred_max_disp, target_max_disp)
# Max von Mises stress error
if 'von_mises' in predictions and 'stress' in targets:
pred_max_vm = torch.max(predictions['von_mises'])
# Compute target von Mises
target_stress = targets['stress']
sxx, syy, szz = target_stress[:, 0], target_stress[:, 1], target_stress[:, 2]
txy, tyz, txz = target_stress[:, 3], target_stress[:, 4], target_stress[:, 5]
target_vm = torch.sqrt(
0.5 * ((sxx - syy)**2 + (syy - szz)**2 + (szz - sxx)**2 +
6 * (txy**2 + tyz**2 + txz**2))
)
target_max_vm = torch.max(target_vm)
losses['max_stress_loss'] = F.mse_loss(pred_max_vm, target_max_vm)
else:
losses['max_stress_loss'] = torch.tensor(0.0, device=predictions['displacement'].device)
# Total loss
losses['total_loss'] = losses['max_displacement_loss'] + losses['max_stress_loss']
return losses
def create_loss_function(loss_type='mse', config=None):
"""
Factory function to create loss function
Args:
loss_type (str): Type of loss ('mse', 'relative', 'physics', 'max')
config (dict): Loss function configuration
Returns:
Loss function instance
"""
if config is None:
config = {}
if loss_type == 'mse':
return FieldMSELoss(**config)
elif loss_type == 'relative':
return RelativeFieldLoss(**config)
elif loss_type == 'physics':
return PhysicsInformedLoss(**config)
elif loss_type == 'max':
return MaxValueLoss(**config)
else:
raise ValueError(f"Unknown loss type: {loss_type}")
if __name__ == "__main__":
# Test loss functions
print("Testing AtomizerField Loss Functions...\n")
# Create dummy predictions and targets
num_nodes = 100
pred = {
'displacement': torch.randn(num_nodes, 6),
'stress': torch.randn(num_nodes, 6),
'von_mises': torch.abs(torch.randn(num_nodes))
}
target = {
'displacement': torch.randn(num_nodes, 6),
'stress': torch.randn(num_nodes, 6)
}
# Test each loss function
loss_types = ['mse', 'relative', 'physics', 'max']
for loss_type in loss_types:
print(f"Testing {loss_type.upper()} loss...")
loss_fn = create_loss_function(loss_type)
losses = loss_fn(pred, target)
print(f" Total loss: {losses['total_loss']:.6f}")
for key, value in losses.items():
if key != 'total_loss':
print(f" {key}: {value:.6f}")
print()
print("Loss function tests passed!")

View File

@@ -0,0 +1,361 @@
"""
uncertainty.py
Uncertainty quantification for neural field predictions
AtomizerField Uncertainty Quantification v2.1
Know when to trust predictions and when to run FEA!
Key Features:
- Ensemble-based uncertainty estimation
- Confidence intervals for predictions
- Automatic FEA recommendation
- Online calibration
"""
import torch
import torch.nn as nn
import numpy as np
from copy import deepcopy
from .field_predictor import AtomizerFieldModel
class UncertainFieldPredictor(nn.Module):
"""
Ensemble of models for uncertainty quantification
Uses multiple models trained with different initializations
to estimate prediction uncertainty.
When uncertainty is high → Recommend FEA validation
When uncertainty is low → Trust neural prediction
"""
def __init__(self, base_model_config, n_ensemble=5):
"""
Initialize ensemble
Args:
base_model_config (dict): Configuration for base model
n_ensemble (int): Number of models in ensemble
"""
super().__init__()
print(f"\nCreating ensemble with {n_ensemble} models...")
# Create ensemble of models
self.models = nn.ModuleList([
AtomizerFieldModel(**base_model_config)
for _ in range(n_ensemble)
])
self.n_ensemble = n_ensemble
# Initialize each model differently
for i, model in enumerate(self.models):
self._init_weights(model, seed=i)
print(f"Ensemble created with {n_ensemble} models")
def _init_weights(self, model, seed):
"""Initialize model weights with different seed"""
torch.manual_seed(seed)
def init_fn(m):
if isinstance(m, nn.Linear):
nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
if m.bias is not None:
nn.init.constant_(m.bias, 0)
model.apply(init_fn)
def forward(self, data, return_uncertainty=True, return_all_predictions=False):
"""
Forward pass through ensemble
Args:
data: Input graph data
return_uncertainty (bool): Return uncertainty estimates
return_all_predictions (bool): Return all individual predictions
Returns:
dict: Predictions with uncertainty
- displacement: Mean prediction
- stress: Mean prediction
- von_mises: Mean prediction
- displacement_std: Standard deviation (if return_uncertainty)
- stress_std: Standard deviation (if return_uncertainty)
- von_mises_std: Standard deviation (if return_uncertainty)
- all_predictions: List of all predictions (if return_all_predictions)
"""
# Get predictions from all models
all_predictions = []
for model in self.models:
with torch.no_grad():
pred = model(data, return_stress=True)
all_predictions.append(pred)
# Stack predictions
displacement_stack = torch.stack([p['displacement'] for p in all_predictions])
stress_stack = torch.stack([p['stress'] for p in all_predictions])
von_mises_stack = torch.stack([p['von_mises'] for p in all_predictions])
# Compute mean predictions
results = {
'displacement': displacement_stack.mean(dim=0),
'stress': stress_stack.mean(dim=0),
'von_mises': von_mises_stack.mean(dim=0)
}
# Compute uncertainty (standard deviation across ensemble)
if return_uncertainty:
results['displacement_std'] = displacement_stack.std(dim=0)
results['stress_std'] = stress_stack.std(dim=0)
results['von_mises_std'] = von_mises_stack.std(dim=0)
# Overall uncertainty metrics
results['max_displacement_uncertainty'] = results['displacement_std'].max().item()
results['max_stress_uncertainty'] = results['von_mises_std'].max().item()
# Uncertainty as percentage of prediction
results['displacement_rel_uncertainty'] = (
results['displacement_std'] / (torch.abs(results['displacement']) + 1e-8)
).mean().item()
results['stress_rel_uncertainty'] = (
results['von_mises_std'] / (results['von_mises'] + 1e-8)
).mean().item()
# Return all predictions if requested
if return_all_predictions:
results['all_predictions'] = all_predictions
return results
def needs_fea_validation(self, predictions, threshold=0.1):
"""
Determine if FEA validation is recommended
Args:
predictions (dict): Output from forward() with uncertainty
threshold (float): Relative uncertainty threshold
Returns:
dict: Recommendation and reasons
"""
reasons = []
# Check displacement uncertainty
if predictions['displacement_rel_uncertainty'] > threshold:
reasons.append(
f"High displacement uncertainty: "
f"{predictions['displacement_rel_uncertainty']*100:.1f}% > {threshold*100:.1f}%"
)
# Check stress uncertainty
if predictions['stress_rel_uncertainty'] > threshold:
reasons.append(
f"High stress uncertainty: "
f"{predictions['stress_rel_uncertainty']*100:.1f}% > {threshold*100:.1f}%"
)
recommend_fea = len(reasons) > 0
return {
'recommend_fea': recommend_fea,
'reasons': reasons,
'displacement_uncertainty': predictions['displacement_rel_uncertainty'],
'stress_uncertainty': predictions['stress_rel_uncertainty']
}
def get_confidence_intervals(self, predictions, confidence=0.95):
"""
Compute confidence intervals for predictions
Args:
predictions (dict): Output from forward() with uncertainty
confidence (float): Confidence level (0.95 = 95% confidence)
Returns:
dict: Confidence intervals
"""
# For normal distribution, 95% CI is ±1.96 std
# For 90% CI is ±1.645 std
z_score = {0.90: 1.645, 0.95: 1.96, 0.99: 2.576}.get(confidence, 1.96)
intervals = {}
# Displacement intervals
intervals['displacement_lower'] = predictions['displacement'] - z_score * predictions['displacement_std']
intervals['displacement_upper'] = predictions['displacement'] + z_score * predictions['displacement_std']
# Stress intervals
intervals['von_mises_lower'] = predictions['von_mises'] - z_score * predictions['von_mises_std']
intervals['von_mises_upper'] = predictions['von_mises'] + z_score * predictions['von_mises_std']
# Max values with confidence intervals
max_vm = predictions['von_mises'].max()
max_vm_std = predictions['von_mises_std'].max()
intervals['max_stress_estimate'] = max_vm.item()
intervals['max_stress_lower'] = (max_vm - z_score * max_vm_std).item()
intervals['max_stress_upper'] = (max_vm + z_score * max_vm_std).item()
return intervals
class OnlineLearner:
"""
Online learning from FEA runs during optimization
As optimization progresses and you run FEA for validation,
this module can quickly update the model to improve predictions.
This creates a virtuous cycle:
1. Use neural network for fast exploration
2. Run FEA on promising designs
3. Update neural network with new data
4. Neural network gets better → need less FEA
"""
def __init__(self, model, learning_rate=0.0001):
"""
Initialize online learner
Args:
model: Neural network model
learning_rate (float): Learning rate for updates
"""
self.model = model
self.optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
self.replay_buffer = []
self.update_count = 0
print(f"\nOnline learner initialized")
print(f"Learning rate: {learning_rate}")
def add_fea_result(self, graph_data, fea_results):
"""
Add new FEA result to replay buffer
Args:
graph_data: Mesh graph
fea_results (dict): FEA results (displacement, stress)
"""
self.replay_buffer.append({
'graph_data': graph_data,
'fea_results': fea_results
})
print(f"Added FEA result to buffer (total: {len(self.replay_buffer)})")
def quick_update(self, steps=10):
"""
Quick fine-tuning on recent FEA results
Args:
steps (int): Number of gradient steps
"""
if len(self.replay_buffer) == 0:
print("No data in replay buffer")
return
print(f"\nQuick update: {steps} steps on {len(self.replay_buffer)} samples")
self.model.train()
for step in range(steps):
total_loss = 0.0
# Train on all samples in buffer
for sample in self.replay_buffer:
graph_data = sample['graph_data']
fea_results = sample['fea_results']
# Forward pass
predictions = self.model(graph_data, return_stress=True)
# Compute loss
disp_loss = nn.functional.mse_loss(
predictions['displacement'],
fea_results['displacement']
)
if 'stress' in fea_results:
stress_loss = nn.functional.mse_loss(
predictions['stress'],
fea_results['stress']
)
loss = disp_loss + stress_loss
else:
loss = disp_loss
# Backward pass
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
total_loss += loss.item()
if step % 5 == 0:
avg_loss = total_loss / len(self.replay_buffer)
print(f" Step {step}/{steps}: Loss = {avg_loss:.6f}")
self.model.eval()
self.update_count += 1
print(f"Update complete (total updates: {self.update_count})")
def clear_buffer(self):
"""Clear replay buffer"""
self.replay_buffer = []
print("Replay buffer cleared")
def create_uncertain_predictor(model_config, n_ensemble=5):
"""
Factory function to create uncertain predictor
Args:
model_config (dict): Model configuration
n_ensemble (int): Ensemble size
Returns:
UncertainFieldPredictor instance
"""
return UncertainFieldPredictor(model_config, n_ensemble)
if __name__ == "__main__":
# Test uncertainty quantification
print("Testing Uncertainty Quantification...\n")
# Create ensemble
model_config = {
'node_feature_dim': 12,
'edge_feature_dim': 5,
'hidden_dim': 64,
'num_layers': 4,
'dropout': 0.1
}
ensemble = UncertainFieldPredictor(model_config, n_ensemble=3)
print(f"\nEnsemble created with {ensemble.n_ensemble} models")
print("Uncertainty quantification ready!")
print("\nUsage:")
print("""
# Get predictions with uncertainty
predictions = ensemble(graph_data, return_uncertainty=True)
# Check if FEA validation needed
recommendation = ensemble.needs_fea_validation(predictions, threshold=0.1)
if recommendation['recommend_fea']:
print("Recommendation: Run FEA for validation")
for reason in recommendation['reasons']:
print(f" - {reason}")
else:
print("Prediction confident - no FEA needed!")
""")