# Atomizer/atomizer-field/neural_field_parser.py
"""
neural_field_parser.py
Parses NX Nastran files into Neural Field training data
AtomizerField Data Parser v1.0.0
Converts NX Nastran BDF/OP2 files into standardized neural field training format.
This format is designed to be future-proof for years of neural network training.
Usage:
python neural_field_parser.py <case_directory>
Example:
python neural_field_parser.py training_case_001
"""
import json
import numpy as np
import h5py
from pathlib import Path
from datetime import datetime
import hashlib
import warnings
# pyNastran imports
try:
from pyNastran.bdf.bdf import BDF
from pyNastran.op2.op2 import OP2
except ImportError:
print("ERROR: pyNastran is required. Install with: pip install pyNastran")
raise
class NastranToNeuralFieldParser:
    """
    Parses Nastran BDF/OP2 files into Neural Field data structure v1.0

    This parser extracts complete field data (stress, displacement, strain at
    every node/element) rather than just scalar maximum values. This enables
    neural networks to learn complete physics fields for 1000x faster
    structural optimization.

    Data Structure v1.0:
    -------------------
    - metadata: Version, timestamps, analysis info, units
    - mesh: Complete node coordinates, element connectivity
    - materials: Full material properties (E, nu, rho, etc.)
    - boundary_conditions: All constraints (SPC, MPC)
    - loads: All loading conditions (forces, pressures, gravity, thermal)
    - results: Complete field results (displacement, stress, strain at ALL points)

    Attributes:
        case_dir (Path): Directory containing input/output subdirectories
        bdf_file (Path): Path to Nastran input deck (.bdf or .dat)
        op2_file (Path): Path to Nastran binary results (.op2)
        bdf (BDF): pyNastran BDF reader object
        op2 (OP2): pyNastran OP2 reader object
        neural_field_data (dict): Complete parsed data structure
    """
def __init__(self, case_directory):
"""
Initialize parser with case directory
Args:
case_directory (str or Path): Path to case directory containing:
- input/model.bdf (or model.dat)
- output/model.op2
"""
self.case_dir = Path(case_directory)
# Find BDF file (try both .bdf and .dat extensions)
bdf_candidates = list((self.case_dir / "input").glob("model.bdf")) + \
list((self.case_dir / "input").glob("model.dat"))
if not bdf_candidates:
raise FileNotFoundError(
f"No model.bdf or model.dat found in {self.case_dir / 'input'}/"
)
self.bdf_file = bdf_candidates[0]
# Find OP2 file
op2_candidates = list((self.case_dir / "output").glob("model.op2"))
if not op2_candidates:
raise FileNotFoundError(
f"No model.op2 found in {self.case_dir / 'output'}/"
)
self.op2_file = op2_candidates[0]
print(f"Found BDF: {self.bdf_file.name}")
print(f"Found OP2: {self.op2_file.name}")
# Initialize readers with minimal debug output
self.bdf = BDF(debug=False)
self.op2 = OP2(debug=False)
# Initialize data structure v1.0
self.neural_field_data = {
"metadata": {},
"geometry": {},
"mesh": {},
"materials": {},
"boundary_conditions": {},
"loads": {},
"results": {}
}
    def parse_all(self):
        """
        Main parsing function - extracts all data from BDF/OP2 files.

        Reads both Nastran files, runs every extract_* stage to populate
        ``self.neural_field_data``, then writes the JSON + HDF5 outputs
        via ``save_data``.

        Returns:
            dict: Complete neural field data structure (also retained on
            ``self.neural_field_data``).
        """
        print("\n" + "="*60)
        print("Starting AtomizerField Neural Field Parser v1.0")
        print("="*60)
        # Parse input deck
        print("\n[1/6] Reading BDF file...")
        self.bdf.read_bdf(str(self.bdf_file))
        print(f" Loaded {len(self.bdf.nodes)} nodes, {len(self.bdf.elements)} elements")
        # Parse binary results
        print("\n[2/6] Reading OP2 file...")
        self.op2.read_op2(str(self.op2_file))
        # 'sol' may not exist in all pyNastran versions, hence getattr.
        sol_num = getattr(self.op2, 'sol', 'Unknown')
        print(f" Loaded solution: SOL {sol_num}")
        # Extraction stages (order matters only for the progress labels).
        print("\n[3/6] Extracting metadata...")
        self.extract_metadata()
        print("\n[4/6] Extracting mesh data...")
        self.extract_mesh()
        print("\n[5/6] Extracting materials, BCs, and loads...")
        self.extract_materials()
        self.extract_boundary_conditions()
        self.extract_loads()
        print("\n[6/6] Extracting field results...")
        self.extract_results()
        # Persist JSON metadata and HDF5 field arrays into the case directory.
        print("\nSaving data to disk...")
        self.save_data()
        print("\n" + "="*60)
        print("Parsing complete! [OK]")
        print("="*60)
        return self.neural_field_data
