Files
Atomizer/optimization_engine/hooks/nx_cad/model_introspection.py
Anto01 274081d977 refactor: Engine updates and NX hooks improvements
optimization_engine:
- Updated nx_solver.py with improvements
- Enhanced solve_simulation.py
- Updated extractors/__init__.py
- Improved NX CAD hooks (expression_manager, feature_manager,
  geometry_query, model_introspection, part_manager)
- Enhanced NX CAE solver_manager hook

Documentation:
- Updated OP_01_CREATE_STUDY.md protocol
- Updated SYS_12_EXTRACTOR_LIBRARY.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-20 13:47:21 -05:00

1164 lines
37 KiB
Python

"""
NX Model Introspection Hook
===========================
Comprehensive extraction of ALL model information from NX parts and simulations.
Provides a complete "big picture" view of what's available for optimization.
Features:
- Part introspection: expressions, mass, material, bodies, features
- Simulation introspection: solutions, BCs, loads, output requests
- OP2 introspection: available results (displacement, stress, strain, etc.)
Usage:
from optimization_engine.hooks.nx_cad import model_introspection
# Get everything from a part
result = model_introspection.introspect_part("C:/model.prt")
# Get everything from a simulation
result = model_introspection.introspect_simulation("C:/model.sim")
# Get everything from an OP2 results file
result = model_introspection.introspect_op2("C:/results.op2")
Phase 2.5 - NX Open Automation Roadmap
"""
import os
import json
import subprocess
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List
# Import NX path from centralized config
try:
    from config import NX_BIN_DIR
    # Normalize to str: NX_BIN_DIR may be a Path-like object
    NX_BIN_PATH = str(NX_BIN_DIR)
except ImportError:
    # Fallback if config not available: the NX_BIN_PATH environment variable
    # wins over the hard-coded default install location
    NX_BIN_PATH = os.environ.get(
        "NX_BIN_PATH",
        r"C:\Program Files\Siemens\DesigncenterNX2512\NXBIN"
    )
