# Source file: Atomizer/optimization_engine/extractors/field_data_extractor.py
# (Repository web-view header: 209 lines, 6.1 KiB, Python)

"""
Generic Field Data Extractor
=============================
Reusable extractor for parsing NX exported field data (.fld files).
Works for any result type: displacement, stress, strain, temperature, etc.
Supports TWO formats:
1. NX native .fld format (exported from ResultProbe)
2. CSV format with headers
Usage:
    extractor = FieldDataExtractor(field_file="export_field_dz.fld")
    results = extractor.extract()
    max_value = results['stats']['max_abs']
"""
import csv
from pathlib import Path
from typing import Dict, Any, Optional, List
import numpy as np
class FieldDataExtractor:
    """
    Generic extractor for NX exported field data files (.fld or .csv format).

    The format is detected from the first line of the file: a file whose
    first line starts with ``FIELD:`` is parsed as an NX native field export,
    anything else is parsed as a CSV file with a header row.

    Supports:
    - Displacement (X, Y, Z components)
    - Stress (von Mises, principals, components)
    - Strain
    - Temperature
    - Any other scalar field data
    """

    def __init__(
        self,
        field_file: str,
        result_column: str = "x(mm)",  # Column name to extract (for CSV format)
        aggregation: str = "max_abs",  # max_abs, max, min, mean, std
    ):
        """
        Args:
            field_file: Path to .fld or .csv file
            result_column: Column name containing the result values
                (for CSV format, ignored for .fld)
            aggregation: How to aggregate values (max_abs, max, min, mean, std).
                An unrecognized name falls back to max_abs in extract().
        """
        self.field_file = Path(field_file)
        self.result_column = result_column
        self.aggregation = aggregation

    def _is_nx_field_format(self) -> bool:
        """Return True when the first line of the file starts with 'FIELD:'."""
        with open(self.field_file, 'r') as f:
            first_line = f.readline()
        return first_line.startswith('FIELD:')

    def _parse_nx_field_file(self) -> List[float]:
        """
        Parse NX native field export format (.fld).

        Format:
            FIELD: [ResultProbe] : [TABLE]
            ...metadata...
            START DATA
            step, node_id, value
            0, 396, -0.086716040968895
            ...
            END DATA

        Returns:
            The third comma-separated field of every data line that parses
            as a float. Lines with fewer than three fields or a non-numeric
            third field (such as the column-name line) are skipped.
        """
        values: List[float] = []
        in_data_section = False
        with open(self.field_file, 'r') as f:
            for line in f:
                line = line.strip()
                if line.startswith('START DATA'):
                    in_data_section = True
                    continue
                if line.startswith('END DATA'):
                    break
                if not (in_data_section and line):
                    continue
                # Data format: step, node_id, value
                parts = [p.strip() for p in line.split(',')]
                if len(parts) < 3:
                    continue
                try:
                    values.append(float(parts[2]))  # Third column is the value
                except ValueError:
                    continue
        return values

    def _resolve_column(self, headers: List[str]) -> str:
        """
        Map ``self.result_column`` onto one of the CSV header names.

        An exact match wins; otherwise a header that is equal after stripping
        surrounding whitespace is accepted, so ``node_id, x(mm)`` style
        headers still work.

        Raises:
            ValueError: If no header matches the requested column.
        """
        if self.result_column in headers:
            return self.result_column
        wanted = self.result_column.strip()
        for header in headers:
            if header is not None and header.strip() == wanted:
                return header
        raise ValueError(
            f"Column '{self.result_column}' not found in field file: "
            f"{self.field_file} (available columns: {list(headers)})"
        )

    def _parse_csv_file(self) -> List[float]:
        """
        Parse CSV file with column headers.

        Expected format:
            node_id,x(mm),y(mm),z(mm)
            1,0.0,0.0,0.5
            2,0.1,0.0,0.6
            ...

        Returns:
            The values of the result column for every row where that field
            is present and parses as a float; other rows are skipped. An
            empty file yields an empty list.

        Raises:
            ValueError: If the header row does not contain the result column.
        """
        values: List[float] = []
        # newline='' is what the csv module asks for on files given to a reader.
        with open(self.field_file, 'r', newline='') as f:
            reader = csv.DictReader(f)
            headers = reader.fieldnames
            if not headers:
                return values  # Empty file: extract() reports "no valid data"
            column = self._resolve_column(list(headers))
            for row in reader:
                # Short rows are padded with None by DictReader; float(None)
                # raises TypeError, so it has to be skipped like bad text.
                try:
                    values.append(float(row[column]))
                except (KeyError, TypeError, ValueError):
                    continue  # Skip invalid rows
        return values

    def extract(self) -> Dict[str, Any]:
        """
        Extract and aggregate field data.

        Returns:
            dict: {
                'value': aggregated value,
                'all_values': list of all values,
                'node_count': number of nodes,
                'stats': {min, max, mean, std, max_abs},
                'aggregation_method': the aggregation name given at construction
            }

        Raises:
            FileNotFoundError: If the field file does not exist.
            ValueError: If no numeric values could be read from the file, or
                (CSV format) the result column is not in the header row.
        """
        if not self.field_file.exists():
            raise FileNotFoundError(f"Field file not found: {self.field_file}")

        # Detect format and parse
        if self._is_nx_field_format():
            values = self._parse_nx_field_file()
        else:
            values = self._parse_csv_file()

        if not values:
            raise ValueError(f"No valid data found in field file: {self.field_file}")

        values_array = np.array(values)

        # Calculate statistics
        stats = {
            'min': float(np.min(values_array)),
            'max': float(np.max(values_array)),
            'mean': float(np.mean(values_array)),
            'std': float(np.std(values_array)),
            'max_abs': float(np.max(np.abs(values_array))),
        }

        # Every supported aggregation name is also a key of stats; an
        # unrecognized name falls back to max_abs.
        aggregated_value = stats.get(self.aggregation, stats['max_abs'])

        return {
            'value': aggregated_value,
            'all_values': values,
            'node_count': len(values),
            'stats': stats,
            'aggregation_method': self.aggregation,
        }
def extract_displacement_field(
    field_file: str,
    component: str = "z",  # x, y, or z
    aggregation: str = "max_abs"
) -> Dict[str, Any]:
    """
    Extract displacement data for one component via FieldDataExtractor.

    The component letter is lower-cased and translated to the CSV column
    name ('x(mm)', 'y(mm)' or 'z(mm)'). Any other component value selects
    'x(mm)'.

    Args:
        field_file: Path to displacement field file
        component: Displacement component (x, y, or z), case-insensitive
        aggregation: Aggregation method passed on to FieldDataExtractor

    Returns:
        The dictionary produced by FieldDataExtractor.extract()
    """
    axis = component.lower()
    if axis == 'y':
        column = 'y(mm)'
    elif axis == 'z':
        column = 'z(mm)'
    else:
        # Covers 'x' as well as every unrecognized component.
        column = 'x(mm)'

    return FieldDataExtractor(
        field_file,
        result_column=column,
        aggregation=aggregation,
    ).extract()
if __name__ == "__main__":
    # Command-line demo: the first argument is the field file to read; the
    # Z-displacement summary is printed. Without an argument nothing happens.
    import sys

    cli_args = sys.argv[1:]
    if cli_args:
        summary = extract_displacement_field(cli_args[0], component="z")
        print(f"Max absolute Z-displacement: {summary['value']:.6f} mm")
        print(f"Statistics: {summary['stats']}")