refactor: Major reorganization of optimization_engine module structure

BREAKING CHANGE: Module paths have been reorganized for better maintainability.
Backwards compatibility aliases with deprecation warnings are provided.

New Structure:
- core/           - Optimization runners (runner, intelligent_optimizer, etc.)
- processors/     - Data processing
  - surrogates/   - Neural network surrogates
- nx/             - NX/Nastran integration (solver, updater, session_manager)
- study/          - Study management (creator, wizard, state, reset)
- reporting/      - Reports and analysis (visualizer, report_generator)
- config/         - Configuration management (manager, builder)
- utils/          - Utilities (logger, auto_doc, etc.)
- future/         - Research/experimental code

Migration:
- ~200 import changes across 125 files
- All __init__.py files use lazy loading to avoid circular imports
- Backwards compatibility layer supports old import paths with warnings
- All existing functionality preserved

To migrate existing code:
  OLD: from optimization_engine.nx_solver import NXSolver
  NEW: from optimization_engine.nx.solver import NXSolver

  OLD: from optimization_engine.runner import OptimizationRunner
  NEW: from optimization_engine.core.runner import OptimizationRunner

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-29 12:30:59 -05:00
parent 82f36689b7
commit eabcc4c3ca
120 changed files with 1127 additions and 637 deletions

View File

@@ -0,0 +1,51 @@
"""
NX Integration
==============
Siemens NX and Nastran integration modules.
Modules:
- solver: NXSolver for running simulations
- updater: NXParameterUpdater for design updates
- session_manager: NX session lifecycle management
- solve_simulation: Low-level simulation execution
"""
# Lazy imports to avoid import errors when NX modules aren't available
def __getattr__(name):
if name == 'NXSolver':
from .solver import NXSolver
return NXSolver
elif name == 'run_nx_simulation':
from .solver import run_nx_simulation
return run_nx_simulation
elif name == 'NXParameterUpdater':
from .updater import NXParameterUpdater
return NXParameterUpdater
elif name == 'update_nx_model':
from .updater import update_nx_model
return update_nx_model
elif name == 'NXSessionManager':
from .session_manager import NXSessionManager
return NXSessionManager
elif name == 'NXSessionInfo':
from .session_manager import NXSessionInfo
return NXSessionInfo
elif name == 'ModelCleanup':
from .model_cleanup import ModelCleanup
return ModelCleanup
elif name == 'cleanup_substudy':
from .model_cleanup import cleanup_substudy
return cleanup_substudy
raise AttributeError(f"module 'optimization_engine.nx' has no attribute '{name}'")
__all__ = [
'NXSolver',
'run_nx_simulation',
'NXParameterUpdater',
'update_nx_model',
'NXSessionManager',
'NXSessionInfo',
'ModelCleanup',
'cleanup_substudy',
]

View File

@@ -0,0 +1,80 @@
"""
NX Journal Script to Export Expressions to .exp File
This script exports all expressions from the work part to a .exp file.
The .exp format is NX's native expression export format and captures ALL expressions
including formulas, references, and unitless expressions.
Usage: run_journal.exe export_expressions.py <prt_file_path> <output_exp_path>
"""
import sys
import NXOpen
def main(args):
"""
Export expressions from a .prt file to .exp format.
Args:
args: Command line arguments
args[0]: .prt file path
args[1]: output .exp file path (without .exp extension)
"""
if len(args) < 2:
print("ERROR: Not enough arguments")
print("Usage: export_expressions.py <prt_file> <output_path>")
return False
prt_file_path = args[0]
output_path = args[1] # NX adds .exp automatically
print(f"[JOURNAL] Exporting expressions from: {prt_file_path}")
print(f"[JOURNAL] Output path: {output_path}.exp")
try:
theSession = NXOpen.Session.GetSession()
# Close any currently open parts
print("[JOURNAL] Closing any open parts...")
try:
partCloseResponses = [NXOpen.BasePart.CloseWholeTree]
theSession.Parts.CloseAll(partCloseResponses)
except:
pass
# Open the .prt file
print(f"[JOURNAL] Opening part file...")
basePart, partLoadStatus = theSession.Parts.OpenActiveDisplay(
prt_file_path,
NXOpen.DisplayPartOption.AllowAdditional
)
partLoadStatus.Dispose()
workPart = theSession.Parts.Work
if workPart is None:
print("[JOURNAL] ERROR: No work part loaded")
return False
# Export expressions to .exp file
print("[JOURNAL] Exporting expressions...")
workPart.Expressions.ExportToFile(
NXOpen.ExpressionCollection.ExportMode.WorkPart,
output_path,
NXOpen.ExpressionCollection.SortType.AlphaNum
)
print(f"[JOURNAL] Successfully exported expressions to: {output_path}.exp")
return True
except Exception as e:
print(f"[JOURNAL] ERROR: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == '__main__':
success = main(sys.argv[1:])
sys.exit(0 if success else 1)

View File

@@ -0,0 +1,82 @@
"""
NX Journal: Import expressions from .exp file
Usage: run_journal.exe import_expressions.py -args <prt_file> <exp_file>
"""
import sys
import NXOpen
def main(args):
if len(args) < 2:
print("[ERROR] Usage: import_expressions.py <prt_file> <exp_file>")
sys.exit(1)
prt_file = args[0]
exp_file = args[1]
theSession = NXOpen.Session.GetSession()
# Open the part file
partLoadStatus1 = None
try:
workPart, partLoadStatus1 = theSession.Parts.OpenActiveDisplay(
prt_file,
NXOpen.DisplayPartOption.AllowAdditional
)
finally:
if partLoadStatus1:
partLoadStatus1.Dispose()
print(f"[JOURNAL] Opened part: {prt_file}")
# Import expressions from .exp file
markId1 = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Import Expressions")
try:
expModified, errorMessages = workPart.Expressions.ImportFromFile(
exp_file,
NXOpen.ExpressionCollection.ImportMode.Replace
)
print(f"[JOURNAL] Imported expressions from: {exp_file}")
# expModified can be either a bool or an array depending on NX version
if isinstance(expModified, bool):
print(f"[JOURNAL] Import completed: {expModified}")
else:
print(f"[JOURNAL] Expressions modified: {len(expModified)}")
if errorMessages:
print(f"[JOURNAL] Import errors: {errorMessages}")
# Update the part to apply expression changes
markId2 = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Invisible, "NX update")
nErrs = theSession.UpdateManager.DoUpdate(markId2)
theSession.DeleteUndoMark(markId2, "NX update")
print(f"[JOURNAL] Part updated (errors: {nErrs})")
# Save the part
partSaveStatus = workPart.Save(
NXOpen.BasePart.SaveComponents.TrueValue,
NXOpen.BasePart.CloseAfterSave.FalseValue
)
partSaveStatus.Dispose()
print(f"[JOURNAL] Part saved: {prt_file}")
# Close all parts to ensure changes are written to disk and not cached in memory
# This is critical so the solve journal loads the updated PRT from disk
theSession.Parts.CloseAll(NXOpen.BasePart.CloseModified.UseResponses, None)
print(f"[JOURNAL] All parts closed to release file")
except Exception as e:
print(f"[ERROR] Failed to import expressions: {e}")
sys.exit(1)
print("[JOURNAL] Expression import complete!")
if __name__ == '__main__':
main(sys.argv[1:])

View File

@@ -0,0 +1,133 @@
"""
Mesh Converter Utility
Converts Nastran BDF/OP2 files to GLTF for web visualization
"""
import json
import numpy as np
from pathlib import Path
from typing import Optional, Dict, Any
import trimesh
from pyNastran.bdf.bdf import BDF
from pyNastran.op2.op2 import OP2
def convert_study_mesh(study_dir: Path) -> Optional[Path]:
"""
Convert the mesh and results of a study to GLTF format.
Args:
study_dir: Path to the study directory
Returns:
Path to the generated GLTF file, or None if conversion failed
"""
try:
# Locate files
setup_dir = study_dir / "1_setup" / "model"
results_dir = study_dir / "2_results"
vis_dir = study_dir / "3_visualization"
vis_dir.mkdir(parents=True, exist_ok=True)
# Find BDF/DAT file
bdf_files = list(setup_dir.glob("*.dat")) + list(setup_dir.glob("*.bdf"))
if not bdf_files:
# Fallback: Generate placeholder if no BDF found
return _generate_placeholder_mesh(vis_dir)
bdf_path = bdf_files[0]
# Find OP2 file (optional)
op2_files = list(results_dir.glob("*.op2"))
op2_path = op2_files[0] if op2_files else None
# Load BDF
model = BDF()
model.read_bdf(bdf_path, xref=False)
# Extract nodes and elements
# This is a simplified extraction for shell/solid elements
# A full implementation would handle all element types
nodes = []
node_ids = []
for nid, node in model.nodes.items():
nodes.append(node.get_position())
node_ids.append(nid)
nodes = np.array(nodes)
node_map = {nid: i for i, nid in enumerate(node_ids)}
faces = []
# Process CQUAD4/CTRIA3 elements
for eid, element in model.elements.items():
if element.type == 'CQUAD4':
n = [node_map[nid] for nid in element.nodes]
faces.append([n[0], n[1], n[2]])
faces.append([n[0], n[2], n[3]])
elif element.type == 'CTRIA3':
n = [node_map[nid] for nid in element.nodes]
faces.append([n[0], n[1], n[2]])
if not faces:
# Fallback if no compatible elements found
return _generate_placeholder_mesh(vis_dir)
# Create mesh
mesh = trimesh.Trimesh(vertices=nodes, faces=faces)
# Map results if OP2 exists
if op2_path:
op2 = OP2()
op2.read_op2(op2_path)
# Example: Map displacement magnitude to vertex colors
if 1 in op2.displacements:
disp = op2.displacements[1]
# Get last timestep
t3 = disp.data[-1, :, :3] # Translation x,y,z
mag = np.linalg.norm(t3, axis=1)
# Normalize to 0-1 for coloring
if mag.max() > mag.min():
norm_mag = (mag - mag.min()) / (mag.max() - mag.min())
else:
norm_mag = np.zeros_like(mag)
# Apply colormap (simple blue-to-red)
colors = np.zeros((len(nodes), 4))
colors[:, 0] = norm_mag # R
colors[:, 2] = 1 - norm_mag # B
colors[:, 3] = 1.0 # Alpha
mesh.visual.vertex_colors = colors
# Export to GLTF
output_path = vis_dir / "model.gltf"
mesh.export(output_path)
# Save metadata
metadata = {
"node_count": len(nodes),
"element_count": len(faces),
"has_results": op2_path is not None
}
with open(vis_dir / "model.json", 'w') as f:
json.dump(metadata, f, indent=2)
return output_path
except Exception as e:
print(f"Mesh conversion error: {e}")
# Fallback on error
return _generate_placeholder_mesh(vis_dir)
def _generate_placeholder_mesh(output_dir: Path) -> Path:
"""Generate a simple box mesh for testing"""
mesh = trimesh.creation.box(extents=[10, 10, 10])
output_path = output_dir / "model.gltf"
mesh.export(output_path)
with open(output_dir / "model.json", 'w') as f:
json.dump({"placeholder": True}, f)
return output_path