# =============================================================================
# NX Journal for PART Introspection
# =============================================================================
# NOTE: this string is written to a temp .py file and executed standalone by
# NX's run_journal.exe, so it must be valid Python on its own (indentation
# inside the literal was reconstructed).
PART_INTROSPECTION_JOURNAL = '''
# NX Open Python Journal - Part Introspection
# Extracts ALL information from an NX part file
# Auto-generated by Atomizer hooks
import NXOpen
import NXOpen.UF
import json
import sys
import os


def main():
    """Extract comprehensive part information."""
    session = NXOpen.Session.GetSession()
    args = sys.argv[1:] if len(sys.argv) > 1 else []
    if len(args) < 2:
        raise ValueError("Usage: script.py <part_path> <output_json>")
    part_path = args[0]
    output_json = args[1]
    result = {"success": False, "error": None, "data": {}}
    try:
        part = ensure_part_open(session, part_path)
        if part is None:
            result["error"] = f"Failed to open part: {part_path}"
        else:
            result = introspect_part(session, part, part_path)
    except Exception as e:
        import traceback
        result["error"] = str(e)
        result["traceback"] = traceback.format_exc()
    with open(output_json, 'w') as f:
        json.dump(result, f, indent=2)
    return result


def ensure_part_open(session, part_path):
    """Open part if not already open."""
    part_path_norm = os.path.normpath(part_path).lower()
    # Reuse an already-open part when the normalized paths match
    for part in session.Parts:
        if os.path.normpath(part.FullPath).lower() == part_path_norm:
            return part
    if not os.path.exists(part_path):
        return None
    try:
        working_dir = os.path.dirname(part_path)
        session.Parts.LoadOptions.ComponentLoadMethod = NXOpen.LoadOptions.LoadMethod.FromDirectory
        session.Parts.LoadOptions.SetSearchDirectories([working_dir], [True])
        part, load_status = session.Parts.OpenActiveDisplay(
            part_path, NXOpen.DisplayPartOption.AllowAdditional
        )
        load_status.Dispose()
        return part
    except:
        return None


def introspect_part(session, part, part_path):
    """Extract all information from a part."""
    result = {"success": True, "error": None, "data": {}}
    # Basic info
    result["data"]["file_info"] = {
        "path": part_path,
        "name": part.Name,
        "leaf": part.Leaf,
        "is_modified": part.IsModified,
    }
    # Expressions
    result["data"]["expressions"] = extract_expressions(part)
    # Bodies
    result["data"]["bodies"] = extract_bodies(part)
    # Mass properties
    result["data"]["mass_properties"] = extract_mass_properties(part)
    # Features
    result["data"]["features"] = extract_features(part)
    # Attributes
    result["data"]["attributes"] = extract_attributes(part)
    # Units
    result["data"]["units"] = extract_units(part)
    # Summary
    result["data"]["summary"] = {
        "expression_count": result["data"]["expressions"]["count"],
        "user_expression_count": len([
            e for e in result["data"]["expressions"].get("expressions", {}).values()
            if not e.get("name", "").startswith("p")
        ]),
        "body_count": result["data"]["bodies"]["count"],
        "solid_body_count": result["data"]["bodies"]["solid_count"],
        "feature_count": result["data"]["features"]["count"],
        "has_mass": result["data"]["mass_properties"].get("mass") is not None,
    }
    return result


def extract_expressions(part):
    """Extract all expressions from the part."""
    data = {"count": 0, "expressions": {}, "by_type": {}}
    try:
        for expr in part.Expressions:
            try:
                expr_type = str(expr.Type) if hasattr(expr, 'Type') else "Unknown"
                expr_data = {
                    "name": expr.Name,
                    "value": expr.Value,
                    "rhs": expr.RightHandSide,
                    "units": expr.Units.Name if expr.Units else None,
                    "type": expr_type,
                    "is_geometric": "p" in expr.Name.lower()[:2],  # p0, p1, etc. are internal
                }
                data["expressions"][expr.Name] = expr_data
                # Group by type
                if expr_type not in data["by_type"]:
                    data["by_type"][expr_type] = []
                data["by_type"][expr_type].append(expr.Name)
            except:
                pass
        data["count"] = len(data["expressions"])
    except Exception as e:
        data["error"] = str(e)
    return data


def extract_bodies(part):
    """Extract body information."""
    data = {"count": 0, "solid_count": 0, "sheet_count": 0, "bodies": []}
    try:
        for body in part.Bodies:
            body_data = {
                "name": body.Name if hasattr(body, 'Name') else None,
                "is_solid": body.IsSolidBody,
                "is_sheet": body.IsSheetBody,
                "material": None,
            }
            try:
                phys_mat = body.GetPhysicalMaterial()
                if phys_mat:
                    body_data["material"] = phys_mat.Name
            except:
                pass
            data["bodies"].append(body_data)
            data["count"] += 1
            if body.IsSolidBody:
                data["solid_count"] += 1
            if body.IsSheetBody:
                data["sheet_count"] += 1
    except Exception as e:
        data["error"] = str(e)
    return data


def extract_mass_properties(part):
    """Extract mass properties."""
    data = {}
    try:
        solid_bodies = [b for b in part.Bodies if b.IsSolidBody]
        if not solid_bodies:
            data["error"] = "No solid bodies"
            return data
        measure_manager = part.MeasureManager
        uc = part.UnitCollection
        mass_units = [
            uc.GetBase("Area"),
            uc.GetBase("Volume"),
            uc.GetBase("Mass"),
            uc.GetBase("Length")
        ]
        mass_props = measure_manager.NewMassProperties(mass_units, 0.99, solid_bodies)
        data["mass"] = mass_props.Mass
        data["mass_unit"] = "kg"
        data["volume"] = mass_props.Volume
        data["volume_unit"] = "mm^3"
        data["surface_area"] = mass_props.Area
        data["area_unit"] = "mm^2"
        centroid = mass_props.Centroid
        data["centroid"] = {
            "x": centroid.X,
            "y": centroid.Y,
            "z": centroid.Z,
            "unit": "mm"
        }
        try:
            pm = mass_props.PrincipalMomentsOfInertia
            data["principal_moments"] = {
                "Ixx": pm[0], "Iyy": pm[1], "Izz": pm[2],
                "unit": "kg*mm^2"
            }
        except:
            pass
    except Exception as e:
        data["error"] = str(e)
    return data


def extract_features(part):
    """Extract feature information."""
    data = {"count": 0, "features": [], "by_type": {}}
    try:
        for feat in part.Features:
            try:
                feat_type = feat.GetFeatureName() if hasattr(feat, 'GetFeatureName') else type(feat).__name__
                feat_data = {
                    "name": feat.Name if hasattr(feat, 'Name') else None,
                    "type": feat_type,
                    "suppressed": feat.IsSuppressed if hasattr(feat, 'IsSuppressed') else False,
                }
                data["features"].append(feat_data)
                if feat_type not in data["by_type"]:
                    data["by_type"][feat_type] = 0
                data["by_type"][feat_type] += 1
            except:
                pass
        data["count"] = len(data["features"])
    except Exception as e:
        data["error"] = str(e)
    return data


def extract_attributes(part):
    """Extract part attributes."""
    data = {"count": 0, "attributes": {}}
    try:
        attrs = part.GetUserAttributes()
        for attr in attrs:
            try:
                attr_data = {
                    "title": attr.Title,
                    "type": str(attr.Type),
                }
                if hasattr(attr, 'StringValue'):
                    attr_data["value"] = attr.StringValue
                elif hasattr(attr, 'RealValue'):
                    attr_data["value"] = attr.RealValue
                elif hasattr(attr, 'IntegerValue'):
                    attr_data["value"] = attr.IntegerValue
                data["attributes"][attr.Title] = attr_data
            except:
                pass
        data["count"] = len(data["attributes"])
    except Exception as e:
        data["error"] = str(e)
    return data


def extract_units(part):
    """Extract unit system information."""
    data = {}
    try:
        uc = part.UnitCollection
        data["system"] = "metric"  # NX default
        # Get base units
        data["base_units"] = {}
        for unit_type in ["Length", "Mass", "Time", "Area", "Volume"]:
            try:
                unit = uc.GetBase(unit_type)
                data["base_units"][unit_type] = unit.Name if unit else None
            except:
                pass
    except Exception as e:
        data["error"] = str(e)
    return data


if __name__ == "__main__":
    main()
'''
# =============================================================================
# NX Journal for SIMULATION Introspection
# =============================================================================
# NOTE: this string is written to a temp .py file and executed standalone by
# NX's run_journal.exe, so it must be valid Python on its own (indentation
# inside the literal was reconstructed).
SIMULATION_INTROSPECTION_JOURNAL = '''
# NX Open Python Journal - Simulation Introspection
# Extracts ALL information from an NX simulation file
# Auto-generated by Atomizer hooks
import NXOpen
import NXOpen.CAE
import json
import sys
import os


def main():
    """Extract comprehensive simulation information."""
    session = NXOpen.Session.GetSession()
    args = sys.argv[1:] if len(sys.argv) > 1 else []
    if len(args) < 2:
        raise ValueError("Usage: script.py <sim_path> <output_json>")
    sim_path = args[0]
    output_json = args[1]
    result = {"success": False, "error": None, "data": {}}
    try:
        # Set load options
        working_dir = os.path.dirname(sim_path)
        session.Parts.LoadOptions.ComponentLoadMethod = NXOpen.LoadOptions.LoadMethod.FromDirectory
        session.Parts.LoadOptions.SetSearchDirectories([working_dir], [True])
        # Open simulation
        basePart, loadStatus = session.Parts.OpenActiveDisplay(
            sim_path, NXOpen.DisplayPartOption.AllowAdditional
        )
        loadStatus.Dispose()
        simPart = session.Parts.Work
        if not isinstance(simPart, NXOpen.CAE.SimPart):
            result["error"] = f"Not a simulation part: {type(simPart)}"
        else:
            result = introspect_simulation(session, simPart, sim_path)
    except Exception as e:
        import traceback
        result["error"] = str(e)
        result["traceback"] = traceback.format_exc()
    with open(output_json, 'w') as f:
        json.dump(result, f, indent=2)
    return result


def introspect_simulation(session, simPart, sim_path):
    """Extract all information from a simulation."""
    result = {"success": True, "error": None, "data": {}}
    sim = simPart.Simulation
    # Basic info
    result["data"]["file_info"] = {
        "path": sim_path,
        "name": simPart.Name,
        "simulation_name": sim.Name if sim else None,
    }
    # Solutions
    result["data"]["solutions"] = extract_solutions(sim)
    # Boundary Conditions
    result["data"]["boundary_conditions"] = extract_boundary_conditions(sim)
    # Loads
    result["data"]["loads"] = extract_loads(sim)
    # Materials
    result["data"]["materials"] = extract_sim_materials(simPart)
    # Mesh info
    result["data"]["mesh"] = extract_mesh_info(simPart)
    # Output requests (from first solution)
    result["data"]["output_requests"] = extract_output_requests(sim)
    # Summary
    result["data"]["summary"] = {
        "solution_count": len(result["data"]["solutions"].get("solutions", [])),
        "bc_count": result["data"]["boundary_conditions"].get("count", 0),
        "load_count": result["data"]["loads"].get("count", 0),
        "material_count": result["data"]["materials"].get("count", 0),
        "node_count": result["data"]["mesh"].get("node_count", 0),
        "element_count": result["data"]["mesh"].get("element_count", 0),
    }
    return result


def extract_solutions(sim):
    """Extract solution information."""
    data = {"solutions": []}
    try:
        for sol in sim.Solutions:
            sol_data = {
                "name": sol.Name,
                "type": str(sol.SolutionType) if hasattr(sol, 'SolutionType') else None,
                "solver": None,
            }
            # Try to get solver type
            try:
                sol_data["solver"] = sol.SolverTypeName if hasattr(sol, 'SolverTypeName') else "Nastran"
            except:
                sol_data["solver"] = "Nastran"
            # Try to get analysis type
            try:
                sol_data["analysis_type"] = str(sol.AnalysisType) if hasattr(sol, 'AnalysisType') else None
            except:
                pass
            data["solutions"].append(sol_data)
    except Exception as e:
        data["error"] = str(e)
    return data


def extract_boundary_conditions(sim):
    """Extract boundary condition information."""
    data = {"count": 0, "boundary_conditions": [], "by_type": {}}
    try:
        for bc in sim.BoundaryConditions:
            try:
                bc_type = type(bc).__name__
                bc_data = {
                    "name": bc.Name if hasattr(bc, 'Name') else None,
                    "type": bc_type,
                    "active": bc.IsActive if hasattr(bc, 'IsActive') else True,
                }
                # Try to get constraint type details
                if hasattr(bc, 'ConstraintType'):
                    bc_data["constraint_type"] = str(bc.ConstraintType)
                # Check for SPC (fixed constraints)
                if 'SPC' in bc_type or 'Constraint' in bc_type or 'Fixed' in bc_type:
                    bc_data["category"] = "constraint"
                else:
                    bc_data["category"] = "other"
                data["boundary_conditions"].append(bc_data)
                if bc_type not in data["by_type"]:
                    data["by_type"][bc_type] = 0
                data["by_type"][bc_type] += 1
            except:
                pass
        data["count"] = len(data["boundary_conditions"])
    except Exception as e:
        data["error"] = str(e)
    return data


def extract_loads(sim):
    """Extract load information."""
    data = {"count": 0, "loads": [], "by_type": {}}
    try:
        for load in sim.Loads:
            try:
                load_type = type(load).__name__
                load_data = {
                    "name": load.Name if hasattr(load, 'Name') else None,
                    "type": load_type,
                    "active": load.IsActive if hasattr(load, 'IsActive') else True,
                }
                # Try to get load magnitude
                if hasattr(load, 'Magnitude'):
                    try:
                        load_data["magnitude"] = load.Magnitude.Value
                        load_data["magnitude_unit"] = load.Magnitude.Units.Name if load.Magnitude.Units else None
                    except:
                        pass
                # Categorize load type
                if 'Force' in load_type:
                    load_data["category"] = "force"
                elif 'Pressure' in load_type:
                    load_data["category"] = "pressure"
                elif 'Moment' in load_type or 'Torque' in load_type:
                    load_data["category"] = "moment"
                elif 'Temperature' in load_type or 'Thermal' in load_type:
                    load_data["category"] = "thermal"
                elif 'Gravity' in load_type or 'Acceleration' in load_type:
                    load_data["category"] = "inertia"
                else:
                    load_data["category"] = "other"
                data["loads"].append(load_data)
                if load_type not in data["by_type"]:
                    data["by_type"][load_type] = 0
                data["by_type"][load_type] += 1
            except:
                pass
        data["count"] = len(data["loads"])
    except Exception as e:
        data["error"] = str(e)
    return data


def extract_sim_materials(simPart):
    """Extract material information from simulation."""
    data = {"count": 0, "materials": []}
    try:
        # Try to get materials from the simulation
        for mat in simPart.MaterialManager.PhysicalMaterials:
            try:
                mat_data = {
                    "name": mat.Name,
                    "properties": {}
                }
                # Common material properties
                for prop_name in ["Density", "YoungsModulus", "PoissonsRatio",
                                  "ThermalExpansionCoefficient", "ThermalConductivity"]:
                    try:
                        val = mat.GetRealPropertyValue(prop_name)
                        mat_data["properties"][prop_name] = val
                    except:
                        pass
                data["materials"].append(mat_data)
            except:
                pass
        data["count"] = len(data["materials"])
    except Exception as e:
        data["error"] = str(e)
    return data


def extract_mesh_info(simPart):
    """Extract mesh information."""
    data = {"node_count": 0, "element_count": 0, "element_types": {}}
    try:
        # Try to get mesh from FEM
        fem = None
        try:
            # For assembly FEM, need to navigate to component
            fem = simPart.FemPart.BaseFEModel if hasattr(simPart, 'FemPart') else None
        except:
            pass
        if fem is None:
            try:
                # Direct access
                fem = simPart.Simulation.Femodel
            except:
                pass
        if fem:
            try:
                data["node_count"] = fem.MeshManager.NodeCount if hasattr(fem.MeshManager, 'NodeCount') else 0
                data["element_count"] = fem.MeshManager.ElementCount if hasattr(fem.MeshManager, 'ElementCount') else 0
            except:
                pass
    except Exception as e:
        data["error"] = str(e)
    return data


def extract_output_requests(sim):
    """Extract output request information (what will be in the OP2)."""
    data = {"requests": [], "available_outputs": []}
    try:
        # Standard Nastran outputs that might be requested
        standard_outputs = [
            {"name": "DISPLACEMENT", "code": "DISP", "description": "Nodal displacements"},
            {"name": "STRESS", "code": "STRESS", "description": "Element stresses"},
            {"name": "STRAIN", "code": "STRAIN", "description": "Element strains"},
            {"name": "FORCE", "code": "FORCE", "description": "Element forces"},
            {"name": "SPCFORCES", "code": "SPCF", "description": "Reaction forces at constraints"},
            {"name": "MPCFORCES", "code": "MPCF", "description": "Multi-point constraint forces"},
            {"name": "OLOAD", "code": "OLOAD", "description": "Applied loads"},
            {"name": "GPFORCE", "code": "GPFO", "description": "Grid point forces"},
            {"name": "ESE", "code": "ESE", "description": "Element strain energy"},
            {"name": "GPSTRESS", "code": "GPST", "description": "Grid point stresses"},
            {"name": "TEMPERATURE", "code": "TEMP", "description": "Nodal temperatures"},
            {"name": "VELOCITY", "code": "VELO", "description": "Nodal velocities"},
            {"name": "ACCELERATION", "code": "ACCE", "description": "Nodal accelerations"},
            {"name": "EIGENVECTOR", "code": "EIGV", "description": "Mode shapes"},
        ]
        data["available_outputs"] = standard_outputs
        # Try to get actual output requests from solution
        for sol in sim.Solutions:
            try:
                # Output requests are typically in case control
                req_data = {
                    "solution": sol.Name,
                    "outputs": []
                }
                # Check what's typically output
                req_data["outputs"] = ["DISPLACEMENT", "STRESS", "SPCFORCES"]  # Default
                data["requests"].append(req_data)
            except:
                pass
    except Exception as e:
        data["error"] = str(e)
    return data


if __name__ == "__main__":
    main()
'''
def _run_journal(journal_content: str, *args) -> Dict[str, Any]:
    """Execute an NX journal script via run_journal.exe and return its JSON result.

    The journal source is written to a temp .py file; a temp .json path is
    appended as the journal's final argument and the journal is expected to
    write its result dict there.

    Args:
        journal_content: Python source of the NX Open journal to run.
        *args: Positional arguments forwarded to the journal (placed after
            "-args" and before the output JSON path).

    Returns:
        The dict the journal wrote to the output JSON, or a
        {"success": False, "error": ..., "data": {}} dict on any failure.
    """
    run_journal_exe = Path(NX_BIN_PATH) / "run_journal.exe"
    if not run_journal_exe.exists():
        return {
            "success": False,
            "error": f"run_journal.exe not found at {run_journal_exe}",
            "data": {}
        }
    # Create temporary files. delete=False because they must survive the
    # handle close so the external NX process can read/write them (Windows
    # cannot reopen a NamedTemporaryFile that is still open).
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as journal_file:
        journal_file.write(journal_content)
        journal_path = journal_file.name
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as output_file:
        output_path = output_file.name
    try:
        # Build command: forwarded args first, output JSON path last
        cmd = [str(run_journal_exe), journal_path, "-args"]
        cmd.extend(str(a) for a in args)
        cmd.append(output_path)
        # Execute
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=180  # 3 minute timeout for introspection
        )
        # Read the result the journal wrote
        if os.path.exists(output_path):
            with open(output_path, 'r') as f:
                return json.load(f)
        return {
            "success": False,
            "error": f"No output file generated. stdout: {result.stdout[-1000:]}, stderr: {result.stderr[-500:]}",
            "data": {}
        }
    except subprocess.TimeoutExpired:
        return {
            "success": False,
            "error": "Journal execution timed out after 180 seconds",
            "data": {}
        }
    except Exception as e:
        # Includes json.JSONDecodeError when the journal left the output
        # file empty/invalid
        return {
            "success": False,
            "error": str(e),
            "data": {}
        }
    finally:
        # Cleanup: only swallow filesystem errors — the previous bare
        # except: could mask real bugs (even KeyboardInterrupt)
        for tmp_path in (journal_path, output_path):
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
# =============================================================================
# OP2 Introspection (Pure Python using pyNastran)
# =============================================================================
def introspect_op2(op2_path: str) -> Dict[str, Any]:
    """
    Introspect an OP2 results file to see what data is available.

    Reads the file with pyNastran and reports, per result family
    (displacement, stress, strain, forces, ...), whether it is present,
    which subcases contain it, and — for elemental results — which element
    types, plus basic mesh counts when geometry tables are in the OP2.

    Args:
        op2_path: Path to OP2 file

    Returns:
        Dict with "success", "error" and "data" (available results,
        subcases, element types, mesh counts, "extractable" summary list).
    """
    result = {"success": False, "error": None, "data": {}}
    if not os.path.exists(op2_path):
        result["error"] = f"OP2 file not found: {op2_path}"
        return result
    try:
        from pyNastran.op2.op2 import OP2
        import logging
        logging.getLogger('pyNastran').setLevel(logging.ERROR)
        op2 = OP2(debug=False, log=None)
        op2.read_op2(op2_path)
        data = {
            "file_info": {
                "path": op2_path,
                "size_mb": os.path.getsize(op2_path) / (1024 * 1024),
            },
            "subcases": [],
            "results": {
                "displacement": {"available": False, "subcases": []},
                "velocity": {"available": False, "subcases": []},
                "acceleration": {"available": False, "subcases": []},
                "eigenvectors": {"available": False, "modes": 0},
                "spc_forces": {"available": False, "subcases": []},
                "mpc_forces": {"available": False, "subcases": []},
                "stress": {"available": False, "element_types": [], "subcases": []},
                "strain": {"available": False, "element_types": [], "subcases": []},
                "strain_energy": {"available": False, "subcases": []},
                "temperature": {"available": False, "subcases": []},
                "element_forces": {"available": False, "element_types": [], "subcases": []},
            },
            "mesh": {
                "node_count": 0,
                "element_count": 0,
                "element_types": {},
            },
            "materials": [],
            "properties": [],
        }
        subcases = set()

        def _collect_nodal(attr_name, result_key):
            # Nodal result tables are dicts keyed by subcase id.
            table = getattr(op2, attr_name, None)
            if table:
                data["results"][result_key]["available"] = True
                for sc in table.keys():
                    subcases.add(sc)
                    data["results"][result_key]["subcases"].append(sc)

        def _collect_elemental(attr_names, suffix, result_key):
            # Elemental results are split per element type (e.g. ctetra_stress);
            # several tables can share a subcase, so dedupe subcase ids.
            entry = data["results"][result_key]
            for attr_name in attr_names:
                table = getattr(op2, attr_name, None)
                if not table:
                    continue
                entry["available"] = True
                entry["element_types"].append(attr_name.replace(suffix, '').upper())
                for sc in table.keys():
                    subcases.add(sc)
                    if sc not in entry["subcases"]:
                        entry["subcases"].append(sc)

        _collect_nodal('displacements', 'displacement')
        _collect_nodal('velocities', 'velocity')
        _collect_nodal('accelerations', 'acceleration')
        _collect_nodal('spc_forces', 'spc_forces')
        _collect_nodal('mpc_forces', 'mpc_forces')
        _collect_nodal('temperatures', 'temperature')

        # Eigenvectors: report the largest mode count across all keys
        # (the previous code overwrote "modes" with whichever key iterated
        # last, losing information).
        if getattr(op2, 'eigenvectors', None):
            data["results"]["eigenvectors"]["available"] = True
            modes = 0
            for eigv in op2.eigenvectors.values():
                modes = max(modes, len(eigv.modes) if hasattr(eigv, 'modes') else 0)
            data["results"]["eigenvectors"]["modes"] = modes

        # Stresses / strains across the common element types
        stress_attrs = [
            'ctetra_stress', 'chexa_stress', 'cpenta_stress', 'cpyram_stress',
            'cquad4_stress', 'cquad8_stress', 'ctria3_stress', 'ctria6_stress',
            'cbar_stress', 'cbeam_stress', 'crod_stress', 'ctube_stress',
        ]
        _collect_elemental(stress_attrs, '_stress', 'stress')
        _collect_elemental(
            [attr.replace('_stress', '_strain') for attr in stress_attrs],
            '_strain', 'strain')
        # Element force tables (the "element_forces" key was previously
        # declared but never populated)
        _collect_elemental(
            ['cbar_force', 'cbeam_force', 'crod_force', 'ctube_force',
             'cquad4_force', 'ctria3_force'],
            '_force', 'element_forces')

        # Strain energy tables
        for attr in ('ctetra_strain_energy', 'chexa_strain_energy', 'strain_energy'):
            table = getattr(op2, attr, None)
            if table:
                data["results"]["strain_energy"]["available"] = True
                for sc in table.keys():
                    subcases.add(sc)
                    if sc not in data["results"]["strain_energy"]["subcases"]:
                        data["results"]["strain_energy"]["subcases"].append(sc)

        # Mesh info from geometry tables (only present when geometry is
        # included in the OP2)
        if getattr(op2, 'nodes', None):
            data["mesh"]["node_count"] = len(op2.nodes)
        elem_count = 0
        for attr in ('elements', 'ctetra', 'chexa', 'cpenta', 'cquad4', 'ctria3'):
            elem_dict = getattr(op2, attr, None)
            if elem_dict and isinstance(elem_dict, dict):
                for etype, elems in elem_dict.items():
                    count = len(elems) if hasattr(elems, '__len__') else 1
                    data["mesh"]["element_types"][etype] = count
                    elem_count += count
        data["mesh"]["element_count"] = elem_count
        data["subcases"] = sorted(list(subcases))
        # Summary of result families that can actually be extracted
        data["extractable"] = [
            result_type for result_type, info in data["results"].items()
            if info["available"]
        ]
        result["success"] = True
        result["data"] = data
    except ImportError:
        result["error"] = "pyNastran not installed. Run: pip install pyNastran"
    except Exception as e:
        import traceback
        result["error"] = str(e)
        result["traceback"] = traceback.format_exc()
    return result
