567 lines
16 KiB
Python
567 lines
16 KiB
Python
|
|
"""
|
||
|
|
NX Expression Manager Hook
|
||
|
|
===========================
|
||
|
|
|
||
|
|
Provides Python functions to get and set NX expressions (parameters).
|
||
|
|
|
||
|
|
API Reference (verified via Siemens MCP docs):
|
||
|
|
- Part.Expressions() -> ExpressionCollection
|
||
|
|
- ExpressionCollection.Edit(expression, value)
|
||
|
|
- Expression.Name, Expression.Value, Expression.RightHandSide
|
||
|
|
- Expression.Units.Name
|
||
|
|
|
||
|
|
Usage:
|
||
|
|
from optimization_engine.hooks.nx_cad import expression_manager
|
||
|
|
|
||
|
|
# Get all expressions
|
||
|
|
result = expression_manager.get_expressions("C:/path/to/part.prt")
|
||
|
|
|
||
|
|
# Get specific expression
|
||
|
|
result = expression_manager.get_expression("C:/path/to/part.prt", "thickness")
|
||
|
|
|
||
|
|
# Set expression value
|
||
|
|
result = expression_manager.set_expression("C:/path/to/part.prt", "thickness", 5.0)
|
||
|
|
|
||
|
|
# Set multiple expressions
|
||
|
|
result = expression_manager.set_expressions("C:/path/to/part.prt", {
|
||
|
|
"thickness": 5.0,
|
||
|
|
"width": 10.0
|
||
|
|
})
|
||
|
|
"""
|
||
|
|
|
||
|
|
import os
|
||
|
|
import json
|
||
|
|
import subprocess
|
||
|
|
import tempfile
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Optional, Dict, Any, List, Tuple, Union
|
||
|
|
|
||
|
|
# NX installation path (configurable)
|
||
|
|
NX_BIN_PATH = os.environ.get(
|
||
|
|
"NX_BIN_PATH",
|
||
|
|
r"C:\Program Files\Siemens\NX2506\NXBIN"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Journal template for expression operations
|
||
|
|
EXPRESSION_OPERATIONS_JOURNAL = '''
|
||
|
|
# NX Open Python Journal - Expression Operations
|
||
|
|
# Auto-generated by Atomizer hooks
|
||
|
|
|
||
|
|
import NXOpen
|
||
|
|
import NXOpen.UF
|
||
|
|
import json
|
||
|
|
import sys
|
||
|
|
import os
|
||
|
|
|
||
|
|
def main():
|
||
|
|
"""Execute expression operation based on command arguments."""
|
||
|
|
# Get the NX session
|
||
|
|
session = NXOpen.Session.GetSession()
|
||
|
|
|
||
|
|
# Parse arguments: operation, part_path, output_json, [extra_args...]
|
||
|
|
args = sys.argv[1:] if len(sys.argv) > 1 else []
|
||
|
|
|
||
|
|
if len(args) < 3:
|
||
|
|
raise ValueError("Usage: script.py <operation> <part_path> <output_json> [args...]")
|
||
|
|
|
||
|
|
operation = args[0]
|
||
|
|
part_path = args[1]
|
||
|
|
output_json = args[2]
|
||
|
|
extra_args = args[3:] if len(args) > 3 else []
|
||
|
|
|
||
|
|
result = {"success": False, "error": None, "data": {}}
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Ensure part is open
|
||
|
|
part = ensure_part_open(session, part_path)
|
||
|
|
|
||
|
|
if part is None:
|
||
|
|
result["error"] = f"Failed to open part: {part_path}"
|
||
|
|
elif operation == "get_all":
|
||
|
|
result = get_all_expressions(part)
|
||
|
|
elif operation == "get":
|
||
|
|
expr_name = extra_args[0] if extra_args else None
|
||
|
|
result = get_expression(part, expr_name)
|
||
|
|
elif operation == "set":
|
||
|
|
expr_name = extra_args[0] if len(extra_args) > 0 else None
|
||
|
|
expr_value = extra_args[1] if len(extra_args) > 1 else None
|
||
|
|
result = set_expression(session, part, expr_name, expr_value)
|
||
|
|
elif operation == "set_multiple":
|
||
|
|
# Extra args is a JSON string with name:value pairs
|
||
|
|
expr_dict = json.loads(extra_args[0]) if extra_args else {}
|
||
|
|
result = set_multiple_expressions(session, part, expr_dict)
|
||
|
|
else:
|
||
|
|
result["error"] = f"Unknown operation: {operation}"
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
import traceback
|
||
|
|
result["error"] = str(e)
|
||
|
|
result["traceback"] = traceback.format_exc()
|
||
|
|
|
||
|
|
# Write result to output JSON
|
||
|
|
with open(output_json, 'w') as f:
|
||
|
|
json.dump(result, f, indent=2)
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
def ensure_part_open(session, part_path):
|
||
|
|
"""Ensure the part is open and return it."""
|
||
|
|
# Check if already open
|
||
|
|
part_path_normalized = os.path.normpath(part_path).lower()
|
||
|
|
|
||
|
|
for part in session.Parts:
|
||
|
|
if os.path.normpath(part.FullPath).lower() == part_path_normalized:
|
||
|
|
return part
|
||
|
|
|
||
|
|
# Need to open it
|
||
|
|
if not os.path.exists(part_path):
|
||
|
|
return None
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Set load options for the working directory
|
||
|
|
working_dir = os.path.dirname(part_path)
|
||
|
|
session.Parts.LoadOptions.ComponentLoadMethod = NXOpen.LoadOptions.LoadMethod.FromDirectory
|
||
|
|
session.Parts.LoadOptions.SetSearchDirectories([working_dir], [True])
|
||
|
|
|
||
|
|
# Use OpenActiveDisplay instead of OpenBase for better compatibility
|
||
|
|
part, load_status = session.Parts.OpenActiveDisplay(
|
||
|
|
part_path,
|
||
|
|
NXOpen.DisplayPartOption.AllowAdditional
|
||
|
|
)
|
||
|
|
load_status.Dispose()
|
||
|
|
return part
|
||
|
|
except:
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def get_all_expressions(part):
|
||
|
|
"""Get all expressions from a part.
|
||
|
|
|
||
|
|
NX Open API: Part.Expressions() -> ExpressionCollection
|
||
|
|
"""
|
||
|
|
result = {"success": False, "error": None, "data": {}}
|
||
|
|
|
||
|
|
try:
|
||
|
|
expressions = {}
|
||
|
|
|
||
|
|
for expr in part.Expressions:
|
||
|
|
try:
|
||
|
|
expr_data = {
|
||
|
|
"name": expr.Name,
|
||
|
|
"value": expr.Value,
|
||
|
|
"rhs": expr.RightHandSide,
|
||
|
|
"units": expr.Units.Name if expr.Units else None,
|
||
|
|
"type": expr.Type.ToString() if hasattr(expr.Type, 'ToString') else str(expr.Type),
|
||
|
|
}
|
||
|
|
expressions[expr.Name] = expr_data
|
||
|
|
except:
|
||
|
|
# Skip expressions that can't be read
|
||
|
|
pass
|
||
|
|
|
||
|
|
result["success"] = True
|
||
|
|
result["data"] = {
|
||
|
|
"count": len(expressions),
|
||
|
|
"expressions": expressions
|
||
|
|
}
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
result["error"] = str(e)
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
def get_expression(part, expr_name):
|
||
|
|
"""Get a specific expression by name.
|
||
|
|
|
||
|
|
NX Open API: ExpressionCollection iteration, Expression properties
|
||
|
|
"""
|
||
|
|
result = {"success": False, "error": None, "data": {}}
|
||
|
|
|
||
|
|
if not expr_name:
|
||
|
|
result["error"] = "Expression name is required"
|
||
|
|
return result
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Find the expression by name
|
||
|
|
found_expr = None
|
||
|
|
for expr in part.Expressions:
|
||
|
|
if expr.Name == expr_name:
|
||
|
|
found_expr = expr
|
||
|
|
break
|
||
|
|
|
||
|
|
if found_expr is None:
|
||
|
|
result["error"] = f"Expression not found: {expr_name}"
|
||
|
|
return result
|
||
|
|
|
||
|
|
result["success"] = True
|
||
|
|
result["data"] = {
|
||
|
|
"name": found_expr.Name,
|
||
|
|
"value": found_expr.Value,
|
||
|
|
"rhs": found_expr.RightHandSide,
|
||
|
|
"units": found_expr.Units.Name if found_expr.Units else None,
|
||
|
|
"type": found_expr.Type.ToString() if hasattr(found_expr.Type, 'ToString') else str(found_expr.Type),
|
||
|
|
}
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
result["error"] = str(e)
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
def set_expression(session, part, expr_name, expr_value):
|
||
|
|
"""Set an expression value.
|
||
|
|
|
||
|
|
NX Open API: ExpressionCollection.Edit(expression, new_rhs)
|
||
|
|
"""
|
||
|
|
result = {"success": False, "error": None, "data": {}}
|
||
|
|
|
||
|
|
if not expr_name:
|
||
|
|
result["error"] = "Expression name is required"
|
||
|
|
return result
|
||
|
|
|
||
|
|
if expr_value is None:
|
||
|
|
result["error"] = "Expression value is required"
|
||
|
|
return result
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Find the expression
|
||
|
|
found_expr = None
|
||
|
|
for expr in part.Expressions:
|
||
|
|
if expr.Name == expr_name:
|
||
|
|
found_expr = expr
|
||
|
|
break
|
||
|
|
|
||
|
|
if found_expr is None:
|
||
|
|
result["error"] = f"Expression not found: {expr_name}"
|
||
|
|
return result
|
||
|
|
|
||
|
|
# Set undo mark
|
||
|
|
mark_id = session.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Edit Expression")
|
||
|
|
|
||
|
|
# Edit the expression
|
||
|
|
# The value should be a string for the RHS
|
||
|
|
new_rhs = str(expr_value)
|
||
|
|
part.Expressions.Edit(found_expr, new_rhs)
|
||
|
|
|
||
|
|
# Update the model
|
||
|
|
session.UpdateManager.DoUpdate(mark_id)
|
||
|
|
|
||
|
|
result["success"] = True
|
||
|
|
result["data"] = {
|
||
|
|
"name": expr_name,
|
||
|
|
"old_value": found_expr.Value, # Note: this might be the new value after edit
|
||
|
|
"new_rhs": new_rhs,
|
||
|
|
}
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
result["error"] = str(e)
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
def set_multiple_expressions(session, part, expr_dict):
|
||
|
|
"""Set multiple expressions at once.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
session: NX session
|
||
|
|
part: NX part
|
||
|
|
expr_dict: Dict of expression name -> value
|
||
|
|
"""
|
||
|
|
result = {"success": False, "error": None, "data": {}}
|
||
|
|
|
||
|
|
if not expr_dict:
|
||
|
|
result["error"] = "No expressions provided"
|
||
|
|
return result
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Set undo mark for all changes
|
||
|
|
mark_id = session.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Edit Multiple Expressions")
|
||
|
|
|
||
|
|
updated = []
|
||
|
|
errors = []
|
||
|
|
|
||
|
|
for expr_name, expr_value in expr_dict.items():
|
||
|
|
# Find the expression
|
||
|
|
found_expr = None
|
||
|
|
for expr in part.Expressions:
|
||
|
|
if expr.Name == expr_name:
|
||
|
|
found_expr = expr
|
||
|
|
break
|
||
|
|
|
||
|
|
if found_expr is None:
|
||
|
|
errors.append(f"Expression not found: {expr_name}")
|
||
|
|
continue
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Edit the expression
|
||
|
|
new_rhs = str(expr_value)
|
||
|
|
part.Expressions.Edit(found_expr, new_rhs)
|
||
|
|
updated.append({"name": expr_name, "value": expr_value})
|
||
|
|
except Exception as e:
|
||
|
|
errors.append(f"Failed to set {expr_name}: {str(e)}")
|
||
|
|
|
||
|
|
# Update the model
|
||
|
|
session.UpdateManager.DoUpdate(mark_id)
|
||
|
|
|
||
|
|
result["success"] = len(errors) == 0
|
||
|
|
result["data"] = {
|
||
|
|
"updated": updated,
|
||
|
|
"errors": errors,
|
||
|
|
"update_count": len(updated),
|
||
|
|
"error_count": len(errors),
|
||
|
|
}
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
result["error"] = str(e)
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|
||
|
|
'''
|
||
|
|
|
||
|
|
|
||
|
|
def _get_run_journal_exe() -> str:
|
||
|
|
"""Get the path to run_journal.exe."""
|
||
|
|
return os.path.join(NX_BIN_PATH, "run_journal.exe")
|
||
|
|
|
||
|
|
|
||
|
|
def _run_journal(journal_path: str, args: list) -> Tuple[bool, str]:
|
||
|
|
"""Run an NX journal with arguments.
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Tuple of (success, output_or_error)
|
||
|
|
"""
|
||
|
|
run_journal = _get_run_journal_exe()
|
||
|
|
|
||
|
|
if not os.path.exists(run_journal):
|
||
|
|
return False, f"run_journal.exe not found at {run_journal}"
|
||
|
|
|
||
|
|
cmd = [run_journal, journal_path, "-args"] + args
|
||
|
|
|
||
|
|
try:
|
||
|
|
result = subprocess.run(
|
||
|
|
cmd,
|
||
|
|
capture_output=True,
|
||
|
|
text=True,
|
||
|
|
timeout=120 # 2 minute timeout
|
||
|
|
)
|
||
|
|
|
||
|
|
if result.returncode != 0:
|
||
|
|
return False, f"Journal execution failed: {result.stderr}"
|
||
|
|
|
||
|
|
return True, result.stdout
|
||
|
|
|
||
|
|
except subprocess.TimeoutExpired:
|
||
|
|
return False, "Journal execution timed out"
|
||
|
|
except Exception as e:
|
||
|
|
return False, str(e)
|
||
|
|
|
||
|
|
|
||
|
|
def _execute_expression_operation(
|
||
|
|
operation: str,
|
||
|
|
part_path: str,
|
||
|
|
extra_args: list = None
|
||
|
|
) -> Dict[str, Any]:
|
||
|
|
"""Execute an expression operation via NX journal.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
operation: The operation to perform (get_all, get, set, set_multiple)
|
||
|
|
part_path: Path to the part file
|
||
|
|
extra_args: Additional arguments for the operation
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dict with operation result
|
||
|
|
"""
|
||
|
|
# Create temporary journal file
|
||
|
|
with tempfile.NamedTemporaryFile(
|
||
|
|
mode='w',
|
||
|
|
suffix='.py',
|
||
|
|
delete=False
|
||
|
|
) as journal_file:
|
||
|
|
journal_file.write(EXPRESSION_OPERATIONS_JOURNAL)
|
||
|
|
journal_path = journal_file.name
|
||
|
|
|
||
|
|
# Create temporary output file
|
||
|
|
output_file = tempfile.NamedTemporaryFile(
|
||
|
|
mode='w',
|
||
|
|
suffix='.json',
|
||
|
|
delete=False
|
||
|
|
).name
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Build arguments
|
||
|
|
args = [operation, part_path, output_file]
|
||
|
|
if extra_args:
|
||
|
|
args.extend(extra_args)
|
||
|
|
|
||
|
|
# Run the journal
|
||
|
|
success, output = _run_journal(journal_path, args)
|
||
|
|
|
||
|
|
if not success:
|
||
|
|
return {"success": False, "error": output, "data": {}}
|
||
|
|
|
||
|
|
# Read the result
|
||
|
|
if os.path.exists(output_file):
|
||
|
|
with open(output_file, 'r') as f:
|
||
|
|
return json.load(f)
|
||
|
|
else:
|
||
|
|
return {"success": False, "error": "Output file not created", "data": {}}
|
||
|
|
|
||
|
|
finally:
|
||
|
|
# Cleanup temporary files
|
||
|
|
if os.path.exists(journal_path):
|
||
|
|
os.unlink(journal_path)
|
||
|
|
if os.path.exists(output_file):
|
||
|
|
os.unlink(output_file)
|
||
|
|
|
||
|
|
|
||
|
|
# =============================================================================
|
||
|
|
# Public API
|
||
|
|
# =============================================================================
|
||
|
|
|
||
|
|
def get_expressions(part_path: str) -> Dict[str, Any]:
|
||
|
|
"""Get all expressions from an NX part.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
part_path: Full path to the .prt file
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dict with keys:
|
||
|
|
- success: bool
|
||
|
|
- error: Optional error message
|
||
|
|
- data: Dict with count and expressions dict
|
||
|
|
Each expression has: name, value, rhs, units, type
|
||
|
|
|
||
|
|
Example:
|
||
|
|
>>> result = get_expressions("C:/models/bracket.prt")
|
||
|
|
>>> if result["success"]:
|
||
|
|
... for name, expr in result["data"]["expressions"].items():
|
||
|
|
... print(f"{name} = {expr['value']} {expr['units']}")
|
||
|
|
"""
|
||
|
|
part_path = os.path.abspath(part_path)
|
||
|
|
|
||
|
|
if not os.path.exists(part_path):
|
||
|
|
return {
|
||
|
|
"success": False,
|
||
|
|
"error": f"Part file not found: {part_path}",
|
||
|
|
"data": {}
|
||
|
|
}
|
||
|
|
|
||
|
|
return _execute_expression_operation("get_all", part_path)
|
||
|
|
|
||
|
|
|
||
|
|
def get_expression(part_path: str, expression_name: str) -> Dict[str, Any]:
|
||
|
|
"""Get a specific expression from an NX part.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
part_path: Full path to the .prt file
|
||
|
|
expression_name: Name of the expression
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dict with keys:
|
||
|
|
- success: bool
|
||
|
|
- error: Optional error message
|
||
|
|
- data: Dict with name, value, rhs, units, type
|
||
|
|
|
||
|
|
Example:
|
||
|
|
>>> result = get_expression("C:/models/bracket.prt", "thickness")
|
||
|
|
>>> if result["success"]:
|
||
|
|
... print(f"thickness = {result['data']['value']}")
|
||
|
|
"""
|
||
|
|
part_path = os.path.abspath(part_path)
|
||
|
|
|
||
|
|
if not os.path.exists(part_path):
|
||
|
|
return {
|
||
|
|
"success": False,
|
||
|
|
"error": f"Part file not found: {part_path}",
|
||
|
|
"data": {}
|
||
|
|
}
|
||
|
|
|
||
|
|
return _execute_expression_operation("get", part_path, [expression_name])
|
||
|
|
|
||
|
|
|
||
|
|
def set_expression(
|
||
|
|
part_path: str,
|
||
|
|
expression_name: str,
|
||
|
|
value: Union[float, int, str]
|
||
|
|
) -> Dict[str, Any]:
|
||
|
|
"""Set an expression value in an NX part.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
part_path: Full path to the .prt file
|
||
|
|
expression_name: Name of the expression
|
||
|
|
value: New value (will be converted to string for RHS)
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dict with keys:
|
||
|
|
- success: bool
|
||
|
|
- error: Optional error message
|
||
|
|
- data: Dict with name, old_value, new_rhs
|
||
|
|
|
||
|
|
Example:
|
||
|
|
>>> result = set_expression("C:/models/bracket.prt", "thickness", 5.0)
|
||
|
|
>>> if result["success"]:
|
||
|
|
... print("Expression updated!")
|
||
|
|
"""
|
||
|
|
part_path = os.path.abspath(part_path)
|
||
|
|
|
||
|
|
if not os.path.exists(part_path):
|
||
|
|
return {
|
||
|
|
"success": False,
|
||
|
|
"error": f"Part file not found: {part_path}",
|
||
|
|
"data": {}
|
||
|
|
}
|
||
|
|
|
||
|
|
return _execute_expression_operation(
|
||
|
|
"set",
|
||
|
|
part_path,
|
||
|
|
[expression_name, str(value)]
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def set_expressions(
|
||
|
|
part_path: str,
|
||
|
|
expressions: Dict[str, Union[float, int, str]]
|
||
|
|
) -> Dict[str, Any]:
|
||
|
|
"""Set multiple expressions in an NX part.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
part_path: Full path to the .prt file
|
||
|
|
expressions: Dict mapping expression names to values
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dict with keys:
|
||
|
|
- success: bool
|
||
|
|
- error: Optional error message
|
||
|
|
- data: Dict with updated list, errors list, counts
|
||
|
|
|
||
|
|
Example:
|
||
|
|
>>> result = set_expressions("C:/models/bracket.prt", {
|
||
|
|
... "thickness": 5.0,
|
||
|
|
... "width": 10.0,
|
||
|
|
... "height": 15.0
|
||
|
|
... })
|
||
|
|
>>> if result["success"]:
|
||
|
|
... print(f"Updated {result['data']['update_count']} expressions")
|
||
|
|
"""
|
||
|
|
part_path = os.path.abspath(part_path)
|
||
|
|
|
||
|
|
if not os.path.exists(part_path):
|
||
|
|
return {
|
||
|
|
"success": False,
|
||
|
|
"error": f"Part file not found: {part_path}",
|
||
|
|
"data": {}
|
||
|
|
}
|
||
|
|
|
||
|
|
# Convert expressions dict to JSON string
|
||
|
|
expr_json = json.dumps(expressions)
|
||
|
|
|
||
|
|
return _execute_expression_operation(
|
||
|
|
"set_multiple",
|
||
|
|
part_path,
|
||
|
|
[expr_json]
|
||
|
|
)
|