View File

@@ -0,0 +1,274 @@
"""
Model Cleanup System
Intelligent cleanup of trial model files to save disk space.
Keeps top-N trials based on objective value, deletes CAD/FEM files for poor trials.
Strategy:
- Preserve ALL trial results.json files (small, contain critical data)
- Delete large CAD/FEM files (.prt, .sim, .fem, .op2, .f06) for non-top-N trials
- Keep best trial models + user-specified number of top trials
"""
from pathlib import Path
from typing import Dict, List, Optional
import json
import shutil
class ModelCleanup:
"""
Clean up trial directories to save disk space.
Deletes large model files (.prt, .sim, .fem, .op2, .f06) from trials
that are not in the top-N performers.
"""
# File extensions to delete (large CAD/FEM/result files)
CLEANUP_EXTENSIONS = {
'.prt', # NX part files
'.sim', # NX simulation files
'.fem', # FEM mesh files
'.afm', # NX assembly FEM
'.op2', # Nastran binary results
'.f06', # Nastran text results
'.dat', # Nastran input deck
'.bdf', # Nastran bulk data
'.pch', # Nastran punch file
'.log', # Nastran log
'.master', # Nastran master file
'.dball', # Nastran database
'.MASTER', # Nastran master (uppercase)
'.DBALL', # Nastran database (uppercase)
}
# Files to ALWAYS keep (small, critical data)
PRESERVE_FILES = {
'results.json',
'trial_metadata.json',
'extraction_log.txt',
}
def __init__(self, substudy_dir: Path):
"""
Initialize cleanup manager.
Args:
substudy_dir: Path to substudy directory containing trial_XXX folders
"""
self.substudy_dir = Path(substudy_dir)
self.history_file = self.substudy_dir / 'history.json'
self.cleanup_log = self.substudy_dir / 'cleanup_log.json'
def cleanup_models(
self,
keep_top_n: int = 10,
dry_run: bool = False
) -> Dict:
"""
Clean up trial model files, keeping only top-N performers.
Args:
keep_top_n: Number of best trials to keep models for
dry_run: If True, only report what would be deleted without deleting
Returns:
Dictionary with cleanup statistics
"""
if not self.history_file.exists():
raise FileNotFoundError(f"History file not found: {self.history_file}")
# Load history
with open(self.history_file, 'r') as f:
history = json.load(f)
# Sort trials by objective value (minimize)
sorted_trials = sorted(history, key=lambda x: x.get('total_objective', float('inf')))
# Identify top-N trials to keep
keep_trial_numbers = set()
for i in range(min(keep_top_n, len(sorted_trials))):
keep_trial_numbers.add(sorted_trials[i]['trial_number'])
# Cleanup statistics
stats = {
'total_trials': len(history),
'kept_trials': len(keep_trial_numbers),
'cleaned_trials': 0,
'files_deleted': 0,
'space_freed_mb': 0.0,
'deleted_files': [],
'kept_trial_numbers': sorted(list(keep_trial_numbers)),
'dry_run': dry_run
}
# Process each trial directory
trial_dirs = sorted(self.substudy_dir.glob('trial_*'))
for trial_dir in trial_dirs:
if not trial_dir.is_dir():
continue
# Extract trial number from directory name
try:
trial_num = int(trial_dir.name.split('_')[-1])
except (ValueError, IndexError):
continue
# Skip if this trial should be kept
if trial_num in keep_trial_numbers:
continue
# Clean up this trial
trial_stats = self._cleanup_trial_directory(trial_dir, dry_run)
stats['files_deleted'] += trial_stats['files_deleted']
stats['space_freed_mb'] += trial_stats['space_freed_mb']
stats['deleted_files'].extend(trial_stats['deleted_files'])
if trial_stats['files_deleted'] > 0:
stats['cleaned_trials'] += 1
# Save cleanup log
if not dry_run:
with open(self.cleanup_log, 'w') as f:
json.dump(stats, f, indent=2)
return stats
def _cleanup_trial_directory(self, trial_dir: Path, dry_run: bool) -> Dict:
"""
Clean up a single trial directory.
Args:
trial_dir: Path to trial directory
dry_run: If True, don't actually delete files
Returns:
Dictionary with cleanup statistics for this trial
"""
stats = {
'files_deleted': 0,
'space_freed_mb': 0.0,
'deleted_files': []
}
for file_path in trial_dir.iterdir():
if not file_path.is_file():
continue
# Skip preserved files
if file_path.name in self.PRESERVE_FILES:
continue
# Check if file should be deleted
if file_path.suffix.lower() in self.CLEANUP_EXTENSIONS:
file_size_mb = file_path.stat().st_size / (1024 * 1024)
stats['files_deleted'] += 1
stats['space_freed_mb'] += file_size_mb
stats['deleted_files'].append(str(file_path.relative_to(self.substudy_dir)))
# Delete file (unless dry run)
if not dry_run:
try:
file_path.unlink()
except Exception as e:
print(f"Warning: Could not delete {file_path}: {e}")
return stats
def print_cleanup_report(self, stats: Dict):
"""
Print human-readable cleanup report.
Args:
stats: Cleanup statistics dictionary
"""
print("\n" + "="*70)
print("MODEL CLEANUP REPORT")
print("="*70)
if stats['dry_run']:
print("[DRY RUN - No files were actually deleted]")
print()
print(f"Total trials: {stats['total_trials']}")
print(f"Trials kept: {stats['kept_trials']}")
print(f"Trials cleaned: {stats['cleaned_trials']}")
print(f"Files deleted: {stats['files_deleted']}")
print(f"Space freed: {stats['space_freed_mb']:.2f} MB")
print()
print(f"Kept trial numbers: {stats['kept_trial_numbers']}")
print()
if stats['files_deleted'] > 0:
print("Deleted file types:")
file_types = {}
for filepath in stats['deleted_files']:
ext = Path(filepath).suffix.lower()
file_types[ext] = file_types.get(ext, 0) + 1
for ext, count in sorted(file_types.items()):
print(f" {ext:15s}: {count:4d} files")
print("="*70 + "\n")
def cleanup_substudy(
substudy_dir: Path,
keep_top_n: int = 10,
dry_run: bool = False,
verbose: bool = True
) -> Dict:
"""
Convenience function to clean up a substudy.
Args:
substudy_dir: Path to substudy directory
keep_top_n: Number of best trials to preserve models for
dry_run: If True, only report what would be deleted
verbose: If True, print cleanup report
Returns:
Cleanup statistics dictionary
"""
cleaner = ModelCleanup(substudy_dir)
stats = cleaner.cleanup_models(keep_top_n=keep_top_n, dry_run=dry_run)
if verbose:
cleaner.print_cleanup_report(stats)
return stats
if __name__ == '__main__':
import sys
import argparse
parser = argparse.ArgumentParser(
description='Clean up optimization trial model files to save disk space'
)
parser.add_argument(
'substudy_dir',
type=Path,
help='Path to substudy directory'
)
parser.add_argument(
'--keep-top-n',
type=int,
default=10,
help='Number of best trials to keep models for (default: 10)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Show what would be deleted without actually deleting'
)
args = parser.parse_args()
cleanup_substudy(
args.substudy_dir,
keep_top_n=args.keep_top_n,
dry_run=args.dry_run
)

View File

