optimization_engine: - Updated nx_solver.py with improvements - Enhanced solve_simulation.py - Updated extractors/__init__.py - Improved NX CAD hooks (expression_manager, feature_manager, geometry_query, model_introspection, part_manager) - Enhanced NX CAE solver_manager hook Documentation: - Updated OP_01_CREATE_STUDY.md protocol - Updated SYS_12_EXTRACTOR_LIBRARY.md 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
716 lines
20 KiB
Python
716 lines
20 KiB
Python
"""
|
|
NX Feature Manager Hook
|
|
=======================
|
|
|
|
Provides Python functions to manage NX features (suppress, unsuppress, etc.).
|
|
|
|
API Reference (verified via Siemens MCP docs):
|
|
- Part.Features() -> FeatureCollection
|
|
- Feature.Suppress() -> Suppresses the feature
|
|
- Feature.Unsuppress() -> Unsuppresses the feature
|
|
- Feature.Name, Feature.IsSuppressed
|
|
- Session.UpdateManager.DoUpdate() -> Update the model
|
|
|
|
Usage:
|
|
from optimization_engine.hooks.nx_cad import feature_manager
|
|
|
|
# Get all features
|
|
result = feature_manager.get_features("C:/path/to/part.prt")
|
|
|
|
# Suppress a feature
|
|
result = feature_manager.suppress_feature("C:/path/to/part.prt", "HOLE(1)")
|
|
|
|
# Unsuppress a feature
|
|
result = feature_manager.unsuppress_feature("C:/path/to/part.prt", "HOLE(1)")
|
|
|
|
# Get feature status
|
|
result = feature_manager.get_feature_status("C:/path/to/part.prt", "HOLE(1)")
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List, Tuple
|
|
|
|
# Locate the NX binaries directory. The project-wide config module is the
# preferred source; when it cannot be imported, fall back to the NX_BIN_PATH
# environment variable and finally to a hard-coded install location.
try:
    from config import NX_BIN_DIR
except ImportError:
    NX_BIN_PATH = os.environ.get(
        "NX_BIN_PATH",
        r"C:\Program Files\Siemens\DesigncenterNX2512\NXBIN"
    )
else:
    NX_BIN_PATH = str(NX_BIN_DIR)
|
|
|
|
# Journal template for feature operations.
#
# This text is written verbatim to a temporary .py file and executed through
# run_journal.exe (see _execute_feature_operation). The journal receives its
# arguments via sys.argv as
#     <operation> <part_path> <output_json> [extra args...]
# and reports back by writing a JSON result dict to <output_json>.
#
# The literal is not a raw string and is delimited by triple single quotes, so
# the journal source must not contain backslash escapes or that delimiter.
FEATURE_OPERATIONS_JOURNAL = '''
# NX Open Python Journal - Feature Operations
# Auto-generated by Atomizer hooks
#
# Based on Siemens NX Open Python API:
# - Part.Features()
# - Feature.Suppress() / Feature.Unsuppress()
# - Feature.Name, Feature.IsSuppressed

import NXOpen
import NXOpen.Features
import json
import sys
import os

def main():
    """Execute feature operation based on command arguments."""
    # Get the NX session
    session = NXOpen.Session.GetSession()

    # Parse arguments: operation, part_path, output_json, [extra_args...]
    args = sys.argv[1:] if len(sys.argv) > 1 else []

    if len(args) < 3:
        raise ValueError("Usage: script.py <operation> <part_path> <output_json> [args...]")

    operation = args[0]
    part_path = args[1]
    output_json = args[2]
    extra_args = args[3:] if len(args) > 3 else []

    result = {"success": False, "error": None, "data": {}}

    try:
        # Ensure part is open
        part = ensure_part_open(session, part_path)

        if part is None:
            result["error"] = f"Failed to open part: {part_path}"
        elif operation == "get_all":
            result = get_all_features(part)
        elif operation == "get_status":
            feature_name = extra_args[0] if extra_args else None
            result = get_feature_status(part, feature_name)
        elif operation == "suppress":
            feature_name = extra_args[0] if extra_args else None
            result = suppress_feature(session, part, feature_name)
        elif operation == "unsuppress":
            feature_name = extra_args[0] if extra_args else None
            result = unsuppress_feature(session, part, feature_name)
        elif operation == "suppress_multiple":
            feature_names = json.loads(extra_args[0]) if extra_args else []
            result = suppress_multiple_features(session, part, feature_names)
        elif operation == "unsuppress_multiple":
            feature_names = json.loads(extra_args[0]) if extra_args else []
            result = unsuppress_multiple_features(session, part, feature_names)
        else:
            result["error"] = f"Unknown operation: {operation}"

    except Exception as e:
        import traceback
        result["error"] = str(e)
        result["traceback"] = traceback.format_exc()

    # Write result to output JSON
    with open(output_json, 'w') as f:
        json.dump(result, f, indent=2)

    return result


def ensure_part_open(session, part_path):
    """Ensure the part is open and return it."""
    # Check if already open
    part_path_normalized = os.path.normpath(part_path).lower()

    for part in session.Parts:
        if os.path.normpath(part.FullPath).lower() == part_path_normalized:
            return part

    # Need to open it
    if not os.path.exists(part_path):
        return None

    try:
        # Set load options for the working directory
        working_dir = os.path.dirname(part_path)
        session.Parts.LoadOptions.ComponentLoadMethod = NXOpen.LoadOptions.LoadMethod.FromDirectory
        session.Parts.LoadOptions.SetSearchDirectories([working_dir], [True])

        # Use OpenActiveDisplay instead of OpenBase for better compatibility
        part, load_status = session.Parts.OpenActiveDisplay(
            part_path,
            NXOpen.DisplayPartOption.AllowAdditional
        )
        load_status.Dispose()
        return part
    except Exception:
        # Best effort: any ordinary failure means "could not open".
        # SystemExit / KeyboardInterrupt are deliberately not swallowed.
        return None


def find_feature_by_name(part, feature_name):
    """Find a feature by name."""
    for feature in part.Features:
        if feature.Name == feature_name:
            return feature
    return None


def get_all_features(part):
    """Get all features from a part.

    NX Open API: Part.Features()
    """
    result = {"success": False, "error": None, "data": {}}

    try:
        features = []

        for feature in part.Features:
            try:
                feature_data = {
                    "name": feature.Name,
                    "type": feature.FeatureType,
                    "is_suppressed": feature.IsSuppressed,
                    "is_internal": feature.IsInternal,
                }
                features.append(feature_data)
            except Exception:
                # Skip features that can't be read, but let interpreter
                # shutdown signals (SystemExit, KeyboardInterrupt) through.
                pass

        result["success"] = True
        result["data"] = {
            "count": len(features),
            "suppressed_count": sum(1 for f in features if f["is_suppressed"]),
            "features": features
        }

    except Exception as e:
        result["error"] = str(e)

    return result


def get_feature_status(part, feature_name):
    """Get status of a specific feature.

    NX Open API: Feature properties
    """
    result = {"success": False, "error": None, "data": {}}

    if not feature_name:
        result["error"] = "Feature name is required"
        return result

    try:
        feature = find_feature_by_name(part, feature_name)

        if feature is None:
            result["error"] = f"Feature not found: {feature_name}"
            return result

        result["success"] = True
        result["data"] = {
            "name": feature.Name,
            "type": feature.FeatureType,
            "is_suppressed": feature.IsSuppressed,
            "is_internal": feature.IsInternal,
        }

    except Exception as e:
        result["error"] = str(e)

    return result


def suppress_feature(session, part, feature_name):
    """Suppress a feature.

    NX Open API: Feature.Suppress()
    """
    result = {"success": False, "error": None, "data": {}}

    if not feature_name:
        result["error"] = "Feature name is required"
        return result

    try:
        feature = find_feature_by_name(part, feature_name)

        if feature is None:
            result["error"] = f"Feature not found: {feature_name}"
            return result

        if feature.IsSuppressed:
            result["success"] = True
            result["data"] = {
                "name": feature_name,
                "action": "already_suppressed",
                "is_suppressed": True
            }
            return result

        # Set undo mark
        mark_id = session.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Suppress Feature")

        # Suppress the feature
        feature.Suppress()

        # Update the model
        session.UpdateManager.DoUpdate(mark_id)

        result["success"] = True
        result["data"] = {
            "name": feature_name,
            "action": "suppressed",
            "is_suppressed": True
        }

    except Exception as e:
        result["error"] = str(e)

    return result


def unsuppress_feature(session, part, feature_name):
    """Unsuppress a feature.

    NX Open API: Feature.Unsuppress()
    """
    result = {"success": False, "error": None, "data": {}}

    if not feature_name:
        result["error"] = "Feature name is required"
        return result

    try:
        feature = find_feature_by_name(part, feature_name)

        if feature is None:
            result["error"] = f"Feature not found: {feature_name}"
            return result

        if not feature.IsSuppressed:
            result["success"] = True
            result["data"] = {
                "name": feature_name,
                "action": "already_unsuppressed",
                "is_suppressed": False
            }
            return result

        # Set undo mark
        mark_id = session.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Unsuppress Feature")

        # Unsuppress the feature
        feature.Unsuppress()

        # Update the model
        session.UpdateManager.DoUpdate(mark_id)

        result["success"] = True
        result["data"] = {
            "name": feature_name,
            "action": "unsuppressed",
            "is_suppressed": False
        }

    except Exception as e:
        result["error"] = str(e)

    return result


def suppress_multiple_features(session, part, feature_names):
    """Suppress multiple features.

    Args:
        session: NX session
        part: NX part
        feature_names: List of feature names to suppress
    """
    result = {"success": False, "error": None, "data": {}}

    if not feature_names:
        result["error"] = "No feature names provided"
        return result

    try:
        # Set undo mark for all changes
        mark_id = session.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Suppress Multiple Features")

        suppressed = []
        errors = []

        for feature_name in feature_names:
            feature = find_feature_by_name(part, feature_name)

            if feature is None:
                errors.append(f"Feature not found: {feature_name}")
                continue

            try:
                if not feature.IsSuppressed:
                    feature.Suppress()
                    suppressed.append(feature_name)
            except Exception as e:
                errors.append(f"Failed to suppress {feature_name}: {str(e)}")

        # Update the model
        session.UpdateManager.DoUpdate(mark_id)

        result["success"] = len(errors) == 0
        result["data"] = {
            "suppressed": suppressed,
            "errors": errors,
            "suppressed_count": len(suppressed),
            "error_count": len(errors),
        }

    except Exception as e:
        result["error"] = str(e)

    return result


def unsuppress_multiple_features(session, part, feature_names):
    """Unsuppress multiple features.

    Args:
        session: NX session
        part: NX part
        feature_names: List of feature names to unsuppress
    """
    result = {"success": False, "error": None, "data": {}}

    if not feature_names:
        result["error"] = "No feature names provided"
        return result

    try:
        # Set undo mark for all changes
        mark_id = session.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Unsuppress Multiple Features")

        unsuppressed = []
        errors = []

        for feature_name in feature_names:
            feature = find_feature_by_name(part, feature_name)

            if feature is None:
                errors.append(f"Feature not found: {feature_name}")
                continue

            try:
                if feature.IsSuppressed:
                    feature.Unsuppress()
                    unsuppressed.append(feature_name)
            except Exception as e:
                errors.append(f"Failed to unsuppress {feature_name}: {str(e)}")

        # Update the model
        session.UpdateManager.DoUpdate(mark_id)

        result["success"] = len(errors) == 0
        result["data"] = {
            "unsuppressed": unsuppressed,
            "errors": errors,
            "unsuppressed_count": len(unsuppressed),
            "error_count": len(errors),
        }

    except Exception as e:
        result["error"] = str(e)

    return result


if __name__ == "__main__":
    main()
'''
|
|
|
|
|
|
def _get_run_journal_exe() -> str:
    """Return the location of run_journal.exe inside the NX binaries folder.

    The folder is taken from the module-level ``NX_BIN_PATH``; no check is
    made here that the executable actually exists.
    """
    executable_name = "run_journal.exe"
    return os.path.join(NX_BIN_PATH, executable_name)
|
|
|
|
|
|
def _run_journal(journal_path: str, args: list) -> Tuple[bool, str]:
    """Execute an NX journal through run_journal.exe.

    Args:
        journal_path: Path of the journal (.py) file to execute.
        args: Command-line arguments forwarded to the journal after ``-args``.

    Returns:
        ``(True, stdout)`` when the process exits with code 0, otherwise
        ``(False, message)`` where the message describes the failure (missing
        executable, non-zero exit, timeout, or any other exception text).
    """
    executable = _get_run_journal_exe()
    if not os.path.exists(executable):
        return False, f"run_journal.exe not found at {executable}"

    command = [executable, journal_path, "-args"] + args

    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=120,  # give the journal at most two minutes
        )
    except subprocess.TimeoutExpired:
        return False, "Journal execution timed out"
    except Exception as exc:
        return False, str(exc)

    if completed.returncode != 0:
        return False, f"Journal execution failed: {completed.stderr}"
    return True, completed.stdout
|
|
|
|
|
|
def _execute_feature_operation(
    operation: str,
    part_path: str,
    extra_args: Optional[list] = None
) -> Dict[str, Any]:
    """Execute a feature operation via NX journal.

    The journal template is written to a temporary .py file, run through
    ``_run_journal`` and its JSON result file is read back. Both temporary
    files are removed before returning.

    Args:
        operation: The operation to perform
        part_path: Path to the part file
        extra_args: Additional arguments for the operation

    Returns:
        Dict with operation result. When the journal fails to run, writes no
        output, or writes output that cannot be parsed, a dict of the form
        ``{"success": False, "error": <message>, "data": {}}`` is returned.
    """
    journal_path = None
    output_file = None

    try:
        # Create temporary journal file
        with tempfile.NamedTemporaryFile(
            mode='w',
            suffix='.py',
            delete=False
        ) as journal_file:
            journal_file.write(FEATURE_OPERATIONS_JOURNAL)
            journal_path = journal_file.name

        # Reserve a temporary output path. The handle is closed immediately
        # (via the context manager) so the journal process can write to it.
        with tempfile.NamedTemporaryFile(
            mode='w',
            suffix='.json',
            delete=False
        ) as output_handle:
            output_file = output_handle.name

        # Build arguments
        args = [operation, part_path, output_file]
        if extra_args:
            args.extend(extra_args)

        # Run the journal
        success, output = _run_journal(journal_path, args)

        if not success:
            return {"success": False, "error": output, "data": {}}

        # The output file was pre-created above, so mere existence proves
        # nothing: an empty file means the journal never wrote a result.
        if not os.path.exists(output_file) or os.path.getsize(output_file) == 0:
            return {"success": False, "error": "Output file not created", "data": {}}

        # Read the result; a truncated or malformed file is reported as an
        # error dict instead of raising to the caller.
        try:
            with open(output_file, 'r') as f:
                return json.load(f)
        except (OSError, ValueError) as e:
            return {
                "success": False,
                "error": f"Failed to read journal output: {e}",
                "data": {}
            }

    finally:
        # Cleanup temporary files
        for temp_path in (journal_path, output_file):
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)
|
|
|
|
|
|
# =============================================================================
|
|
# Public API
|
|
# =============================================================================
|
|
|
|
def get_features(part_path: str) -> Dict[str, Any]:
    """List every feature of an NX part.

    Args:
        part_path: Path to the .prt file; it is made absolute before use.

    Returns:
        Dict with keys:
        - success: bool
        - error: error message, or None
        - data: on success, a dict holding count, suppressed_count and the
          features list

        If the file does not exist, an error dict is returned without
        running NX.

    Example:
        >>> result = get_features("C:/models/bracket.prt")
        >>> if result["success"]:
        ...     for f in result["data"]["features"]:
        ...         status = "suppressed" if f["is_suppressed"] else "active"
        ...         print(f"{f['name']} ({f['type']}): {status}")
    """
    resolved_path = os.path.abspath(part_path)

    if os.path.exists(resolved_path):
        return _execute_feature_operation("get_all", resolved_path)

    return {
        "success": False,
        "error": f"Part file not found: {resolved_path}",
        "data": {},
    }
|
|
|
|
|
|
def get_feature_status(part_path: str, feature_name: str) -> Dict[str, Any]:
    """Report the state of one named feature of an NX part.

    Args:
        part_path: Path to the .prt file; it is made absolute before use.
        feature_name: Name of the feature to look up.

    Returns:
        Dict with keys:
        - success: bool
        - error: error message, or None
        - data: on success, a dict holding name, type, is_suppressed and
          is_internal

        If the file does not exist, an error dict is returned without
        running NX.

    Example:
        >>> result = get_feature_status("C:/models/bracket.prt", "HOLE(1)")
        >>> if result["success"]:
        ...     print(f"Suppressed: {result['data']['is_suppressed']}")
    """
    resolved_path = os.path.abspath(part_path)

    if os.path.exists(resolved_path):
        return _execute_feature_operation("get_status", resolved_path, [feature_name])

    return {
        "success": False,
        "error": f"Part file not found: {resolved_path}",
        "data": {},
    }
|
|
|
|
|
|
def suppress_feature(part_path: str, feature_name: str) -> Dict[str, Any]:
    """Suppress one named feature of an NX part.

    Args:
        part_path: Path to the .prt file; it is made absolute before use.
        feature_name: Name of the feature to suppress.

    Returns:
        Dict with keys:
        - success: bool
        - error: error message, or None
        - data: on success, a dict holding name, action and is_suppressed

        If the file does not exist, an error dict is returned without
        running NX.

    Example:
        >>> result = suppress_feature("C:/models/bracket.prt", "HOLE(1)")
        >>> if result["success"]:
        ...     print(f"Feature {result['data']['action']}")
    """
    resolved_path = os.path.abspath(part_path)

    if os.path.exists(resolved_path):
        return _execute_feature_operation("suppress", resolved_path, [feature_name])

    return {
        "success": False,
        "error": f"Part file not found: {resolved_path}",
        "data": {},
    }
|
|
|
|
|
|
def unsuppress_feature(part_path: str, feature_name: str) -> Dict[str, Any]:
    """Unsuppress one named feature of an NX part.

    Args:
        part_path: Path to the .prt file; it is made absolute before use.
        feature_name: Name of the feature to unsuppress.

    Returns:
        Dict with keys:
        - success: bool
        - error: error message, or None
        - data: on success, a dict holding name, action and is_suppressed

        If the file does not exist, an error dict is returned without
        running NX.

    Example:
        >>> result = unsuppress_feature("C:/models/bracket.prt", "HOLE(1)")
        >>> if result["success"]:
        ...     print(f"Feature {result['data']['action']}")
    """
    resolved_path = os.path.abspath(part_path)

    if os.path.exists(resolved_path):
        return _execute_feature_operation("unsuppress", resolved_path, [feature_name])

    return {
        "success": False,
        "error": f"Part file not found: {resolved_path}",
        "data": {},
    }
|
|
|
|
|
|
def suppress_features(part_path: str, feature_names: List[str]) -> Dict[str, Any]:
    """Suppress several features of an NX part in one journal run.

    Args:
        part_path: Path to the .prt file; it is made absolute before use.
        feature_names: Names of the features to suppress. The list is passed
            to the journal as a single JSON-encoded argument.

    Returns:
        Dict with keys:
        - success: bool
        - error: error message, or None
        - data: dict holding the suppressed list, the errors list and
          their counts

        If the file does not exist, an error dict is returned without
        running NX.

    Example:
        >>> result = suppress_features("C:/models/bracket.prt", ["HOLE(1)", "HOLE(2)"])
        >>> if result["success"]:
        ...     print(f"Suppressed {result['data']['suppressed_count']} features")
    """
    resolved_path = os.path.abspath(part_path)

    if not os.path.exists(resolved_path):
        return {
            "success": False,
            "error": f"Part file not found: {resolved_path}",
            "data": {},
        }

    encoded_names = json.dumps(feature_names)
    return _execute_feature_operation("suppress_multiple", resolved_path, [encoded_names])
|
|
|
|
|
|
def unsuppress_features(part_path: str, feature_names: List[str]) -> Dict[str, Any]:
    """Unsuppress several features of an NX part in one journal run.

    Args:
        part_path: Path to the .prt file; it is made absolute before use.
        feature_names: Names of the features to unsuppress. The list is
            passed to the journal as a single JSON-encoded argument.

    Returns:
        Dict with keys:
        - success: bool
        - error: error message, or None
        - data: dict holding the unsuppressed list, the errors list and
          their counts

        If the file does not exist, an error dict is returned without
        running NX.

    Example:
        >>> result = unsuppress_features("C:/models/bracket.prt", ["HOLE(1)", "HOLE(2)"])
        >>> if result["success"]:
        ...     print(f"Unsuppressed {result['data']['unsuppressed_count']} features")
    """
    resolved_path = os.path.abspath(part_path)

    if not os.path.exists(resolved_path):
        return {
            "success": False,
            "error": f"Part file not found: {resolved_path}",
            "data": {},
        }

    encoded_names = json.dumps(feature_names)
    return _execute_feature_operation("unsuppress_multiple", resolved_path, [encoded_names])
|