- Use optimization_engine.nx.updater.NXParameterUpdater for expression updates (.exp import method) - Use optimization_engine.nx.solver.NXSolver for journal-based solving (run_journal.exe) - Use optimization_engine.extractors for displacement, stress, and mass extraction - Displacement: extract_displacement() from pyNastran OP2 - Stress: extract_solid_stress() with cquad4 support (shell elements), kPa→MPa conversion - Mass: extract_mass_from_expression() reads from temp file written by solve journal - Support for iteration folders (HEEDS-style clean state between trials) - Proper error handling with TrialResult(success=False, error_message=...) - 600s timeout per trial (matching existing NXSolver default) - Keep stub solver and create_solver() factory working - Maintain run_doe.py interface compatibility
524 lines
20 KiB
Python
524 lines
20 KiB
Python
"""NX automation interface for Hydrotech Beam optimization.
|
||
|
||
This module uses the EXISTING Atomizer optimization engine for NX integration:
|
||
- optimization_engine.nx.updater.NXParameterUpdater (expression updates via .exp import)
|
||
- optimization_engine.nx.solver.NXSolver (journal-based solving with run_journal.exe)
|
||
- optimization_engine.extractors.* (pyNastran OP2-based result extraction)
|
||
|
||
NX Expression Names (confirmed via binary introspection — CONTEXT.md):
|
||
Design Variables:
|
||
- beam_half_core_thickness (mm, continuous)
|
||
- beam_face_thickness (mm, continuous)
|
||
- holes_diameter (mm, continuous)
|
||
- hole_count (integer, links to Pattern_p7)
|
||
Outputs:
|
||
- p173 (mass in kg, body_property147.mass)
|
||
Fixed:
|
||
- beam_lenght (⚠️ TYPO in NX — no 'h', 5000 mm)
|
||
- beam_half_height (250 mm)
|
||
- beam_half_width (150 mm)
|
||
|
||
References:
|
||
CONTEXT.md — Full expression map
|
||
OPTIMIZATION_STRATEGY.md §8.2 — Extractor requirements
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import sys
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Protocol
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Add Atomizer repo root to sys.path for imports
|
||
ATOMIZER_REPO_ROOT = Path("/home/papa/repos/Atomizer")
|
||
if str(ATOMIZER_REPO_ROOT) not in sys.path:
|
||
sys.path.insert(0, str(ATOMIZER_REPO_ROOT))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Data types
|
||
# ---------------------------------------------------------------------------
|
||
@dataclass(frozen=True)
|
||
class TrialInput:
|
||
"""Design variable values for a single trial."""
|
||
|
||
beam_half_core_thickness: float # mm — DV1
|
||
beam_face_thickness: float # mm — DV2
|
||
holes_diameter: float # mm — DV3
|
||
hole_count: int # — DV4
|
||
|
||
|
||
@dataclass
|
||
class TrialResult:
|
||
"""Results extracted from NX after a trial solve.
|
||
|
||
All values populated after a successful SOL 101 solve.
|
||
On failure, success=False and error_message explains the failure.
|
||
"""
|
||
|
||
success: bool
|
||
mass: float = float("nan") # kg — from expression `p173`
|
||
tip_displacement: float = float("nan") # mm — from SOL 101 results
|
||
max_von_mises: float = float("nan") # MPa — from SOL 101 results
|
||
error_message: str = ""
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# NX expression name constants
|
||
# ---------------------------------------------------------------------------
|
||
# ⚠️ These are EXACT NX expression names from binary introspection.
|
||
# Do NOT change spelling — `beam_lenght` has a typo (no 'h') in NX.
|
||
EXPR_HALF_CORE_THICKNESS = "beam_half_core_thickness"
|
||
EXPR_FACE_THICKNESS = "beam_face_thickness"
|
||
EXPR_HOLES_DIAMETER = "holes_diameter"
|
||
EXPR_HOLE_COUNT = "hole_count"
|
||
EXPR_MASS = "p173" # body_property147.mass, kg
|
||
EXPR_BEAM_LENGTH = "beam_lenght" # ⚠️ TYPO IN NX — intentional
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Interface protocol
|
||
# ---------------------------------------------------------------------------
|
||
class NXSolverInterface(Protocol):
|
||
"""Protocol for NX solver backends.
|
||
|
||
Implementors must provide the full pipeline:
|
||
1. Update expressions → 2. Rebuild model → 3. Solve SOL 101 → 4. Extract results
|
||
"""
|
||
|
||
def evaluate(self, trial_input: TrialInput) -> TrialResult:
|
||
"""Run a full NX evaluation cycle for one trial.
|
||
|
||
Args:
|
||
trial_input: Design variable values.
|
||
|
||
Returns:
|
||
TrialResult with extracted outputs or failure info.
|
||
"""
|
||
...
|
||
|
||
def close(self) -> None:
|
||
"""Clean up NX session resources.
|
||
|
||
⚠️ LAC CRITICAL: NEVER kill NX processes directly.
|
||
Use NXSessionManager.close_nx_if_allowed() only.
|
||
"""
|
||
...
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Stub implementation (for development/testing without NX)
|
||
# ---------------------------------------------------------------------------
|
||
class NXStubSolver:
|
||
"""Stub NX solver for development and testing.
|
||
|
||
Returns synthetic results based on simple analytical approximations
|
||
of the beam behavior. NOT physically accurate — use only for
|
||
testing the optimization pipeline.
|
||
|
||
The stub uses rough scaling relationships:
|
||
- Mass ∝ (core + face) and inversely with hole area
|
||
- Displacement ∝ 1/I where I depends on core and face thickness
|
||
- Stress ∝ M*y/I (bending stress approximation)
|
||
"""
|
||
|
||
def __init__(self) -> None:
|
||
"""Initialize stub solver."""
|
||
logger.warning(
|
||
"Using NX STUB solver — results are synthetic approximations. "
|
||
"Replace with NXOpenSolver for real evaluations."
|
||
)
|
||
|
||
def evaluate(self, trial_input: TrialInput) -> TrialResult:
|
||
"""Return synthetic results based on simplified beam mechanics.
|
||
|
||
Args:
|
||
trial_input: Design variable values.
|
||
|
||
Returns:
|
||
TrialResult with approximate values.
|
||
"""
|
||
try:
|
||
return self._compute_approximate(trial_input)
|
||
except Exception as e:
|
||
logger.error("Stub evaluation failed: %s", e)
|
||
return TrialResult(
|
||
success=False,
|
||
error_message=f"Stub evaluation error: {e}",
|
||
)
|
||
|
||
def _compute_approximate(self, inp: TrialInput) -> TrialResult:
|
||
"""Simple analytical approximation of beam response.
|
||
|
||
This is a ROUGH approximation for pipeline testing only.
|
||
Real physics requires NX Nastran SOL 101.
|
||
"""
|
||
import math
|
||
|
||
# Geometry
|
||
L = 5000.0 # mm — beam length
|
||
b = 300.0 # mm — beam width (2 × beam_half_width)
|
||
h_core = inp.beam_half_core_thickness # mm — half core
|
||
t_face = inp.beam_face_thickness # mm — face thickness
|
||
d_hole = inp.holes_diameter # mm
|
||
n_holes = inp.hole_count
|
||
|
||
# Total height and section properties (simplified I-beam)
|
||
h_total = 500.0 # mm — 2 × beam_half_height (fixed)
|
||
|
||
# Approximate second moment of area (sandwich beam)
|
||
# I ≈ b*h_total^3/12 - b*(h_total-2*t_face)^3/12 + web contribution
|
||
h_inner = h_total - 2.0 * t_face
|
||
I_section = (b * h_total**3 / 12.0) - (b * max(h_inner, 0.0) ** 3 / 12.0)
|
||
|
||
# Add core contribution
|
||
I_section += 2.0 * h_core * h_total**2 / 4.0 # approximate
|
||
|
||
# Hole area reduction (mass)
|
||
hole_area = n_holes * math.pi * (d_hole / 2.0) ** 2 # mm²
|
||
|
||
# Approximate mass (steel: 7.3 g/cm³ = 7.3e-6 kg/mm³)
|
||
rho = 7.3e-6 # kg/mm³
|
||
# Gross cross-section area (very simplified)
|
||
A_gross = 2.0 * b * t_face + 2.0 * h_core * h_total
|
||
# Remove holes from web
|
||
web_thickness = 2.0 * h_core # approximate web thickness
|
||
A_holes = n_holes * math.pi * (d_hole / 2.0) ** 2
|
||
V_solid = A_gross * L
|
||
V_holes = A_holes * web_thickness
|
||
mass = rho * (V_solid - min(V_holes, V_solid * 0.8))
|
||
|
||
# Approximate tip displacement (cantilever, point load)
|
||
# δ = PL³/(3EI)
|
||
P = 10000.0 * 9.81 # 10,000 kgf → N
|
||
E = 200000.0 # MPa (steel)
|
||
if I_section > 0:
|
||
displacement = P * L**3 / (3.0 * E * I_section)
|
||
else:
|
||
displacement = 9999.0
|
||
|
||
# Approximate max bending stress
|
||
# σ = M*y/I where M = P*L, y = h_total/2
|
||
M_max = P * L # N·mm
|
||
y_max = h_total / 2.0
|
||
if I_section > 0:
|
||
stress = M_max * y_max / I_section # MPa
|
||
else:
|
||
stress = 9999.0
|
||
|
||
return TrialResult(
|
||
success=True,
|
||
mass=mass,
|
||
tip_displacement=displacement,
|
||
max_von_mises=stress,
|
||
)
|
||
|
||
def close(self) -> None:
|
||
"""No-op for stub solver."""
|
||
logger.info("Stub solver closed.")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# NXOpen implementation using existing Atomizer engine
|
||
# ---------------------------------------------------------------------------
|
||
class NXOpenSolver:
|
||
"""Real NX solver using existing Atomizer optimization engine.
|
||
|
||
Uses these Atomizer components:
|
||
- optimization_engine.nx.updater.NXParameterUpdater (expression updates via .exp import)
|
||
- optimization_engine.nx.solver.NXSolver (journal-based solving with run_journal.exe)
|
||
- optimization_engine.extractors.extract_displacement.extract_displacement()
|
||
- optimization_engine.extractors.extract_von_mises_stress.extract_solid_stress()
|
||
- optimization_engine.extractors.extract_mass_from_expression.extract_mass_from_expression()
|
||
|
||
Files required in model_dir:
|
||
- Beam.prt (part file with expressions)
|
||
- Beam_sim1.sim (simulation file)
|
||
- Expected OP2 output: beam_sim1-solution_1.op2
|
||
|
||
Expression names:
|
||
- beam_half_core_thickness, beam_face_thickness, holes_diameter, hole_count
|
||
- Mass from expression: p173
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
model_dir: str | Path,
|
||
timeout: int = 600,
|
||
use_iteration_folders: bool = True,
|
||
nastran_version: str = "2412",
|
||
) -> None:
|
||
"""Initialize NXOpen solver using Atomizer engine.
|
||
|
||
Args:
|
||
model_dir: Path to directory containing Beam.prt, Beam_sim1.sim, etc.
|
||
timeout: Timeout per trial in seconds (default: 600s = 10 min)
|
||
use_iteration_folders: Use HEEDS-style iteration folders for clean state
|
||
nastran_version: NX version (e.g., "2412", "2506")
|
||
"""
|
||
self.model_dir = Path(model_dir)
|
||
self.timeout = timeout
|
||
self.use_iteration_folders = use_iteration_folders
|
||
self.nastran_version = nastran_version
|
||
|
||
if not self.model_dir.exists():
|
||
raise FileNotFoundError(f"Model directory not found: {self.model_dir}")
|
||
|
||
# Required files
|
||
self.prt_file = self.model_dir / "Beam.prt"
|
||
self.sim_file = self.model_dir / "Beam_sim1.sim"
|
||
|
||
if not self.prt_file.exists():
|
||
raise FileNotFoundError(f"Part file not found: {self.prt_file}")
|
||
if not self.sim_file.exists():
|
||
raise FileNotFoundError(f"Simulation file not found: {self.sim_file}")
|
||
|
||
# Import Atomizer components
|
||
try:
|
||
from optimization_engine.nx.updater import NXParameterUpdater
|
||
from optimization_engine.nx.solver import NXSolver
|
||
from optimization_engine.extractors.extract_displacement import extract_displacement
|
||
from optimization_engine.extractors.extract_von_mises_stress import extract_solid_stress
|
||
from optimization_engine.extractors.extract_mass_from_expression import extract_mass_from_expression
|
||
|
||
self._NXParameterUpdater = NXParameterUpdater
|
||
self._NXSolver = NXSolver
|
||
self._extract_displacement = extract_displacement
|
||
self._extract_stress = extract_solid_stress
|
||
self._extract_mass = extract_mass_from_expression
|
||
|
||
except ImportError as e:
|
||
raise ImportError(
|
||
f"Failed to import Atomizer optimization engine: {e}\n"
|
||
f"Ensure {ATOMIZER_REPO_ROOT} is accessible and contains optimization_engine/"
|
||
)
|
||
|
||
# Initialize the NX solver
|
||
self._solver = None
|
||
self._trial_counter = 0
|
||
|
||
logger.info(
|
||
"NXOpenSolver initialized with model_dir=%s, timeout=%ds",
|
||
self.model_dir,
|
||
self.timeout
|
||
)
|
||
|
||
def evaluate(self, trial_input: TrialInput) -> TrialResult:
|
||
"""Full NX evaluation pipeline using Atomizer engine.
|
||
|
||
Pipeline:
|
||
1. Initialize NX solver (if needed)
|
||
2. Create iteration folder (if using iteration folders)
|
||
3. Update expressions via NXParameterUpdater
|
||
4. Solve via NXSolver
|
||
5. Extract results via Atomizer extractors
|
||
|
||
Args:
|
||
trial_input: Design variable values.
|
||
|
||
Returns:
|
||
TrialResult with extracted outputs or failure info.
|
||
"""
|
||
self._trial_counter += 1
|
||
trial_start_time = __import__('time').time()
|
||
|
||
logger.info(f"Starting trial {self._trial_counter}")
|
||
logger.info(f" beam_half_core_thickness: {trial_input.beam_half_core_thickness} mm")
|
||
logger.info(f" beam_face_thickness: {trial_input.beam_face_thickness} mm")
|
||
logger.info(f" holes_diameter: {trial_input.holes_diameter} mm")
|
||
logger.info(f" hole_count: {trial_input.hole_count}")
|
||
|
||
try:
|
||
# Initialize solver if needed
|
||
if self._solver is None:
|
||
self._init_solver()
|
||
|
||
# Determine working directory
|
||
if self.use_iteration_folders:
|
||
# Create iteration folder with fresh model copies
|
||
iterations_dir = self.model_dir / "2_iterations"
|
||
working_dir = self._solver.create_iteration_folder(
|
||
iterations_base_dir=iterations_dir,
|
||
iteration_number=self._trial_counter,
|
||
expression_updates=self._build_expression_dict(trial_input)
|
||
)
|
||
working_prt = working_dir / "Beam.prt"
|
||
working_sim = working_dir / "Beam_sim1.sim"
|
||
else:
|
||
# Work directly in model directory
|
||
working_dir = self.model_dir
|
||
working_prt = self.prt_file
|
||
working_sim = self.sim_file
|
||
|
||
# Update expressions directly
|
||
self._update_expressions(working_prt, trial_input)
|
||
|
||
# Solve simulation
|
||
logger.info(f"Solving simulation: {working_sim.name}")
|
||
solve_result = self._solver.run_simulation(
|
||
sim_file=working_sim,
|
||
working_dir=working_dir,
|
||
cleanup=False, # Keep results for extraction
|
||
expression_updates=self._build_expression_dict(trial_input) if self.use_iteration_folders else None,
|
||
solution_name="Solution 1"
|
||
)
|
||
|
||
if not solve_result['success']:
|
||
return TrialResult(
|
||
success=False,
|
||
error_message=f"NX solve failed: {'; '.join(solve_result.get('errors', ['Unknown error']))}"
|
||
)
|
||
|
||
# Extract results
|
||
op2_file = solve_result['op2_file']
|
||
if not op2_file or not op2_file.exists():
|
||
return TrialResult(
|
||
success=False,
|
||
error_message=f"OP2 file not found: {op2_file}"
|
||
)
|
||
|
||
# Extract displacement (tip displacement)
|
||
try:
|
||
disp_result = self._extract_displacement(op2_file)
|
||
tip_displacement = disp_result['max_displacement'] # mm
|
||
except Exception as e:
|
||
return TrialResult(
|
||
success=False,
|
||
error_message=f"Displacement extraction failed: {e}"
|
||
)
|
||
|
||
# Extract stress (max von Mises from shells - CQUAD4 elements)
|
||
try:
|
||
stress_result = self._extract_stress(
|
||
op2_file,
|
||
element_type="cquad4", # Hydrotech beam uses shell elements
|
||
convert_to_mpa=True # Convert from kPa to MPa
|
||
)
|
||
max_von_mises = stress_result['max_von_mises'] # MPa
|
||
except Exception as e:
|
||
return TrialResult(
|
||
success=False,
|
||
error_message=f"Stress extraction failed: {e}"
|
||
)
|
||
|
||
# Extract mass from expression p173
|
||
try:
|
||
mass = self._extract_mass(working_prt, expression_name="p173") # kg
|
||
except Exception as e:
|
||
return TrialResult(
|
||
success=False,
|
||
error_message=f"Mass extraction failed: {e}"
|
||
)
|
||
|
||
elapsed_time = __import__('time').time() - trial_start_time
|
||
logger.info(f"Trial {self._trial_counter} completed in {elapsed_time:.1f}s")
|
||
logger.info(f" mass: {mass:.6f} kg")
|
||
logger.info(f" tip_displacement: {tip_displacement:.6f} mm")
|
||
logger.info(f" max_von_mises: {max_von_mises:.3f} MPa")
|
||
|
||
return TrialResult(
|
||
success=True,
|
||
mass=mass,
|
||
tip_displacement=tip_displacement,
|
||
max_von_mises=max_von_mises,
|
||
)
|
||
|
||
except Exception as e:
|
||
elapsed_time = __import__('time').time() - trial_start_time
|
||
logger.error(f"Trial {self._trial_counter} failed after {elapsed_time:.1f}s: {e}")
|
||
return TrialResult(
|
||
success=False,
|
||
error_message=f"Trial evaluation failed: {e}"
|
||
)
|
||
|
||
def _init_solver(self) -> None:
|
||
"""Initialize the NX solver."""
|
||
logger.info("Initializing NX solver")
|
||
|
||
self._solver = self._NXSolver(
|
||
nastran_version=self.nastran_version,
|
||
timeout=self.timeout,
|
||
use_journal=True, # Always use journal mode
|
||
enable_session_management=True,
|
||
study_name="hydrotech_beam_doe",
|
||
use_iteration_folders=self.use_iteration_folders,
|
||
master_model_dir=self.model_dir if self.use_iteration_folders else None
|
||
)
|
||
|
||
def _build_expression_dict(self, trial_input: TrialInput) -> dict[str, float]:
|
||
"""Build expression dictionary for Atomizer engine."""
|
||
return {
|
||
EXPR_HALF_CORE_THICKNESS: trial_input.beam_half_core_thickness,
|
||
EXPR_FACE_THICKNESS: trial_input.beam_face_thickness,
|
||
EXPR_HOLES_DIAMETER: trial_input.holes_diameter,
|
||
EXPR_HOLE_COUNT: float(trial_input.hole_count), # NX expects float
|
||
}
|
||
|
||
def _update_expressions(self, prt_file: Path, trial_input: TrialInput) -> None:
|
||
"""Update expressions in PRT file using NXParameterUpdater."""
|
||
logger.info("Updating expressions via NXParameterUpdater")
|
||
|
||
updater = self._NXParameterUpdater(prt_file, backup=False)
|
||
expression_updates = self._build_expression_dict(trial_input)
|
||
|
||
updater.update_expressions(expression_updates, use_nx_import=True)
|
||
updater.save()
|
||
|
||
def close(self) -> None:
|
||
"""Clean up NX session resources.
|
||
|
||
⚠️ LAC CRITICAL: Uses NXSessionManager for safe shutdown.
|
||
"""
|
||
if self._solver and hasattr(self._solver, 'session_manager'):
|
||
if self._solver.session_manager:
|
||
logger.info("Closing NX session via session manager")
|
||
try:
|
||
self._solver.session_manager.cleanup_stale_locks()
|
||
except Exception as e:
|
||
logger.warning(f"Session cleanup warning: {e}")
|
||
|
||
logger.info("NXOpenSolver closed.")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Factory
|
||
# ---------------------------------------------------------------------------
|
||
def create_solver(
|
||
backend: str = "stub",
|
||
model_dir: str = "",
|
||
timeout: int = 600,
|
||
use_iteration_folders: bool = True,
|
||
nastran_version: str = "2412",
|
||
) -> NXStubSolver | NXOpenSolver:
|
||
"""Create an NX solver instance.
|
||
|
||
Args:
|
||
backend: "stub" for development, "nxopen" for real NX (Windows only).
|
||
model_dir: Path to NX model directory (required for nxopen backend).
|
||
timeout: Timeout per trial in seconds (default: 600s = 10 min).
|
||
use_iteration_folders: Use HEEDS-style iteration folders for clean state.
|
||
nastran_version: NX version (e.g., "2412", "2506").
|
||
|
||
Returns:
|
||
Solver instance implementing the NXSolverInterface protocol.
|
||
|
||
Raises:
|
||
ValueError: If backend is unknown.
|
||
"""
|
||
if backend == "stub":
|
||
return NXStubSolver()
|
||
elif backend == "nxopen":
|
||
if not model_dir:
|
||
raise ValueError("model_dir required for nxopen backend")
|
||
return NXOpenSolver(
|
||
model_dir=model_dir,
|
||
timeout=timeout,
|
||
use_iteration_folders=use_iteration_folders,
|
||
nastran_version=nastran_version,
|
||
)
|
||
else:
|
||
raise ValueError(f"Unknown backend: {backend!r}. Use 'stub' or 'nxopen'.") |