def extract_metadata(self):
"""
Extract metadata and analysis information
This includes:
- Data format version (v1.0.0)
- Timestamps
- Analysis type (SOL 101, 103, etc.)
- Units system
- File hashes for provenance
"""
# Generate file hash for data provenance
with open(self.bdf_file, 'rb') as f:
bdf_hash = hashlib.sha256(f.read()).hexdigest()
with open(self.op2_file, 'rb') as f:
op2_hash = hashlib.sha256(f.read()).hexdigest()
# Extract title if available
title = ""
if hasattr(self.bdf, 'case_control_deck') and self.bdf.case_control_deck:
if hasattr(self.bdf.case_control_deck, 'title'):
title = str(self.bdf.case_control_deck.title)
# Get solution type if available
sol_num = getattr(self.op2, 'sol', 'Unknown')
self.neural_field_data["metadata"] = {
"version": "1.0.0",
"created_at": datetime.now().isoformat(),
"source": "NX_Nastran",
"case_directory": str(self.case_dir),
"case_name": self.case_dir.name,
"analysis_type": f"SOL_{sol_num}",
"title": title,
"file_hashes": {
"bdf": bdf_hash,
"op2": op2_hash
},
"units": {
"length": "mm", # Standard NX units
"force": "N",
"stress": "MPa",
"mass": "kg",
"temperature": "C"
},
"parser_version": "1.0.0",
"notes": "Complete field data for neural network training"
}
print(f" Analysis: {self.neural_field_data['metadata']['analysis_type']}")
def extract_mesh(self):
"""
Extract complete mesh data from BDF
This preserves:
- All node coordinates (global coordinate system)
- All element connectivity
- Element types (solid, shell, beam, rigid)
- Material and property IDs for each element
"""
print(" Extracting nodes...")
# Nodes - store in sorted order for consistent indexing
nodes = []
node_ids = []
for nid, node in sorted(self.bdf.nodes.items()):
node_ids.append(nid)
# Get position in global coordinate system
pos = node.get_position()
nodes.append([pos[0], pos[1], pos[2]])
nodes_array = np.array(nodes, dtype=np.float64)
print(f" Extracted {len(nodes)} nodes")
print(f" Extracting elements...")
# Elements - organize by type for efficient neural network processing
element_data = {
"solid": [],
"shell": [],
"beam": [],
"rigid": []
}
element_type_counts = {}
for eid, elem in sorted(self.bdf.elements.items()):
elem_type = elem.type
element_type_counts[elem_type] = element_type_counts.get(elem_type, 0) + 1
# Solid elements (3D stress states)
if elem_type in ['CTETRA', 'CHEXA', 'CPENTA', 'CTETRA10', 'CHEXA20', 'CPENTA15']:
element_data["solid"].append({
"id": eid,
"type": elem_type,
"nodes": list(elem.node_ids),
"material_id": elem.pid, # Property ID which links to material
"property_id": elem.pid if hasattr(elem, 'pid') else None
})
# Shell elements (2D plane stress)
elif elem_type in ['CQUAD4', 'CTRIA3', 'CQUAD8', 'CTRIA6', 'CQUAD', 'CTRIA']:
thickness = None
try:
# Get thickness from property
if hasattr(elem, 'pid') and elem.pid in self.bdf.properties:
prop = self.bdf.properties[elem.pid]
if hasattr(prop, 't'):
thickness = prop.t
except:
pass
element_data["shell"].append({
"id": eid,
"type": elem_type,
"nodes": list(elem.node_ids),
"material_id": elem.pid,
"property_id": elem.pid,
"thickness": thickness
})
# Beam elements (1D elements)
elif elem_type in ['CBAR', 'CBEAM', 'CROD', 'CONROD']:
element_data["beam"].append({
"id": eid,
"type": elem_type,
"nodes": list(elem.node_ids),
"material_id": elem.pid if hasattr(elem, 'pid') else None,
"property_id": elem.pid if hasattr(elem, 'pid') else None
})
# Rigid elements (kinematic constraints)
elif elem_type in ['RBE2', 'RBE3', 'RBAR', 'RROD']:
element_data["rigid"].append({
"id": eid,
"type": elem_type,
"nodes": list(elem.node_ids)
})
print(f" Extracted {len(self.bdf.elements)} elements:")
for etype, count in element_type_counts.items():
print(f" {etype}: {count}")
# Calculate mesh bounding box for reference
bbox_min = nodes_array.min(axis=0)
bbox_max = nodes_array.max(axis=0)
bbox_size = bbox_max - bbox_min
# Store mesh data
self.neural_field_data["mesh"] = {
"statistics": {
"n_nodes": len(nodes),
"n_elements": len(self.bdf.elements),
"element_types": {
"solid": len(element_data["solid"]),
"shell": len(element_data["shell"]),
"beam": len(element_data["beam"]),
"rigid": len(element_data["rigid"])
},
"element_type_breakdown": element_type_counts
},
"bounding_box": {
"min": bbox_min.tolist(),
"max": bbox_max.tolist(),
"size": bbox_size.tolist()
},
"nodes": {
"ids": node_ids,
"coordinates": nodes_array.tolist(), # Will be stored in HDF5
"shape": list(nodes_array.shape),
"dtype": str(nodes_array.dtype)
},
"elements": element_data
}
def extract_materials(self):
"""
Extract all material properties
Captures complete material definitions including:
- Isotropic (MAT1): E, nu, rho, G, alpha
- Orthotropic (MAT8, MAT9): directional properties
- Stress limits for validation
"""
print(" Extracting materials...")
materials = []
for mid, mat in sorted(self.bdf.materials.items()):
mat_data = {
"id": mid,
"type": mat.type
}
if mat.type == 'MAT1': # Isotropic material
mat_data.update({
"E": float(mat.e) if mat.e is not None else None, # Young's modulus
"nu": float(mat.nu) if mat.nu is not None else None, # Poisson's ratio
"rho": float(mat.rho) if mat.rho is not None else None,# Density
"G": float(mat.g) if mat.g is not None else None, # Shear modulus
"alpha": float(mat.a) if hasattr(mat, 'a') and mat.a is not None else None, # Thermal expansion
"tref": float(mat.tref) if hasattr(mat, 'tref') and mat.tref is not None else None,
})
# Stress limits (if defined)
try:
if hasattr(mat, 'St') and callable(mat.St):
mat_data["ST"] = float(mat.St()) if mat.St() is not None else None
if hasattr(mat, 'Sc') and callable(mat.Sc):
mat_data["SC"] = float(mat.Sc()) if mat.Sc() is not None else None
if hasattr(mat, 'Ss') and callable(mat.Ss):
mat_data["SS"] = float(mat.Ss()) if mat.Ss() is not None else None
except:
pass
elif mat.type == 'MAT8': # Orthotropic shell material
mat_data.update({
"E1": float(mat.e11) if hasattr(mat, 'e11') and mat.e11 is not None else None,
"E2": float(mat.e22) if hasattr(mat, 'e22') and mat.e22 is not None else None,
"nu12": float(mat.nu12) if hasattr(mat, 'nu12') and mat.nu12 is not None else None,
"G12": float(mat.g12) if hasattr(mat, 'g12') and mat.g12 is not None else None,
"G1z": float(mat.g1z) if hasattr(mat, 'g1z') and mat.g1z is not None else None,
"G2z": float(mat.g2z) if hasattr(mat, 'g2z') and mat.g2z is not None else None,
"rho": float(mat.rho) if hasattr(mat, 'rho') and mat.rho is not None else None,
})
materials.append(mat_data)
self.neural_field_data["materials"] = materials
print(f" Extracted {len(materials)} materials")
def extract_boundary_conditions(self):
"""
Extract all boundary conditions
Includes:
- SPC: Single point constraints (fixed DOFs)
- MPC: Multi-point constraints (equations)
- SUPORT: Free body supports
"""
print(" Extracting boundary conditions...")
bcs = {
"spc": [], # Single point constraints
"mpc": [], # Multi-point constraints
"suport": [] # Free body supports
}
# SPC (fixed DOFs) - critical for neural network to understand support conditions
spc_count = 0
for spc_id, spc_list in self.bdf.spcs.items():
for spc in spc_list:
try:
# Handle different SPC types
if hasattr(spc, 'node_ids'):
nodes = spc.node_ids
elif hasattr(spc, 'node'):
nodes = [spc.node]
else:
continue
for node in nodes:
bcs["spc"].append({
"id": spc_id,
"node": int(node),
"dofs": str(spc.components) if hasattr(spc, 'components') else "123456",
"enforced_motion": float(spc.enforced) if hasattr(spc, 'enforced') and spc.enforced is not None else 0.0
})
spc_count += 1
except Exception as e:
warnings.warn(f"Could not parse SPC {spc_id}: {e}")
# MPC equations
mpc_count = 0
for mpc_id, mpc_list in self.bdf.mpcs.items():
for mpc in mpc_list:
try:
bcs["mpc"].append({
"id": mpc_id,
"nodes": list(mpc.node_ids) if hasattr(mpc, 'node_ids') else [],
"coefficients": list(mpc.coefficients) if hasattr(mpc, 'coefficients') else [],
"components": list(mpc.components) if hasattr(mpc, 'components') else []
})
mpc_count += 1
except Exception as e:
warnings.warn(f"Could not parse MPC {mpc_id}: {e}")
self.neural_field_data["boundary_conditions"] = bcs
print(f" Extracted {spc_count} SPCs, {mpc_count} MPCs")
def extract_loads(self):
"""
Extract all loading conditions
Includes:
- Point forces and moments
- Distributed pressures
- Gravity loads
- Thermal loads
"""
print(" Extracting loads...")
loads = {
"point_forces": [],
"pressure": [],
"gravity": [],
"thermal": []
}
force_count = 0
pressure_count = 0
gravity_count = 0
# Point forces, moments, and pressures
for load_id, load_list in self.bdf.loads.items():
for load in load_list:
try:
if load.type == 'FORCE':
loads["point_forces"].append({
"id": load_id,
"type": "force",
"node": int(load.node),
"magnitude": float(load.mag),
"direction": [float(load.xyz[0]), float(load.xyz[1]), float(load.xyz[2])],
"coord_system": int(load.cid) if hasattr(load, 'cid') else 0
})
force_count += 1
elif load.type == 'MOMENT':
loads["point_forces"].append({
"id": load_id,
"type": "moment",
"node": int(load.node),
"magnitude": float(load.mag),
"direction": [float(load.xyz[0]), float(load.xyz[1]), float(load.xyz[2])],
"coord_system": int(load.cid) if hasattr(load, 'cid') else 0
})
force_count += 1
elif load.type in ['PLOAD', 'PLOAD2', 'PLOAD4']:
pressure_data = {
"id": load_id,
"type": load.type
}
if hasattr(load, 'eids'):
pressure_data["elements"] = list(load.eids)
elif hasattr(load, 'eid'):
pressure_data["elements"] = [int(load.eid)]
if hasattr(load, 'pressures'):
pressure_data["pressure"] = [float(p) for p in load.pressures]
elif hasattr(load, 'pressure'):
pressure_data["pressure"] = float(load.pressure)
loads["pressure"].append(pressure_data)
pressure_count += 1
elif load.type == 'GRAV':
loads["gravity"].append({
"id": load_id,
"acceleration": float(load.scale),
"direction": [float(load.N[0]), float(load.N[1]), float(load.N[2])],
"coord_system": int(load.cid) if hasattr(load, 'cid') else 0
})
gravity_count += 1
except Exception as e:
warnings.warn(f"Could not parse load {load_id} type {load.type}: {e}")
# Temperature loads (if available)
thermal_count = 0
if hasattr(self.bdf, 'temps'):
for temp_id, temp_list in self.bdf.temps.items():
for temp in temp_list:
try:
loads["thermal"].append({
"id": temp_id,
"node": int(temp.node),
"temperature": float(temp.temperature)
})
thermal_count += 1
except Exception as e:
warnings.warn(f"Could not parse thermal load {temp_id}: {e}")
self.neural_field_data["loads"] = loads
print(f" Extracted {force_count} forces, {pressure_count} pressures, {gravity_count} gravity, {thermal_count} thermal")
    def extract_results(self):
        """
        Extract complete field results from OP2.

        This is the CRITICAL function for neural field learning.
        We extract COMPLETE fields, not just maximum values:
        - Displacement at every node (6 DOF)
        - Stress at every element (full tensor)
        - Strain at every element (full tensor)
        - Reaction forces at constrained nodes

        This complete field data enables the neural network to learn
        the physics of how structures respond to loads.
        """
        print(" Extracting field results...")
        results = {}
        # Determine subcase ID (usually 1 for linear static)
        subcase_id = 1
        if hasattr(self.op2, 'isubcase_name_map'):
            available_subcases = list(self.op2.isubcase_name_map.keys())
            if available_subcases:
                # NOTE(review): only the first subcase is extracted; decks with
                # multiple subcases would need a loop here -- confirm all
                # training cases are single-subcase.
                subcase_id = available_subcases[0]
        print(f" Using subcase ID: {subcase_id}")
        # Displacement - complete field at all nodes
        if hasattr(self.op2, 'displacements') and subcase_id in self.op2.displacements:
            print(" Processing displacement field...")
            disp = self.op2.displacements[subcase_id]
            # assumes itime index 0 is the (only) static step -- TODO confirm
            disp_data = disp.data[0, :, :] # [itime=0, all_nodes, 6_dofs]
            # Extract node IDs (first column of node_gridtype)
            node_ids = disp.node_gridtype[:, 0].tolist()
            # Calculate magnitudes for quick reference
            translation_mag = np.linalg.norm(disp_data[:, :3], axis=1)
            rotation_mag = np.linalg.norm(disp_data[:, 3:], axis=1)
            results["displacement"] = {
                "node_ids": node_ids,
                "data": disp_data.tolist(), # Will be stored in HDF5
                "shape": list(disp_data.shape),
                "dtype": str(disp_data.dtype),
                "max_translation": float(np.max(translation_mag)),
                "max_rotation": float(np.max(rotation_mag)),
                "units": "mm and radians"
            }
            print(f" Displacement: {len(node_ids)} nodes, max={results['displacement']['max_translation']:.6f} mm")
        # Stress - handle different element types
        stress_results = {}
        # Solid element stress (CTETRA, CHEXA, etc.)
        stress_attrs = ['ctetra_stress', 'chexa_stress', 'cpenta_stress']
        for attr in stress_attrs:
            if hasattr(self.op2, attr):
                stress_obj = getattr(self.op2, attr)
                if subcase_id in stress_obj:
                    elem_type = attr.replace('_stress', '')
                    print(f" Processing {elem_type} stress...")
                    stress = stress_obj[subcase_id]
                    stress_data = stress.data[0, :, :]
                    # Extract element IDs (first column of element_node)
                    element_ids = stress.element_node[:, 0].tolist()
                    # Von Mises stress is usually the last column
                    von_mises = None
                    if stress_data.shape[1] >= 7: # Has von Mises
                        von_mises = stress_data[:, -1]
                        max_vm = float(np.max(von_mises))
                        von_mises = von_mises.tolist()
                    else:
                        max_vm = None
                    stress_results[f"{elem_type}_stress"] = {
                        "element_ids": element_ids,
                        "data": stress_data.tolist(), # Full stress tensor
                        "shape": list(stress_data.shape),
                        "dtype": str(stress_data.dtype),
                        "von_mises": von_mises,
                        "max_von_mises": max_vm,
                        "units": "MPa"
                    }
                    # NOTE(review): truthiness test -- a legitimate max VM of
                    # exactly 0.0 takes the else branch; cosmetic only (print).
                    print(f" {elem_type}: {len(element_ids)} elements, max VM={max_vm:.2f} MPa" if max_vm else f" {elem_type}: {len(element_ids)} elements")
        # Shell element stress
        shell_stress_attrs = ['cquad4_stress', 'ctria3_stress', 'cquad8_stress', 'ctria6_stress']
        for attr in shell_stress_attrs:
            if hasattr(self.op2, attr):
                stress_obj = getattr(self.op2, attr)
                if subcase_id in stress_obj:
                    elem_type = attr.replace('_stress', '')
                    print(f" Processing {elem_type} stress...")
                    stress = stress_obj[subcase_id]
                    stress_data = stress.data[0, :, :]
                    element_ids = stress.element_node[:, 0].tolist()
                    stress_results[f"{elem_type}_stress"] = {
                        "element_ids": element_ids,
                        "data": stress_data.tolist(),
                        "shape": list(stress_data.shape),
                        "dtype": str(stress_data.dtype),
                        "units": "MPa"
                    }
                    print(f" {elem_type}: {len(element_ids)} elements")
        results["stress"] = stress_results
        # Strain - same access pattern as stress
        strain_results = {}
        strain_attrs = ['ctetra_strain', 'chexa_strain', 'cpenta_strain',
                        'cquad4_strain', 'ctria3_strain']
        for attr in strain_attrs:
            if hasattr(self.op2, attr):
                strain_obj = getattr(self.op2, attr)
                if subcase_id in strain_obj:
                    elem_type = attr.replace('_strain', '')
                    strain = strain_obj[subcase_id]
                    strain_data = strain.data[0, :, :]
                    element_ids = strain.element_node[:, 0].tolist()
                    strain_results[f"{elem_type}_strain"] = {
                        "element_ids": element_ids,
                        "data": strain_data.tolist(),
                        "shape": list(strain_data.shape),
                        "dtype": str(strain_data.dtype),
                        "units": "mm/mm"
                    }
        # Unlike "stress", "strain" is only added when non-empty.
        if strain_results:
            results["strain"] = strain_results
            print(f" Extracted strain for {len(strain_results)} element types")
        # SPC Forces (reaction forces at constraints)
        if hasattr(self.op2, 'spc_forces') and subcase_id in self.op2.spc_forces:
            print(" Processing reaction forces...")
            spc = self.op2.spc_forces[subcase_id]
            spc_data = spc.data[0, :, :]
            node_ids = spc.node_gridtype[:, 0].tolist()
            # Calculate total reaction force magnitude
            force_mag = np.linalg.norm(spc_data[:, :3], axis=1)
            moment_mag = np.linalg.norm(spc_data[:, 3:], axis=1)
            results["reactions"] = {
                "node_ids": node_ids,
                "forces": spc_data.tolist(),
                "shape": list(spc_data.shape),
                "dtype": str(spc_data.dtype),
                "max_force": float(np.max(force_mag)),
                "max_moment": float(np.max(moment_mag)),
                "units": "N and N-mm"
            }
            print(f" Reactions: {len(node_ids)} nodes, max force={results['reactions']['max_force']:.2f} N")
        self.neural_field_data["results"] = results
    def save_data(self):
        """
        Save parsed data to JSON and HDF5 files in the case directory.

        Output layout:
        - neural_field_data.json: metadata, structure, small arrays; the
          large arrays are replaced with placeholder strings by
          ``_prepare_json_data``
        - neural_field_data.h5: large numerical arrays (node coordinates,
          field results), gzip-compressed

        HDF5 is used for efficient storage and loading of large numerical
        arrays. JSON provides human-readable metadata and structure.
        """
        # Save JSON metadata
        json_file = self.case_dir / "neural_field_data.json"
        # Deep copy with large arrays swapped for placeholder strings
        json_data = self._prepare_json_data()
        with open(json_file, 'w') as f:
            # default=str stringifies anything json cannot serialize natively
            json.dump(json_data, f, indent=2, default=str)
        print(f" [OK] Saved metadata to: {json_file.name}")
        # Save HDF5 for large arrays
        h5_file = self.case_dir / "neural_field_data.h5"
        with h5py.File(h5_file, 'w') as f:
            # File-level metadata attributes
            f.attrs['version'] = '1.0.0'
            f.attrs['created_at'] = self.neural_field_data['metadata']['created_at']
            f.attrs['case_name'] = self.neural_field_data['metadata']['case_name']
            # Mesh: node coordinates (compressed) and IDs
            mesh_grp = f.create_group('mesh')
            node_coords = np.array(self.neural_field_data["mesh"]["nodes"]["coordinates"])
            mesh_grp.create_dataset('node_coordinates',
                                    data=node_coords,
                                    compression='gzip',
                                    compression_opts=4)
            mesh_grp.create_dataset('node_ids',
                                    data=np.array(self.neural_field_data["mesh"]["nodes"]["ids"]))
            # Results (only the categories that were extracted)
            if "results" in self.neural_field_data:
                results_grp = f.create_group('results')
                # Displacement field + matching node IDs
                if "displacement" in self.neural_field_data["results"]:
                    disp_data = np.array(self.neural_field_data["results"]["displacement"]["data"])
                    results_grp.create_dataset('displacement',
                                               data=disp_data,
                                               compression='gzip',
                                               compression_opts=4)
                    results_grp.create_dataset('displacement_node_ids',
                                               data=np.array(self.neural_field_data["results"]["displacement"]["node_ids"]))
                # Stress fields: one subgroup per element-type table
                if "stress" in self.neural_field_data["results"]:
                    stress_grp = results_grp.create_group('stress')
                    for stress_type, stress_data in self.neural_field_data["results"]["stress"].items():
                        type_grp = stress_grp.create_group(stress_type)
                        type_grp.create_dataset('data',
                                                data=np.array(stress_data["data"]),
                                                compression='gzip',
                                                compression_opts=4)
                        type_grp.create_dataset('element_ids',
                                                data=np.array(stress_data["element_ids"]))
                # Strain fields: same layout as stress
                if "strain" in self.neural_field_data["results"]:
                    strain_grp = results_grp.create_group('strain')
                    for strain_type, strain_data in self.neural_field_data["results"]["strain"].items():
                        type_grp = strain_grp.create_group(strain_type)
                        type_grp.create_dataset('data',
                                                data=np.array(strain_data["data"]),
                                                compression='gzip',
                                                compression_opts=4)
                        type_grp.create_dataset('element_ids',
                                                data=np.array(strain_data["element_ids"]))
                # Reaction forces at constrained nodes
                if "reactions" in self.neural_field_data["results"]:
                    reactions_data = np.array(self.neural_field_data["results"]["reactions"]["forces"])
                    results_grp.create_dataset('reactions',
                                               data=reactions_data,
                                               compression='gzip',
                                               compression_opts=4)
                    results_grp.create_dataset('reaction_node_ids',
                                               data=np.array(self.neural_field_data["results"]["reactions"]["node_ids"]))
        print(f" [OK] Saved field data to: {h5_file.name}")
        # Calculate and display file sizes
        json_size = json_file.stat().st_size / 1024 # KB
        h5_size = h5_file.stat().st_size / 1024 # KB
        print(f"\n File sizes:")
        print(f" JSON: {json_size:.1f} KB")
        print(f" HDF5: {h5_size:.1f} KB")
        print(f" Total: {json_size + h5_size:.1f} KB")