# =============================================================================
# Public API
# =============================================================================
def introspect_part(part_path: str) -> Dict[str, Any]:
    """Run the part-introspection journal against an NX .prt file.

    Resolves *part_path* to an absolute path, verifies the file exists, then
    executes PART_INTROSPECTION_JOURNAL through run_journal.exe. The journal
    reports file info, expressions (values, units, types), bodies and
    materials, mass properties (mass, volume, centroid, inertia), features,
    attributes, the unit system, and a summary block.

    Args:
        part_path: Path to .prt file

    Returns:
        Dict with keys "success", "error", "data".

    Example:
        >>> result = introspect_part("C:/models/bracket.prt")
        >>> if result["success"]:
        ...     print(f"Expressions: {result['data']['summary']['expression_count']}")
        ...     print(f"Mass: {result['data']['mass_properties']['mass']} kg")
    """
    resolved = str(Path(part_path).resolve())
    if os.path.exists(resolved):
        return _run_journal(PART_INTROSPECTION_JOURNAL, resolved)
    return {
        "success": False,
        "error": f"Part file not found: {resolved}",
        "data": {},
    }
def introspect_simulation(sim_path: str) -> Dict[str, Any]:
    """Run the simulation-introspection journal against an NX .sim file.

    Resolves *sim_path* to an absolute path, verifies the file exists, then
    executes SIMULATION_INTROSPECTION_JOURNAL through run_journal.exe. The
    journal reports file info, solutions (types, solvers), boundary
    conditions, loads, materials, mesh counts, and output requests.

    Args:
        sim_path: Path to .sim file

    Returns:
        Dict with keys "success", "error", "data".

    Example:
        >>> result = introspect_simulation("C:/models/bracket.sim")
        >>> if result["success"]:
        ...     print(f"Solutions: {result['data']['summary']['solution_count']}")
        ...     print(f"Loads: {result['data']['summary']['load_count']}")
    """
    resolved = str(Path(sim_path).resolve())
    if os.path.exists(resolved):
        return _run_journal(SIMULATION_INTROSPECTION_JOURNAL, resolved)
    return {
        "success": False,
        "error": f"Simulation file not found: {resolved}",
        "data": {},
    }