@@ -0,0 +1,442 @@
"""
NX Session Manager - Prevents conflicts when multiple optimizations run concurrently.
This module ensures that NX sessions don't interfere with each other when:
1. Multiple optimizations are running simultaneously
2. User has NX open for manual work
3. Multiple Atomizer instances are running
Key Features:
- Session detection (running NX processes)
- File locking (prevents concurrent access to same model)
- Process queuing (waits if NX is busy with another optimization)
- Batch mode isolation (uses dedicated NX instances)
"""
import psutil
import time
import os
from pathlib import Path
from typing import Optional, List
from contextlib import contextmanager
from dataclasses import dataclass
import json
# Platform-specific imports
if os.name != 'nt': # Unix/Linux/Mac
import fcntl
else: # Windows
import msvcrt
@dataclass
class NXSessionInfo:
"""Information about a running NX session."""
pid: int
name: str
cmdline: List[str]
working_dir: Optional[str]
create_time: float
class NXSessionManager:
"""
Manages NX sessions to prevent conflicts between concurrent optimizations.
Strategy:
1. Detect running NX processes
2. Use file locks to ensure exclusive model access
3. Queue optimization trials if NX is busy
4. Isolate batch mode sessions from interactive sessions
"""
def __init__(
self,
lock_dir: Optional[Path] = None,
max_concurrent_sessions: int = 1,
wait_timeout: int = 300,
verbose: bool = True
):
"""
Initialize session manager.
Args:
lock_dir: Directory for lock files (default: temp)
max_concurrent_sessions: Maximum concurrent NX optimization sessions
wait_timeout: Maximum wait time for NX to become available (seconds)
verbose: Print session management info
"""
self.lock_dir = Path(lock_dir) if lock_dir else Path.home() / ".atomizer" / "locks"
self.lock_dir.mkdir(parents=True, exist_ok=True)
self.max_concurrent = max_concurrent_sessions
self.wait_timeout = wait_timeout
self.verbose = verbose
# Track active sessions
self.session_lock_file = self.lock_dir / "nx_sessions.json"
self.global_lock_file = self.lock_dir / "nx_global.lock"
def get_running_nx_sessions(self) -> List[NXSessionInfo]:
"""
Detect all running NX processes.
Returns:
List of NX session info objects
"""
nx_sessions = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cwd', 'create_time']):
try:
name = proc.info['name']
# Check if this is an NX process
if name and any(nx_exe in name.lower() for nx_exe in ['ugraf.exe', 'nx.exe', 'run_journal.exe', 'nxmgr_inter.exe']):
session = NXSessionInfo(
pid=proc.info['pid'],
name=name,
cmdline=proc.info['cmdline'] or [],
working_dir=proc.info['cwd'],
create_time=proc.info['create_time']
)
nx_sessions.append(session)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return nx_sessions
def is_nx_interactive_session_running(self) -> bool:
"""
Check if user has NX open interactively (not batch mode).
Returns:
True if interactive NX session detected
"""
sessions = self.get_running_nx_sessions()
for session in sessions:
# Interactive sessions are typically ugraf.exe or nx.exe without -batch
if 'ugraf.exe' in session.name.lower() or 'nx.exe' in session.name.lower():
# Check if it's not a batch session
if '-batch' not in ' '.join(session.cmdline).lower():
return True
return False
@contextmanager
def acquire_model_lock(self, model_file: Path, study_name: str):
"""
Acquire exclusive lock for a specific model file.
This prevents two optimizations from modifying the same model simultaneously.
Args:
model_file: Path to the model file (.prt)
study_name: Name of the study (for logging)
Yields:
Lock context
Raises:
TimeoutError: If lock cannot be acquired within timeout
"""
# Create lock file for this specific model
model_hash = str(abs(hash(str(model_file))))
lock_file = self.lock_dir / f"model_{model_hash}.lock"
if self.verbose:
print(f"\n[SESSION MGR] Acquiring lock for model: {model_file.name}")
lock_fd = None
start_time = time.time()
try:
# Try to acquire lock with timeout
while True:
try:
lock_fd = open(lock_file, 'w')
# Try to acquire exclusive lock (non-blocking)
if os.name == 'nt': # Windows
import msvcrt
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
else: # Unix
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
# Write lock info
lock_info = {
'study_name': study_name,
'model_file': str(model_file),
'pid': os.getpid(),
'timestamp': time.time()
}
lock_fd.write(json.dumps(lock_info, indent=2))
lock_fd.flush()
if self.verbose:
print(f"[SESSION MGR] Lock acquired successfully")
break # Lock acquired!
except (IOError, OSError):
# Lock is held by another process
elapsed = time.time() - start_time
if elapsed > self.wait_timeout:
raise TimeoutError(
f"Could not acquire model lock for {model_file.name} "
f"after {self.wait_timeout}s. Another optimization may be using this model."
)
if self.verbose and elapsed % 10 == 0: # Print every 10 seconds
print(f"[SESSION MGR] Waiting for model lock... ({elapsed:.0f}s)")
time.sleep(1)
yield # Lock acquired, user code runs here
finally:
# Release lock
if lock_fd:
try:
if os.name == 'nt':
import msvcrt
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
else:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
lock_fd.close()
if self.verbose:
print(f"[SESSION MGR] Lock released for model: {model_file.name}")
except Exception as e:
if self.verbose:
print(f"[SESSION MGR] Warning: Failed to release lock: {e}")
# Clean up lock file
try:
if lock_file.exists():
lock_file.unlink()
except:
pass
@contextmanager
def acquire_nx_session(self, study_name: str):
"""
Acquire permission to run an NX batch session.
This ensures we don't exceed max_concurrent_sessions and
don't interfere with interactive NX sessions.
Args:
study_name: Name of the study (for logging)
Yields:
Session context
Raises:
TimeoutError: If session cannot be acquired within timeout
"""
if self.verbose:
print(f"\n[SESSION MGR] Requesting NX batch session for study: {study_name}")
# Check for interactive NX sessions
if self.is_nx_interactive_session_running():
if self.verbose:
print(f"[SESSION MGR] WARNING: Interactive NX session detected!")
print(f"[SESSION MGR] Batch operations may conflict with user's work.")
print(f"[SESSION MGR] Recommend closing interactive NX before running optimization.")
start_time = time.time()
session_acquired = False
try:
# Wait for available session slot
while True:
active_sessions = self._count_active_sessions()
if active_sessions < self.max_concurrent:
# Register this session
self._register_session(study_name)
session_acquired = True
if self.verbose:
print(f"[SESSION MGR] NX session acquired (active: {active_sessions + 1}/{self.max_concurrent})")
break
# Check timeout
elapsed = time.time() - start_time
if elapsed > self.wait_timeout:
raise TimeoutError(
f"Could not acquire NX session after {self.wait_timeout}s. "
f"Max concurrent sessions ({self.max_concurrent}) reached."
)
if self.verbose and elapsed % 10 == 0:
print(f"[SESSION MGR] Waiting for NX session... ({elapsed:.0f}s, active: {active_sessions})")
time.sleep(2)
yield # Session acquired, user code runs here
finally:
# Unregister session
if session_acquired:
self._unregister_session(study_name)
if self.verbose:
print(f"[SESSION MGR] NX session released for study: {study_name}")
def _count_active_sessions(self) -> int:
"""Count active optimization sessions."""
if not self.session_lock_file.exists():
return 0
try:
with open(self.session_lock_file, 'r') as f:
sessions = json.load(f)
# Clean up stale sessions (processes that no longer exist)
active_sessions = []
for session in sessions:
pid = session.get('pid')
if pid and psutil.pid_exists(pid):
active_sessions.append(session)
# Update file with only active sessions
with open(self.session_lock_file, 'w') as f:
json.dump(active_sessions, f, indent=2)
return len(active_sessions)
except Exception as e:
if self.verbose:
print(f"[SESSION MGR] Warning: Failed to count sessions: {e}")
return 0
def _register_session(self, study_name: str):
"""Register a new optimization session."""
sessions = []
if self.session_lock_file.exists():
try:
with open(self.session_lock_file, 'r') as f:
sessions = json.load(f)
except:
sessions = []
# Add new session
sessions.append({
'study_name': study_name,
'pid': os.getpid(),
'start_time': time.time(),
'timestamp': time.time()
})
# Save
with open(self.session_lock_file, 'w') as f:
json.dump(sessions, f, indent=2)
def _unregister_session(self, study_name: str):
"""Unregister an optimization session."""
if not self.session_lock_file.exists():
return
try:
with open(self.session_lock_file, 'r') as f:
sessions = json.load(f)
# Remove this session
pid = os.getpid()
sessions = [s for s in sessions if s.get('pid') != pid]
# Save
with open(self.session_lock_file, 'w') as f:
json.dump(sessions, f, indent=2)
except Exception as e:
if self.verbose:
print(f"[SESSION MGR] Warning: Failed to unregister session: {e}")
def cleanup_stale_locks(self):
"""Remove lock files from crashed processes."""
if not self.lock_dir.exists():
return
cleaned = 0
for lock_file in self.lock_dir.glob("*.lock"):
try:
# Try to read lock info
with open(lock_file, 'r') as f:
lock_info = json.load(f)
pid = lock_info.get('pid')
# Check if process still exists
if pid and not psutil.pid_exists(pid):
lock_file.unlink()
cleaned += 1
if self.verbose:
print(f"[SESSION MGR] Cleaned stale lock: {lock_file.name}")
except Exception:
# If we can't read lock file, it might be corrupted - remove it
try:
lock_file.unlink()
cleaned += 1
except:
pass
if self.verbose and cleaned > 0:
print(f"[SESSION MGR] Cleaned {cleaned} stale lock file(s)")
def get_status_report(self) -> str:
"""Generate status report of NX sessions and locks."""
report = "\n" + "="*70 + "\n"
report += " NX SESSION MANAGER STATUS\n"
report += "="*70 + "\n"
# Running NX sessions
nx_sessions = self.get_running_nx_sessions()
report += f"\n Running NX Processes: {len(nx_sessions)}\n"
for session in nx_sessions:
report += f" PID {session.pid}: {session.name}\n"
if session.working_dir:
report += f" Working dir: {session.working_dir}\n"
# Interactive session warning
if self.is_nx_interactive_session_running():
report += f"\n WARNING: Interactive NX session detected!\n"
report += f" Batch operations may conflict with user's work.\n"
# Active optimization sessions
active_count = self._count_active_sessions()
report += f"\n Active Optimization Sessions: {active_count}/{self.max_concurrent}\n"
if self.session_lock_file.exists():
try:
with open(self.session_lock_file, 'r') as f:
sessions = json.load(f)
for session in sessions:
study = session.get('study_name', 'Unknown')
pid = session.get('pid', 'Unknown')
report += f" {study} (PID {pid})\n"
except:
pass
# Lock files
lock_files = list(self.lock_dir.glob("*.lock"))
report += f"\n Active Lock Files: {len(lock_files)}\n"
report += "="*70 + "\n"
return report

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,116 @@
"""
Simple NX Journal Script to Just Solve Simulation
This is a simplified version that just opens and solves the simulation
without trying to update linked parts (for simple models).
Usage: run_journal.exe solve_simulation_simple.py <sim_file_path>
"""
import sys
import NXOpen
import NXOpen.CAE
def main(args):
"""
Open and solve a simulation file without updates.
Args:
args: Command line arguments
args[0]: .sim file path
"""
if len(args) < 1:
print("ERROR: No .sim file path provided")
return False
sim_file_path = args[0]
print(f"[JOURNAL] Opening simulation: {sim_file_path}")
try:
theSession = NXOpen.Session.GetSession()
# Set load options to load linked parts from directory
print("[JOURNAL] Setting load options for linked parts...")
import os
working_dir = os.path.dirname(os.path.abspath(sim_file_path))
# Complete load options setup
theSession.Parts.LoadOptions.LoadLatest = False
theSession.Parts.LoadOptions.ComponentLoadMethod = NXOpen.LoadOptions.LoadMethod.FromDirectory
searchDirectories = [working_dir]
searchSubDirs = [True]
theSession.Parts.LoadOptions.SetSearchDirectories(searchDirectories, searchSubDirs)
theSession.Parts.LoadOptions.ComponentsToLoad = NXOpen.LoadOptions.LoadComponents.All
theSession.Parts.LoadOptions.PartLoadOption = NXOpen.LoadOptions.LoadOption.FullyLoad
theSession.Parts.LoadOptions.SetInterpartData(True, NXOpen.LoadOptions.Parent.All)
theSession.Parts.LoadOptions.AllowSubstitution = False
theSession.Parts.LoadOptions.GenerateMissingPartFamilyMembers = True
theSession.Parts.LoadOptions.AbortOnFailure = False
referenceSets = ["As Saved", "Use Simplified", "Use Model", "Entire Part", "Empty"]
theSession.Parts.LoadOptions.SetDefaultReferenceSets(referenceSets)
theSession.Parts.LoadOptions.ReferenceSetOverride = False
print(f"[JOURNAL] Load directory set to: {working_dir}")
# Close any currently open parts
print("[JOURNAL] Closing any open parts...")
try:
partCloseResponses1 = [NXOpen.BasePart.CloseWholeTree]
theSession.Parts.CloseAll(partCloseResponses1)
except:
pass
# Open the .sim file
print(f"[JOURNAL] Opening simulation...")
basePart1, partLoadStatus1 = theSession.Parts.OpenActiveDisplay(
sim_file_path,
NXOpen.DisplayPartOption.AllowAdditional
)
workSimPart = theSession.Parts.BaseWork
partLoadStatus1.Dispose()
# Switch to simulation application
theSession.ApplicationSwitchImmediate("UG_APP_SFEM")
simPart1 = workSimPart
theSession.Post.UpdateUserGroupsFromSimPart(simPart1)
# Solve the simulation directly
print("[JOURNAL] Starting solve...")
markId3 = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Start")
theSession.SetUndoMarkName(markId3, "Solve Dialog")
markId5 = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Invisible, "Solve")
theCAESimSolveManager = NXOpen.CAE.SimSolveManager.GetSimSolveManager(theSession)
# Get the first solution from the simulation
simSimulation1 = workSimPart.FindObject("Simulation")
simSolution1 = simSimulation1.FindObject("Solution[Solution 1]")
solution_solves = [simSolution1]
print("[JOURNAL] Submitting solve...")
theCAESimSolveManager.SubmitSolves(solution_solves)
theSession.DeleteUndoMark(markId5, "Solve")
print("[JOURNAL] Solve submitted successfully!")
return True
except Exception as e:
print(f"[JOURNAL] ERROR: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == '__main__':
success = main(sys.argv[1:])
sys.exit(0 if success else 1)

View File

@@ -0,0 +1,817 @@
"""
NX Nastran Solver Integration
Executes NX Nastran solver in batch mode for optimization loops.
Includes session management to prevent conflicts with concurrent optimizations.
"""
from pathlib import Path
from typing import Optional, Dict, Any
import subprocess
import time
import shutil
import os
from optimization_engine.nx.session_manager import NXSessionManager
class NXSolver:
"""
Wrapper for NX Nastran batch solver execution.
Supports:
- Running .sim files through NX Nastran
- Monitoring solver progress
- Detecting completion and errors
- Cleaning up temporary files
- Per-iteration model copies (HEEDS-style isolation)
"""
# Model file extensions to copy for each iteration
MODEL_EXTENSIONS = {'.prt', '.fem', '.afm', '.sim', '.exp'}
def __init__(
self,
nx_install_dir: Optional[Path] = None,
nastran_version: str = "2412",
timeout: int = 600,
use_journal: bool = True,
enable_session_management: bool = True,
study_name: str = "default_study",
use_iteration_folders: bool = False,
master_model_dir: Optional[Path] = None
):
"""
Initialize NX Solver.
Args:
nx_install_dir: Path to NX installation (auto-detected if None)
nastran_version: NX version (e.g., "2412", "2506")
timeout: Maximum solver time in seconds (default: 10 minutes)
use_journal: Use NX journal for solving (recommended for licensing)
enable_session_management: Enable session conflict prevention (default: True)
study_name: Name of the study (used for session tracking)
use_iteration_folders: Create Iter1, Iter2, etc. folders with fresh model copies
master_model_dir: Source directory for master model files (required if use_iteration_folders=True)
"""
self.nastran_version = nastran_version
self.timeout = timeout
self.use_journal = use_journal
self.study_name = study_name
self.use_iteration_folders = use_iteration_folders
self.master_model_dir = Path(master_model_dir) if master_model_dir else None
self._iteration_counter = 0
# Initialize session manager
self.session_manager = None
if enable_session_management:
self.session_manager = NXSessionManager(verbose=True)
# Clean up any stale locks from crashed processes
self.session_manager.cleanup_stale_locks()
# Auto-detect NX installation
if nx_install_dir is None:
nx_install_dir = self._find_nx_installation()
self.nx_install_dir = Path(nx_install_dir)
# Set up solver executable
if use_journal:
self.solver_exe = self._find_journal_runner()
else:
self.solver_exe = self._find_solver_executable()
if not self.solver_exe.exists():
raise FileNotFoundError(
f"NX solver/runner not found at: {self.solver_exe}\n"
f"Please check NX installation at: {self.nx_install_dir}"
)
def _find_nx_installation(self) -> Path:
"""Auto-detect NX installation directory."""
# Common installation paths
possible_paths = [
Path(f"C:/Program Files/Siemens/NX{self.nastran_version}"),
Path(f"C:/Program Files/Siemens/Simcenter3D_{self.nastran_version}"),
Path(f"C:/Program Files (x86)/Siemens/NX{self.nastran_version}"),
]
for path in possible_paths:
if path.exists():
return path
raise FileNotFoundError(
f"Could not auto-detect NX {self.nastran_version} installation.\n"
f"Checked: {[str(p) for p in possible_paths]}\n"
f"Please specify nx_install_dir manually."
)
def _find_journal_runner(self) -> Path:
"""Find the NX journal runner executable."""
# First check the provided nx_install_dir
if self.nx_install_dir:
direct_path = self.nx_install_dir / "NXBIN" / "run_journal.exe"
if direct_path.exists():
return direct_path
# Fallback: check common installation paths
possible_exes = [
Path(f"C:/Program Files/Siemens/Simcenter3D_{self.nastran_version}/NXBIN/run_journal.exe"),
Path(f"C:/Program Files/Siemens/NX{self.nastran_version}/NXBIN/run_journal.exe"),
Path(f"C:/Program Files/Siemens/DesigncenterNX{self.nastran_version}/NXBIN/run_journal.exe"),
]
for exe in possible_exes:
if exe.exists():
return exe
# Return the direct path (will error in __init__ if doesn't exist)
return self.nx_install_dir / "NXBIN" / "run_journal.exe" if self.nx_install_dir else possible_exes[0]
def _find_solver_executable(self) -> Path:
"""Find the Nastran solver executable."""
# Use NX Nastran (not Simcenter) - has different licensing
# Priority: Use NX installation, not Simcenter
possible_exes = [
self.nx_install_dir / "NXNASTRAN" / "bin" / "nastran.exe",
self.nx_install_dir / "NXNASTRAN" / "nastran.exe",
self.nx_install_dir / "bin" / "nastran.exe",
]
for exe in possible_exes:
if exe.exists():
return exe
# If not found in NX, try Simcenter as fallback
simcenter_paths = [
Path(f"C:/Program Files/Siemens/Simcenter3D_{self.nastran_version}"),
]
for simcenter_dir in simcenter_paths:
if simcenter_dir.exists():
solve_exe = simcenter_dir / "NXNASTRAN" / "bin" / "nastran.exe"
if solve_exe.exists():
return solve_exe
# Return first guess (will error in __init__ if doesn't exist)
return possible_exes[0]
def create_iteration_folder(
self,
iterations_base_dir: Path,
iteration_number: Optional[int] = None,
expression_updates: Optional[Dict[str, float]] = None
) -> Path:
"""
Create a fresh iteration folder with copies of all model files.
HEEDS-style approach: Each iteration gets its own complete copy of model files.
This ensures clean state and avoids model corruption between iterations.
Folder structure created:
iterN/
├── *.prt, *.fem, *.afm, *.sim (copied from master)
├── params.exp (generated from expression_updates)
├── *.op2, *.f06 (generated by solver)
└── results/ (for processed outputs like CSVs, HTMLs)
Args:
iterations_base_dir: Base directory for iteration folders (e.g., study/2_iterations)
iteration_number: Specific iteration number, or auto-increment if None
expression_updates: Dict of expression name -> value for params.exp file
Returns:
Path to the created iteration folder (e.g., 2_iterations/iter1)
"""
if not self.master_model_dir:
raise ValueError("master_model_dir must be set to use iteration folders")
if not self.master_model_dir.exists():
raise FileNotFoundError(f"Master model directory not found: {self.master_model_dir}")
# Auto-increment iteration number if not provided
if iteration_number is None:
self._iteration_counter += 1
iteration_number = self._iteration_counter
# Create iteration folder: iter1, iter2, etc. (lowercase per user spec)
iter_folder = iterations_base_dir / f"iter{iteration_number}"
# Clean up if folder exists from a previous failed run
if iter_folder.exists():
print(f"[NX SOLVER] Cleaning up existing iteration folder: {iter_folder}")
try:
shutil.rmtree(iter_folder)
except Exception as e:
print(f"[NX SOLVER] WARNING: Could not clean up {iter_folder}: {e}")
# Create fresh folder and results subfolder
iter_folder.mkdir(parents=True, exist_ok=True)
results_folder = iter_folder / "results"
results_folder.mkdir(exist_ok=True)
# Copy all NX model files from master
print(f"[NX SOLVER] Creating iteration folder: {iter_folder.name}")
print(f"[NX SOLVER] Copying from: {self.master_model_dir}")
copied_files = []
for ext in self.MODEL_EXTENSIONS:
for src_file in self.master_model_dir.glob(f"*{ext}"):
dst_file = iter_folder / src_file.name
try:
shutil.copy2(src_file, dst_file)
copied_files.append(src_file.name)
except Exception as e:
print(f"[NX SOLVER] WARNING: Could not copy {src_file.name}: {e}")
print(f"[NX SOLVER] Copied {len(copied_files)} model files")
# Generate params.exp file if expression updates provided
if expression_updates:
exp_file = iter_folder / "params.exp"
self._write_expression_file(exp_file, expression_updates)
print(f"[NX SOLVER] Generated params.exp with {len(expression_updates)} expressions")
print(f"[NX SOLVER] Created results/ subfolder for processed outputs")
return iter_folder
def _write_expression_file(self, exp_path: Path, expressions: Dict[str, float]):
"""
Write expressions to .exp file format for NX import.
Format: [unit]name=value
Example: [mm]whiffle_min=42.5
"""
# Default unit mapping - MUST match NX model expression units exactly
# Verified against working turbo V1 runs
UNIT_MAPPING = {
# Length parameters (mm)
'whiffle_min': 'mm',
'whiffle_triangle_closeness': 'mm',
'inner_circular_rib_dia': 'mm',
'outer_circular_rib_offset_from_outer': 'mm',
'Pocket_Radius': 'mm',
'center_thickness': 'mm',
# Lateral pivot/closeness - mm in NX model (verified from V1)
'lateral_outer_pivot': 'mm',
'lateral_inner_pivot': 'mm',
'lateral_middle_pivot': 'mm',
'lateral_closeness': 'mm',
# Rib/face thickness parameters (mm)
'rib_thickness': 'mm',
'ribs_circular_thk': 'mm',
'rib_thickness_lateral_truss': 'mm',
'mirror_face_thickness': 'mm',
# Angle parameters (Degrees) - verified from working V1 runs
'whiffle_outer_to_vertical': 'Degrees', # NX expects Degrees (verified V1)
'lateral_inner_angle': 'Degrees',
'lateral_outer_angle': 'Degrees',
'blank_backface_angle': 'Degrees',
}
with open(exp_path, 'w') as f:
for name, value in expressions.items():
# Get unit, default to mm for unknown parameters
unit = UNIT_MAPPING.get(name, 'mm')
f.write(f"[{unit}]{name}={value}\n")
def cleanup_iteration_folder(
self,
iter_folder: Path,
keep_results: bool = True
):
"""
Clean up an iteration folder after extracting results.
Args:
iter_folder: Path to iteration folder
keep_results: If True, keeps .op2 and .f06 files; if False, deletes everything
"""
if not iter_folder.exists():
return
if keep_results:
# Delete everything except result files
keep_extensions = {'.op2', '.f06', '.log'}
for file in iter_folder.iterdir():
if file.is_file() and file.suffix.lower() not in keep_extensions:
try:
file.unlink()
except Exception:
pass
print(f"[NX SOLVER] Cleaned iteration folder (kept results): {iter_folder.name}")
else:
# Delete entire folder
try:
shutil.rmtree(iter_folder)
print(f"[NX SOLVER] Deleted iteration folder: {iter_folder.name}")
except Exception as e:
print(f"[NX SOLVER] WARNING: Could not delete {iter_folder}: {e}")
def run_simulation(
self,
sim_file: Path,
working_dir: Optional[Path] = None,
cleanup: bool = True,
expression_updates: Optional[Dict[str, float]] = None,
solution_name: Optional[str] = None
) -> Dict[str, Any]:
"""
Run NX Nastran simulation.
Args:
sim_file: Path to .sim file
working_dir: Working directory for solver (defaults to sim file dir)
cleanup: Remove intermediate files after solving
expression_updates: Dict of expression name -> value to update
(only used in journal mode)
e.g., {'tip_thickness': 22.5, 'support_angle': 35.0}
solution_name: Specific solution to solve (e.g., "Solution_Normal_Modes")
If None, solves all solutions. Only used in journal mode.
Returns:
Dictionary with:
- success: bool
- op2_file: Path to output .op2 file
- log_file: Path to .log file
- elapsed_time: Solve time in seconds
- errors: List of error messages (if any)
- solution_name: Name of the solution that was solved
"""
sim_file = Path(sim_file)
if not sim_file.exists():
raise FileNotFoundError(f"Simulation file not found: {sim_file}")
if working_dir is None:
working_dir = sim_file.parent
else:
working_dir = Path(working_dir)
# Check if we need to find/use .dat file (only in direct mode, not journal mode)
# .sim files require NX GUI, but .dat files can be run directly with Nastran
dat_file = None
if not self.use_journal and sim_file.suffix == '.sim':
# Look for corresponding .dat file (created by NX when solving)
# Pattern: Bracket_sim1.sim -> bracket_sim1-solution_1.dat
base = sim_file.stem.lower()
possible_dats = list(working_dir.glob(f"{base}-solution_*.dat"))
if possible_dats:
# Use the most recent .dat file
dat_file = max(possible_dats, key=lambda p: p.stat().st_mtime)
print(f"\n[NX SOLVER] Found .dat file: {dat_file.name}")
print(f" Using .dat instead of .sim for better compatibility")
sim_file = dat_file
# Prepare output file names
# When using journal mode with .sim files, output is named: <base>-solution_name.op2
# When using direct mode with .dat files, output is named: <base>.op2
base_name = sim_file.stem
if self.use_journal and sim_file.suffix == '.sim':
# Journal mode: determine solution-specific output name
if solution_name:
# Convert solution name to lowercase and replace spaces with underscores
# E.g., "Solution_Normal_Modes" -> "solution_normal_modes"
solution_suffix = solution_name.lower().replace(' ', '_')
output_base = f"{base_name.lower()}-{solution_suffix}"
else:
# Default to solution_1
output_base = f"{base_name.lower()}-solution_1"
else:
# Direct mode or .dat file
output_base = base_name
op2_file = working_dir / f"{output_base}.op2"
log_file = working_dir / f"{output_base}.log"
f06_file = working_dir / f"{output_base}.f06"
print(f"\n[NX SOLVER] Starting simulation...")
print(f" Input file: {sim_file.name}")
print(f" Working dir: {working_dir}")
print(f" Mode: {'Journal' if self.use_journal else 'Direct'}")
# Record timestamps of old files BEFORE solving
# We'll verify files are regenerated by checking timestamps AFTER solve
# This is more reliable than deleting (which can fail due to file locking on Windows)
old_op2_time = op2_file.stat().st_mtime if op2_file.exists() else None
old_f06_time = f06_file.stat().st_mtime if f06_file.exists() else None
old_log_time = log_file.stat().st_mtime if log_file.exists() else None
if old_op2_time:
print(f" Found existing OP2 (modified: {time.ctime(old_op2_time)})")
print(f" Will verify NX regenerates it with newer timestamp")
# Build command based on mode
if self.use_journal and sim_file.suffix == '.sim':
# Use NX journal for .sim files (handles licensing properly)
# Generate a temporary journal file with the correct sim file path
journal_template = Path(__file__).parent / "solve_simulation.py"
temp_journal = working_dir / "_temp_solve_journal.py"
# Read template and replace placeholder with actual path
with open(journal_template, 'r') as f:
journal_content = f.read()
# Create a custom journal that passes the sim file path, solution name, and expression values
# Build argv list with expression updates
argv_list = [f"r'{sim_file.absolute()}'"]
# Add solution name if provided (passed as second argument)
if solution_name:
argv_list.append(f"'{solution_name}'")
else:
argv_list.append("None")
# Add expression values if provided
# Pass all expressions as key=value pairs
if expression_updates:
for expr_name, expr_value in expression_updates.items():
argv_list.append(f"'{expr_name}={expr_value}'")
argv_str = ', '.join(argv_list)
custom_journal = f'''# Auto-generated journal for solving {sim_file.name}
import sys
sys.argv = ['', {argv_str}] # Set argv for the main function
{journal_content}
'''
with open(temp_journal, 'w') as f:
f.write(custom_journal)
cmd = [
str(self.solver_exe), # run_journal.exe
str(temp_journal.absolute()) # Use absolute path to avoid path issues
]
else:
# Direct Nastran batch command for .dat files or direct mode
# IMPORTANT: prog=bundle enables bundle licensing (required for desktop licenses)
cmd = [
str(self.solver_exe),
str(sim_file),
"prog=bundle",
"old=no",
"scratch=yes"
]
# Set up environment for Simcenter/NX
env = os.environ.copy()
# Get SPLM_LICENSE_SERVER - prefer system registry (most up-to-date) over process env
license_server = ''
# First try system-level environment (Windows registry) - this is the authoritative source
import subprocess as sp
try:
result = sp.run(
['reg', 'query', 'HKLM\\SYSTEM\\CurrentControlSet\\Control\\Session Manager\\Environment', '/v', 'SPLM_LICENSE_SERVER'],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
# Parse: " SPLM_LICENSE_SERVER REG_SZ value"
for line in result.stdout.splitlines():
if 'SPLM_LICENSE_SERVER' in line:
parts = line.split('REG_SZ')
if len(parts) > 1:
license_server = parts[1].strip()
break
except Exception:
pass
# Fall back to process environment if registry query failed
if not license_server:
license_server = env.get('SPLM_LICENSE_SERVER', '')
if license_server:
env['SPLM_LICENSE_SERVER'] = license_server
print(f"[NX SOLVER] Using license server: {license_server}")
else:
env['SPLM_LICENSE_SERVER'] = '29000@localhost'
print(f"[NX SOLVER] WARNING: SPLM_LICENSE_SERVER not set, using default: {env['SPLM_LICENSE_SERVER']}")
# Add NX/Simcenter paths to environment
nx_bin = self.nx_install_dir / "NXBIN"
if nx_bin.exists():
env['PATH'] = f"{nx_bin};{env.get('PATH', '')}"
nastran_bin = self.solver_exe.parent
if nastran_bin.exists():
env['PATH'] = f"{nastran_bin};{env.get('PATH', '')}"
# Run solver
start_time = time.time()
try:
result = subprocess.run(
cmd,
cwd=str(working_dir),
capture_output=True,
text=True,
timeout=self.timeout,
env=env # Use modified environment
)
elapsed_time = time.time() - start_time
# Display journal output for debugging
if self.use_journal:
if result.stdout and result.stdout.strip():
print("[JOURNAL OUTPUT]")
for line in result.stdout.strip().split('\n'):
print(f" {line}")
# Check for journal errors
if self.use_journal and result.stderr and "error" in result.stderr.lower():
print("[JOURNAL ERRORS]")
for line in result.stderr.strip().split('\n')[:5]:
print(f" {line}")
# Wait for output files to appear AND be regenerated (journal mode runs solve in background)
if self.use_journal:
max_wait = 30 # seconds - background solves can take time
wait_start = time.time()
print("[NX SOLVER] Waiting for solve to complete...")
# Wait for files to exist AND have newer timestamps than before
while (time.time() - wait_start) < max_wait:
files_exist = f06_file.exists() and op2_file.exists()
if files_exist:
# Verify files were regenerated (newer timestamps)
new_op2_time = op2_file.stat().st_mtime
new_f06_time = f06_file.stat().st_mtime
# If no old files, or new files are newer, we're done!
if (old_op2_time is None or new_op2_time > old_op2_time) and \
(old_f06_time is None or new_f06_time > old_f06_time):
elapsed = time.time() - wait_start
print(f"[NX SOLVER] Fresh output files detected after {elapsed:.1f}s")
if old_op2_time:
print(f" OP2 regenerated: {time.ctime(old_op2_time)} -> {time.ctime(new_op2_time)}")
break
time.sleep(0.5)
if (time.time() - wait_start) % 2 < 0.5: # Print every 2 seconds
elapsed = time.time() - wait_start
print(f" Waiting for fresh results... ({elapsed:.0f}s)")
# Final check - FAIL if files weren't regenerated
op2_is_fresh = True
if op2_file.exists():
current_op2_time = op2_file.stat().st_mtime
if old_op2_time and current_op2_time <= old_op2_time:
print(f" ERROR: OP2 file was NOT regenerated! (Still has old timestamp)")
print(f" Old: {time.ctime(old_op2_time)}, Current: {time.ctime(current_op2_time)}")
print(f" The solve failed - cannot use stale results!")
op2_is_fresh = False
else:
print(f" ERROR: OP2 file does not exist!")
op2_is_fresh = False
# Check for completion - also require fresh OP2
success = self._check_solution_success(f06_file, log_file) and op2_is_fresh
errors = []
if not success:
errors = self._extract_errors(f06_file, log_file)
# Clean up intermediate files if requested
if cleanup and success:
self._cleanup_temp_files(working_dir, base_name)
# Clean up temporary journal file if it was created
temp_journal_path = working_dir / "_temp_solve_journal.py"
if temp_journal_path.exists():
try:
temp_journal_path.unlink()
except Exception:
pass
print(f"[NX SOLVER] Complete in {elapsed_time:.1f}s")
if success:
print(f"[NX SOLVER] Results: {op2_file.name}")
else:
print(f"[NX SOLVER] FAILED - check {f06_file.name}")
for error in errors:
print(f" ERROR: {error}")
return {
'success': success,
'op2_file': op2_file if op2_file.exists() else None,
'log_file': log_file if log_file.exists() else None,
'f06_file': f06_file if f06_file.exists() else None,
'elapsed_time': elapsed_time,
'errors': errors,
'return_code': result.returncode,
'solution_name': solution_name
}
except subprocess.TimeoutExpired:
elapsed_time = time.time() - start_time
print(f"[NX SOLVER] TIMEOUT after {elapsed_time:.1f}s")
return {
'success': False,
'op2_file': None,
'log_file': log_file if log_file.exists() else None,
'elapsed_time': elapsed_time,
'errors': [f'Solver timeout after {self.timeout}s'],
'return_code': -1,
'solution_name': solution_name
}
except Exception as e:
elapsed_time = time.time() - start_time
print(f"[NX SOLVER] ERROR: {e}")
return {
'success': False,
'op2_file': None,
'log_file': None,
'elapsed_time': elapsed_time,
'errors': [str(e)],
'return_code': -1,
'solution_name': solution_name
}
def _check_solution_success(self, f06_file: Path, log_file: Path) -> bool:
"""
Check if solution completed successfully.
Looks for completion markers in .f06 and .log files.
"""
# Check .f06 file for completion
if f06_file.exists():
try:
with open(f06_file, 'r', encoding='latin-1', errors='ignore') as f:
content = f.read()
# Look for successful completion markers
if 'NORMAL TERMINATION' in content or 'USER INFORMATION MESSAGE' in content:
return True
# Check for fatal errors
if 'FATAL MESSAGE' in content or 'EXECUTION TERMINATED' in content:
return False
except Exception:
pass
# Fallback: check if OP2 was created recently
op2_file = f06_file.with_suffix('.op2')
if op2_file.exists():
# If OP2 was modified within last minute, assume success
if (time.time() - op2_file.stat().st_mtime) < 60:
return True
return False
def _extract_errors(self, f06_file: Path, log_file: Path) -> list:
"""Extract error messages from output files."""
errors = []
if f06_file.exists():
try:
with open(f06_file, 'r', encoding='latin-1', errors='ignore') as f:
for line in f:
if 'FATAL' in line or 'ERROR' in line:
errors.append(line.strip())
except Exception:
pass
return errors[:10] # Limit to first 10 errors
def discover_model(self, sim_file: Path) -> Dict[str, Any]:
"""
Discover model information without solving.
This scans the NX simulation file and reports:
- All solutions (names, types)
- All expressions (potential design variables)
- Mesh info
- Linked geometry parts
Args:
sim_file: Path to .sim file
Returns:
Dictionary with discovered model info
"""
import json
sim_file = Path(sim_file)
if not sim_file.exists():
return {'success': False, 'error': f'Sim file not found: {sim_file}'}
# Use the discover_model journal
discover_journal = Path(__file__).parent.parent / "nx_journals" / "discover_model.py"
if not discover_journal.exists():
return {'success': False, 'error': f'Discovery journal not found: {discover_journal}'}
print(f"\n[NX SOLVER] Discovering model: {sim_file.name}")
print(f" Using journal: {discover_journal.name}")
try:
cmd = [str(self.solver_exe), str(discover_journal), '--', str(sim_file.absolute())]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60, # 1 minute timeout for discovery
cwd=str(sim_file.parent)
)
# Print stderr (debug/progress messages)
if result.stderr:
for line in result.stderr.strip().split('\n'):
print(f" {line}")
# Parse stdout as JSON
if result.stdout:
try:
discovery_result = json.loads(result.stdout)
return discovery_result
except json.JSONDecodeError as e:
return {
'success': False,
'error': f'Failed to parse discovery output: {e}',
'raw_output': result.stdout[:1000]
}
else:
return {
'success': False,
'error': 'No output from discovery journal',
'stderr': result.stderr
}
except subprocess.TimeoutExpired:
return {'success': False, 'error': 'Discovery timeout (60s)'}
except Exception as e:
return {'success': False, 'error': str(e)}
def _cleanup_temp_files(self, working_dir: Path, base_name: str):
"""Remove temporary solver files."""
# Files to keep
keep_extensions = {'.op2', '.f06', '.log'}
# Files to remove
remove_patterns = [
f"{base_name}.f04",
f"{base_name}.dat",
f"{base_name}.diag",
f"{base_name}.master",
f"{base_name}.dball",
f"{base_name}.MASTER",
f"{base_name}.DBALL",
f"{base_name}_*.png",
f"{base_name}_*.html",
]
for pattern in remove_patterns:
for file in working_dir.glob(pattern):
try:
file.unlink()
except Exception:
pass
# Convenience function for optimization loops
def run_nx_simulation(
sim_file: Path,
nastran_version: str = "2412",
timeout: int = 600,
cleanup: bool = True,
use_journal: bool = True,
expression_updates: Optional[Dict[str, float]] = None,
solution_name: Optional[str] = None
) -> Path:
"""
Convenience function to run NX simulation and return OP2 file path.
Args:
sim_file: Path to .sim file
nastran_version: NX version
timeout: Solver timeout in seconds
cleanup: Remove temp files
use_journal: Use NX journal for solving (recommended for licensing)
expression_updates: Dict of expression name -> value to update in journal
solution_name: Specific solution to solve (e.g., "Solution_Normal_Modes")
Returns:
Path to output .op2 file
Raises:
RuntimeError: If simulation fails
"""
solver = NXSolver(nastran_version=nastran_version, timeout=timeout, use_journal=use_journal)
result = solver.run_simulation(
sim_file,
cleanup=cleanup,
expression_updates=expression_updates,
solution_name=solution_name
)
if not result['success']:
error_msg = '\n'.join(result['errors']) if result['errors'] else 'Unknown error'
raise RuntimeError(f"NX simulation failed:\n{error_msg}")
if not result['op2_file'] or not result['op2_file'].exists():
raise RuntimeError("Simulation completed but OP2 file not found")
return result['op2_file']

View File

@@ -0,0 +1,534 @@
"""
NX Parameter Updater
Updates design variable values in NX .prt files.
This module can read expressions in two ways:
1. Parse .exp files (NX native export format) - RECOMMENDED, captures ALL expressions
2. Parse binary .prt files directly - fallback method, may miss some expressions
For updating values:
1. Binary .prt file modification (current implementation)
2. Future: Use NXOpen API if NX is running
The .exp format is preferred for reading because it captures:
- All expression types (formulas, references, constants)
- Unitless expressions
- Complete accuracy
"""
from pathlib import Path
from typing import Dict, List, Optional
import re
import shutil
import subprocess
from datetime import datetime
import sys
# Import centralized configuration
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import NX_RUN_JOURNAL
class NXParameterUpdater:
"""
Updates parametric expression values in NX .prt files.
NX Expression Formats:
Binary .prt format:
#(Number [mm]) tip_thickness: 20.0;
*(Number [degrees]) support_angle: 30.0;
.exp export format (RECOMMENDED for reading):
[MilliMeter]beam_length=5000
[Kilogram]mass=973.968443678471
hole_count=10
Pattern_p7=hole_count
"""
def __init__(self, prt_file_path: Path, backup: bool = True, nx_run_journal_path: Optional[Path] = None):
"""
Initialize updater for a specific .prt file.
Args:
prt_file_path: Path to NX .prt file
backup: If True, create backup before modifying
nx_run_journal_path: Path to NX run_journal.exe (for .exp export)
If None, uses default NX 2412 path
"""
self.prt_path = Path(prt_file_path)
if not self.prt_path.exists():
raise FileNotFoundError(f".prt file not found: {prt_file_path}")
self.backup_enabled = backup
self.content = None
self.text_content = None
# Default NX run_journal.exe path
if nx_run_journal_path is None:
self.nx_run_journal_path = NX_RUN_JOURNAL
else:
self.nx_run_journal_path = Path(nx_run_journal_path)
self._load_file()
def _load_file(self):
"""Load .prt file as binary."""
with open(self.prt_path, 'rb') as f:
self.content = bytearray(f.read())
# Decode as latin-1 for text operations (preserves all bytes)
self.text_content = self.content.decode('latin-1', errors='ignore')
def _create_backup(self):
"""Create timestamped backup of original file."""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = self.prt_path.with_suffix(f'.prt.bak_{timestamp}')
shutil.copy2(self.prt_path, backup_path)
print(f"Backup created: {backup_path}")
return backup_path
def find_expressions(self) -> List[Dict[str, any]]:
"""
Find all expressions in the .prt file.
Returns:
List of dicts with name, value, units
"""
expressions = []
# Pattern for NX expressions (with optional units):
# #(Number [mm]) tip_thickness: 20.0; - with units
# *(Number [mm]) p3: 10.0; - with units
# ((Number [degrees]) support_angle: 30.0; - with units
# (Number) hole_count: 5.0; - without units (unitless)
pattern = r'[#*\(]*\((\w+)(?:\s*\[([^\]]*)\])?\)\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)'
for match in re.finditer(pattern, self.text_content):
expr_type, units, name, value = match.groups()
expressions.append({
'name': name,
'value': float(value),
'units': units if units else '', # Empty string if no units
'type': expr_type
})
return expressions
def export_expressions_to_exp(self, output_path: Optional[Path] = None) -> Path:
"""
Export expressions to .exp file using NX journal.
This is the RECOMMENDED method for reading expressions because it:
- Captures ALL expressions (formulas, references, constants)
- Includes unitless expressions
- Uses NX's native export, ensuring 100% accuracy
Args:
output_path: Path for .exp file (without .exp extension)
If None, uses temp file in same directory as .prt
Returns:
Path to the .exp file created
"""
if output_path is None:
# Create temp file in same directory
output_path = self.prt_path.with_suffix('') # Remove .prt
output_path = Path(str(output_path) + "_expressions")
# Get paths
journal_script = Path(__file__).parent / "export_expressions.py"
if not journal_script.exists():
raise FileNotFoundError(f"Export journal script not found: {journal_script}")
if not self.nx_run_journal_path.exists():
raise FileNotFoundError(f"NX run_journal.exe not found: {self.nx_run_journal_path}")
# Run NX journal to export expressions
print(f"[NX] Exporting expressions from {self.prt_path.name} to .exp format...")
# NX run_journal.exe syntax: run_journal.exe <journal-file> -args <arg1> <arg2> ...
# Build command string with proper quoting
cmd_str = f'"{self.nx_run_journal_path}" "{journal_script}" -args "{self.prt_path}" "{output_path}"'
result = subprocess.run(cmd_str, capture_output=True, text=True, shell=True)
exp_file = Path(str(output_path) + ".exp")
# NOTE: NX run_journal.exe treats sys.exit(0) as a "syntax error" even though
# it's a successful exit. We check if the file was created instead of return code.
if not exp_file.exists():
print(f"[ERROR] NX journal failed to create .exp file:")
print(result.stdout)
print(result.stderr)
raise FileNotFoundError(f"Expected .exp file not created: {exp_file}")
print(f"[OK] Expressions exported to: {exp_file}")
return exp_file
def parse_exp_file(self, exp_file_path: Path) -> Dict[str, Dict[str, any]]:
"""
Parse a .exp file and return all expressions.
.exp format examples:
[MilliMeter]beam_length=5000
[Kilogram]p173=973.968443678471
hole_count=10
Pattern_p7=hole_count
Args:
exp_file_path: Path to .exp file
Returns:
Dict mapping expression name to info dict with 'value', 'units', 'formula'
"""
expressions = {}
with open(exp_file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
# Skip empty lines and comments
if not line or line.startswith('//'):
continue
# Pattern: [Unit]name=value or name=value
# [MilliMeter]beam_length=5000
# hole_count=10
# Pattern_p7=hole_count (formula reference)
match = re.match(r'(?:\[([^\]]+)\])?([a-zA-Z_][a-zA-Z0-9_]*)=(.*)', line)
if match:
units, name, value_str = match.groups()
# Try to parse as number
try:
value = float(value_str)
formula = None
except ValueError:
# It's a formula/reference (e.g., "hole_count")
value = None
formula = value_str
expressions[name] = {
'value': value,
'units': units if units else '',
'formula': formula,
'type': 'Number' # All .exp expressions are Number type
}
return expressions
def get_all_expressions(self, use_exp_export: bool = True) -> Dict[str, Dict[str, any]]:
"""
Get all expressions as a dictionary.
Args:
use_exp_export: If True, uses NX .exp export (RECOMMENDED)
If False, uses binary .prt parsing (may miss expressions)
Returns:
Dict mapping expression name to info dict with 'value', 'units', 'type', 'formula'
"""
if use_exp_export:
# Use NX native .exp export (captures ALL expressions)
try:
exp_file = self.export_expressions_to_exp()
expressions = self.parse_exp_file(exp_file)
# Clean up temp file
exp_file.unlink()
return expressions
except Exception as e:
print(f"[WARNING] .exp export failed: {e}")
print("[WARNING] Falling back to binary .prt parsing...")
# Fall through to binary parsing
# Fallback: Binary .prt parsing
expressions_list = self.find_expressions()
return {
expr['name']: {
'value': expr['value'],
'units': expr['units'],
'type': expr['type'],
'formula': None # Binary .prt files don't have formulas accessible
}
for expr in expressions_list
}
def update_expression(self, name: str, new_value: float) -> bool:
"""
Update a single expression value.
Args:
name: Expression name
new_value: New value
Returns:
True if updated, False if not found
"""
# Find the expression pattern
# Match: (Type [units]) name: old_value; OR (Type) name: old_value; (unitless)
# We need to be careful to match the exact name and preserve formatting
# Pattern that captures the full expression line
# Units are optional (unitless expressions like hole_count don't have [units])
pattern = rf'([#*\(]*\(\w+(?:\s*\[[^\]]*\])?\)\s*)({re.escape(name)})\s*:\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)'
matches = list(re.finditer(pattern, self.text_content))
if not matches:
print(f"Warning: Expression '{name}' not found in .prt file")
return False
if len(matches) > 1:
print(f"Warning: Multiple matches for '{name}', updating first occurrence")
# Get the first match
match = matches[0]
prefix, expr_name, old_value = match.groups()
# Format new value (preserve decimal places if possible)
# Check if old value had decimal point
if '.' in old_value or 'e' in old_value.lower():
# Use same precision as old value
decimal_places = len(old_value.split('.')[-1]) if '.' in old_value else 2
new_value_str = f"{new_value:.{decimal_places}f}"
else:
# Integer format
new_value_str = f"{int(new_value)}"
# Build replacement string
replacement = f"{prefix}{expr_name}: {new_value_str}"
# Replace in text content
old_match = match.group(0)
self.text_content = self.text_content.replace(old_match, replacement, 1)
# Also update in binary content
old_bytes = old_match.encode('latin-1')
new_bytes = replacement.encode('latin-1')
# Find and replace in binary content
start_pos = self.content.find(old_bytes)
if start_pos != -1:
# Replace bytes
self.content[start_pos:start_pos+len(old_bytes)] = new_bytes
print(f"Updated: {name} = {old_value} -> {new_value_str}")
return True
else:
print(f"Warning: Could not update binary content for '{name}'")
return False
def update_expressions(self, updates: Dict[str, float], use_nx_import: bool = True):
"""
Update multiple expressions at once.
Args:
updates: Dict mapping expression name to new value
{'tip_thickness': 22.5, 'support_angle': 35.0}
use_nx_import: If True, uses NX journal to import .exp file (RECOMMENDED for all expressions)
If False, uses binary .prt editing (may miss some expressions)
"""
if use_nx_import:
# Use NX journal to import expressions
return self.update_expressions_via_import(updates)
# Fallback: Binary .prt editing
print(f"\nUpdating {len(updates)} expressions in {self.prt_path.name}:")
updated_count = 0
for name, value in updates.items():
if self.update_expression(name, value):
updated_count += 1
print(f"Successfully updated {updated_count}/{len(updates)} expressions")
def update_expressions_via_import(self, updates: Dict[str, float]):
"""
Update expressions by creating a .exp file and importing it via NX journal.
This method works for ALL expressions including those not stored in text format
in the binary .prt file (like hole_count).
Args:
updates: Dict mapping expression name to new value
"""
print(f"\nUpdating {len(updates)} expressions via NX .exp import:")
# Get all expressions to determine units
all_expressions = self.get_all_expressions(use_exp_export=True)
# Create .exp file with ONLY the study variables
exp_file = self.prt_path.parent / f"{self.prt_path.stem}_study_variables.exp"
with open(exp_file, 'w', encoding='utf-8') as f:
for name, value in updates.items():
if name in all_expressions:
units = all_expressions[name].get('units', '')
if units:
# Expression with units: [MilliMeter]beam_length=5000
f.write(f"[{units}]{name}={value}\n")
else:
# Unitless expression: hole_count=10
f.write(f"{name}={value}\n")
print(f" {name}: {value} {units if units else ''}")
else:
print(f" Warning: {name} not found in part expressions, skipping")
print(f"\n[EXP] Created: {exp_file}")
# Run NX journal to import expressions
journal_script = Path(__file__).parent / "import_expressions.py"
if not journal_script.exists():
raise FileNotFoundError(f"Import journal script not found: {journal_script}")
if not self.nx_run_journal_path.exists():
raise FileNotFoundError(f"NX run_journal.exe not found: {self.nx_run_journal_path}")
print(f"[NX] Importing expressions into {self.prt_path.name}...")
# Build command
cmd_str = f'"{self.nx_run_journal_path}" "{journal_script}" -args "{self.prt_path}" "{exp_file}"'
result = subprocess.run(cmd_str, capture_output=True, text=True, shell=True)
# Clean up .exp file
exp_file.unlink()
# Check if import succeeded
if result.returncode != 0 and "successfully" not in result.stdout.lower():
print(f"[ERROR] NX journal failed:")
print(result.stdout)
print(result.stderr)
raise RuntimeError(f"Expression import failed")
print(f"[OK] All {len(updates)} expressions updated successfully!")
def save(self, output_path: Path = None):
"""
Save modified .prt file.
Args:
output_path: Optional different path to save to.
If None, overwrites original (with backup if enabled)
"""
if output_path is None:
output_path = self.prt_path
if self.backup_enabled:
self._create_backup()
# Write updated binary content
with open(output_path, 'wb') as f:
f.write(self.content)
print(f"Saved to: {output_path}")
def verify_update(self, name: str, expected_value: float, tolerance: float = 1e-6) -> bool:
"""
Verify that an expression was updated correctly.
Args:
name: Expression name
expected_value: Expected value
tolerance: Acceptable difference
Returns:
True if value matches (within tolerance)
"""
expressions = self.find_expressions()
expr = next((e for e in expressions if e['name'] == name), None)
if expr is None:
print(f"Expression '{name}' not found")
return False
actual_value = expr['value']
difference = abs(actual_value - expected_value)
if difference <= tolerance:
print(f"OK Verified: {name} = {actual_value} (expected {expected_value})")
return True
else:
print(f"FAIL Verification failed: {name} = {actual_value}, expected {expected_value} (diff: {difference})")
return False
# Convenience function for optimization loop
def update_nx_model(prt_file_path: Path, design_variables: Dict[str, float], backup: bool = False):
"""
Convenience function to update NX model parameters.
Args:
prt_file_path: Path to .prt file
design_variables: Dict of parameter name -> value
backup: Whether to create backup
Example:
>>> update_nx_model(
... Path("Bracket.prt"),
... {'tip_thickness': 22.5, 'support_angle': 35.0}
... )
"""
updater = NXParameterUpdater(prt_file_path, backup=backup)
updater.update_expressions(design_variables)
updater.save()
# Example usage
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python nx_updater.py <path_to_prt_file>")
sys.exit(1)
prt_path = Path(sys.argv[1])
# Test: Find and print all expressions
print("="*60)
print("NX PARAMETER UPDATER TEST")
print("="*60)
updater = NXParameterUpdater(prt_path, backup=True)
print("\nCurrent expressions in file:")
expressions = updater.find_expressions()
for expr in expressions:
print(f" {expr['name']}: {expr['value']} {expr['units']}")
# Test update (if expressions found)
if expressions:
print("\n" + "="*60)
print("TEST UPDATE")
print("="*60)
# Update first expression
first_expr = expressions[0]
test_name = first_expr['name']
test_new_value = first_expr['value'] * 1.1 # Increase by 10%
print(f"\nUpdating {test_name} from {first_expr['value']} to {test_new_value}")
updater.update_expression(test_name, test_new_value)
# Save to test file
test_output = prt_path.with_suffix('.prt.test')
updater.save(test_output)
# Verify by re-reading
print("\n" + "="*60)
print("VERIFICATION")
print("="*60)
verifier = NXParameterUpdater(test_output, backup=False)
verifier.verify_update(test_name, test_new_value)
print(f"\nTest complete. Modified file: {test_output}")
else:
print("\nNo expressions found in file. Nothing to test.")