def _prepare_json_data(self):
"""
Prepare data for JSON export by removing large arrays
(they go to HDF5 instead)
"""
import copy
json_data = copy.deepcopy(self.neural_field_data)
# Remove large arrays from nodes (keep metadata)
if "mesh" in json_data and "nodes" in json_data["mesh"]:
json_data["mesh"]["nodes"]["coordinates"] = f"<stored in HDF5: shape {json_data['mesh']['nodes']['shape']}>"
# Remove large result arrays
if "results" in json_data:
if "displacement" in json_data["results"]:
shape = json_data["results"]["displacement"]["shape"]
json_data["results"]["displacement"]["data"] = f"<stored in HDF5: shape {shape}>"
if "stress" in json_data["results"]:
for stress_type in json_data["results"]["stress"]:
shape = json_data["results"]["stress"][stress_type]["shape"]
json_data["results"]["stress"][stress_type]["data"] = f"<stored in HDF5: shape {shape}>"
if "strain" in json_data["results"]:
for strain_type in json_data["results"]["strain"]:
shape = json_data["results"]["strain"][strain_type]["shape"]
json_data["results"]["strain"][strain_type]["data"] = f"<stored in HDF5: shape {shape}>"
if "reactions" in json_data["results"]:
shape = json_data["results"]["reactions"]["shape"]
json_data["results"]["reactions"]["forces"] = f"<stored in HDF5: shape {shape}>"
return json_data
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================
def main():
    """
    Main function to run the parser from the command line.

    Usage:
        python neural_field_parser.py <case_directory>

    Exits with status 1 on missing arguments, a missing case directory or
    case files, or any error raised while parsing.
    """
    import sys
    if len(sys.argv) < 2:
        # No case directory given: print usage and bail out.
        print("\nAtomizerField Neural Field Parser v1.0")
        print("="*60)
        print("\nUsage:")
        print(" python neural_field_parser.py <case_directory>")
        print("\nExample:")
        print(" python neural_field_parser.py training_case_001")
        print("\nCase directory should contain:")
        print(" input/model.bdf (or model.dat)")
        print(" output/model.op2")
        print("\n")
        sys.exit(1)
    case_dir = sys.argv[1]
    # Verify directory exists
    if not Path(case_dir).exists():
        print(f"ERROR: Directory not found: {case_dir}")
        sys.exit(1)
    # Create parser (its __init__ validates the expected file layout)
    try:
        parser = NastranToNeuralFieldParser(case_dir)
    except FileNotFoundError as e:
        print(f"\nERROR: {e}")
        print("\nPlease ensure your case directory contains:")
        print(" input/model.bdf (or model.dat)")
        print(" output/model.op2")
        sys.exit(1)
    # Parse all data and print a human-readable summary
    try:
        data = parser.parse_all()
        print("\n" + "="*60)
        print("PARSING SUMMARY")
        print("="*60)
        print(f"Case: {data['metadata']['case_name']}")
        print(f"Analysis: {data['metadata']['analysis_type']}")
        print(f"\nMesh:")
        print(f" Nodes: {data['mesh']['statistics']['n_nodes']:,}")
        print(f" Elements: {data['mesh']['statistics']['n_elements']:,}")
        for elem_type, count in data['mesh']['statistics']['element_types'].items():
            if count > 0:
                print(f" {elem_type}: {count:,}")
        print(f"\nMaterials: {len(data['materials'])}")
        print(f"Boundary Conditions: {len(data['boundary_conditions']['spc'])} SPCs")
        print(f"Loads: {len(data['loads']['point_forces'])} forces, {len(data['loads']['pressure'])} pressures")
        if "displacement" in data['results']:
            print(f"\nResults:")
            print(f" Displacement: {len(data['results']['displacement']['node_ids'])} nodes")
            print(f" Max: {data['results']['displacement']['max_translation']:.6f} mm")
        if "stress" in data['results']:
            # Only solid-stress tables carry a max_von_mises entry
            for stress_type in data['results']['stress']:
                if 'max_von_mises' in data['results']['stress'][stress_type]:
                    max_vm = data['results']['stress'][stress_type]['max_von_mises']
                    if max_vm is not None:
                        print(f" {stress_type}: Max VM = {max_vm:.2f} MPa")
        print("\n[OK] Data ready for neural network training!")
        print("="*60 + "\n")
    except Exception as e:
        # Catch-all boundary: report, show the traceback, exit non-zero.
        print(f"\n" + "="*60)
        print("ERROR DURING PARSING")
        print("="*60)
        print(f"{e}\n")
        import traceback
        traceback.print_exc()
        sys.exit(1)
# Run the CLI entry point only when executed as a script (not on import).
if __name__ == "__main__":
    main()