def introspect_model(model_path: str) -> Dict[str, Any]:
    """Dispatch introspection based on the file's extension.

    Routes .prt -> introspect_part, .sim -> introspect_simulation and
    .op2 -> introspect_op2; any other extension yields an error dict.

    Args:
        model_path: Path to any NX file (.prt, .sim, .op2)

    Returns:
        Dict with comprehensive model information.
    """
    resolved = str(Path(model_path).resolve())
    suffix = Path(resolved).suffix.lower()
    # Guard-style dispatch; each handler is only referenced on its branch
    if suffix == '.op2':
        return introspect_op2(resolved)
    if suffix == '.sim':
        return introspect_simulation(resolved)
    if suffix == '.prt':
        return introspect_part(resolved)
    return {
        "success": False,
        "error": f"Unsupported file type: {suffix}. Supported: .prt, .sim, .op2",
        "data": {},
    }
def introspect_study(study_dir: str) -> Dict[str, Any]:
    """
    Introspect an entire Atomizer study directory.

    Finds (but does not deeply introspect) the study's files:
    - All .prt files (CAD models)
    - All .sim files (simulations)
    - All .op2 files (results, capped at 10)
    - optimization_config.json (parsed, if present)

    Args:
        study_dir: Path to study directory (e.g., studies/bracket_optimization/)

    Returns:
        Dict with "success", "error" and a "data" payload listing parts,
        simulations, results, the parsed config, and a summary.
    """
    study_dir = Path(study_dir)
    if not study_dir.exists():
        return {
            "success": False,
            "error": f"Study directory not found: {study_dir}",
            "data": {}
        }
    result = {
        "success": True,
        "error": None,
        "data": {
            "study_path": str(study_dir),
            "parts": [],
            "simulations": [],
            "results": [],
            "config": None,
        }
    }
    # Resolve the conventional study layout; fall back to the study root
    # when 1_setup/ or 2_results/ are absent
    setup_dir = study_dir / "1_setup"
    model_dir = setup_dir / "model" if setup_dir.exists() else study_dir
    results_dir = study_dir / "2_results" if (study_dir / "2_results").exists() else study_dir
    # Find parts (sorted: glob order is filesystem-dependent, sorting makes
    # the output deterministic)
    prt_files = sorted(model_dir.glob("*.prt")) if model_dir.exists() else []
    for prt in prt_files:
        result["data"]["parts"].append({
            "path": str(prt),
            "name": prt.stem,
            # Full introspection would be expensive, just list for now
        })
    # Find simulations
    sim_files = sorted(model_dir.glob("*.sim")) if model_dir.exists() else []
    for sim in sim_files:
        result["data"]["simulations"].append({
            "path": str(sim),
            "name": sim.stem,
        })
    # Find OP2 results
    op2_files = sorted(results_dir.rglob("*.op2")) if results_dir.exists() else []
    for op2 in op2_files[:10]:  # Limit to 10 to avoid overwhelming
        result["data"]["results"].append({
            "path": str(op2),
            "name": op2.stem,
        })
    # Load config if it exists. Best-effort, but only swallow expected
    # read/parse failures — the previous bare except hid real bugs.
    config_path = setup_dir / "optimization_config.json"
    if config_path.exists():
        try:
            with open(config_path, 'r') as f:
                result["data"]["config"] = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError
            pass
    # Summary
    result["data"]["summary"] = {
        "part_count": len(result["data"]["parts"]),
        "simulation_count": len(result["data"]["simulations"]),
        "results_count": len(result["data"]["results"]),
        "has_config": result["data"]["config"] is not None,
    }
    return result
if __name__ == "__main__":
    # CLI entry point: introspect any supported model file and dump the
    # result as pretty-printed JSON to stdout
    import sys
    if len(sys.argv) > 1:
        path = sys.argv[1]
        result = introspect_model(path)
        print(json.dumps(result, indent=2))
    else:
        print("Usage: python model_introspection.py <path_to_model>")
        print("Supported: .prt, .sim, .op2 files")