"""NX automation interface for Hydrotech Beam optimization. Integrates with the existing Atomizer optimization_engine: - NXSolver for journal-based solve (run_journal.exe → solve_simulation.py) - pyNastran OP2 extractors for displacement + stress - Expression-based mass extraction via journal temp file The proven SAT3 pipeline: write .exp → NX journal updates + solves → pyNastran reads OP2. NX Expression Names (confirmed via binary introspection — CONTEXT.md): Design Variables: - beam_half_core_thickness (mm, continuous) - beam_face_thickness (mm, continuous) - holes_diameter (mm, continuous) - hole_count (integer, links to Pattern_p7) Outputs: - p173 (mass in kg, body_property147.mass) Fixed: - beam_lenght (⚠️ TYPO in NX — no 'h', 5000 mm) - beam_half_height (250 mm) - beam_half_width (150 mm) References: CONTEXT.md — Full expression map optimization_engine/nx/solver.py — NXSolver class optimization_engine/extractors/ — pyNastran-based result extractors studies/M1_Mirror/SAT3_Trajectory_V7/run_optimization.py — proven pattern """ from __future__ import annotations import logging import os import sys import time from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Optional, Protocol logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Data types # --------------------------------------------------------------------------- @dataclass(frozen=True) class TrialInput: """Design variable values for a single trial.""" beam_half_core_thickness: float # mm — DV1 beam_face_thickness: float # mm — DV2 holes_diameter: float # mm — DV3 hole_count: int # — DV4 @dataclass class TrialResult: """Results extracted from NX after a trial solve. All values populated after a successful SOL 101 solve. On failure, success=False and error_message explains the failure. """ success: bool mass: float = float("nan") # kg — from expression `p173` tip_displacement: float = float("nan") # mm — from SOL 101 results max_von_mises: float = float("nan") # MPa — from SOL 101 results solve_time: float = 0.0 # seconds error_message: str = "" iteration_dir: Optional[str] = None # path to iteration folder # --------------------------------------------------------------------------- # NX expression name constants # --------------------------------------------------------------------------- # ⚠️ These are EXACT NX expression names from binary introspection. # Do NOT change spelling — `beam_lenght` has a typo (no 'h') in NX. EXPR_HALF_CORE_THICKNESS = "beam_half_core_thickness" EXPR_FACE_THICKNESS = "beam_face_thickness" EXPR_HOLES_DIAMETER = "holes_diameter" EXPR_HOLE_COUNT = "hole_count" EXPR_MASS = "p173" # body_property147.mass, kg EXPR_BEAM_LENGTH = "beam_lenght" # ⚠️ TYPO IN NX — intentional # Unit mapping for .exp file generation (NXSolver._write_expression_file) UNIT_MAPPING = { EXPR_HALF_CORE_THICKNESS: "mm", EXPR_FACE_THICKNESS: "mm", EXPR_HOLES_DIAMETER: "mm", EXPR_HOLE_COUNT: "Constant", # integer — no unit } # --------------------------------------------------------------------------- # Interface protocol # --------------------------------------------------------------------------- class NXSolverInterface(Protocol): """Protocol for NX solver backends (enables stub/real swap).""" def solve(self, trial: TrialInput) -> TrialResult: ... def close(self) -> None: ... # --------------------------------------------------------------------------- # Factory # --------------------------------------------------------------------------- def create_solver(backend: str = "stub", **kwargs: Any) -> NXSolverInterface: """Create the appropriate solver backend. Args: backend: "stub" for testing, "nxopen" for real NX runs. **kwargs: Passed to solver constructor. For "nxopen": model_dir: Path to NX model files (required) nx_version: NX version string (default: "2412") timeout: Max solve time in seconds (default: 600) use_iteration_folders: HEEDS-style per-trial folders (default: True) Returns: Solver instance implementing NXSolverInterface. """ if backend == "stub": return StubSolver(**kwargs) elif backend == "nxopen": return AtomizerNXSolver(**kwargs) else: raise ValueError(f"Unknown backend: {backend!r}. Use 'stub' or 'nxopen'.") # --------------------------------------------------------------------------- # Real solver — wraps optimization_engine # --------------------------------------------------------------------------- class AtomizerNXSolver: """Production solver using Atomizer's optimization_engine. Pipeline (proven in SAT3): 1. Create iteration folder with fresh model copies 2. Write .exp file with updated expression values 3. NX journal: open .sim → import .exp → update geometry → solve SOL 101 4. Journal writes mass to _temp_mass.txt 5. pyNastran reads .op2 → extract displacement + stress 6. Return results """ def __init__( self, model_dir: str | Path = ".", nx_version: str = "2412", timeout: int = 600, use_iteration_folders: bool = True, ): model_dir = Path(model_dir) if not model_dir.exists(): raise FileNotFoundError(f"Model directory not found: {model_dir}") self.model_dir = model_dir self.nx_version = nx_version self.timeout = timeout self.use_iteration_folders = use_iteration_folders self._iteration = 0 # Set up iteration base directory self.iterations_dir = model_dir.parent / "2_iterations" self.iterations_dir.mkdir(parents=True, exist_ok=True) # Find the .sim file sim_files = list(model_dir.glob("*.sim")) if not sim_files: raise FileNotFoundError(f"No .sim file found in {model_dir}") self.sim_file = sim_files[0] logger.info("SIM file: %s", self.sim_file.name) # Find the .prt file (for mass extraction) prt_files = [f for f in model_dir.glob("*.prt") if "_i." not in f.name] if not prt_files: raise FileNotFoundError(f"No .prt file found in {model_dir}") self.prt_file = prt_files[0] logger.info("PRT file: %s", self.prt_file.name) # Add Atomizer root to path for imports atomizer_root = self._find_atomizer_root() if atomizer_root and str(atomizer_root) not in sys.path: sys.path.insert(0, str(atomizer_root)) logger.info("Added Atomizer root to path: %s", atomizer_root) # Import optimization_engine components try: from optimization_engine.nx.solver import NXSolver self._nx_solver = NXSolver( nastran_version=nx_version, timeout=timeout, use_journal=True, study_name="hydrotech_beam_doe", use_iteration_folders=use_iteration_folders, master_model_dir=model_dir, ) logger.info( "NXSolver initialized (NX %s, timeout=%ds, journal mode)", nx_version, timeout, ) except ImportError as e: raise ImportError( f"Could not import optimization_engine. " f"Ensure Atomizer repo root is on PYTHONPATH.\n" f"Error: {e}" ) from e # Import extractors try: from optimization_engine.extractors.extract_displacement import ( extract_displacement, ) from optimization_engine.extractors.extract_von_mises_stress import ( extract_solid_stress, ) from optimization_engine.extractors.extract_mass_from_expression import ( extract_mass_from_expression, ) self._extract_displacement = extract_displacement self._extract_stress = extract_solid_stress self._extract_mass = extract_mass_from_expression logger.info("Extractors loaded: displacement, von_mises, mass") except ImportError as e: raise ImportError( f"Could not import extractors from optimization_engine.\n" f"Error: {e}" ) from e def _find_atomizer_root(self) -> Optional[Path]: """Walk up from model_dir to find the Atomizer repo root.""" # Look for optimization_engine directory candidate = self.model_dir for _ in range(10): candidate = candidate.parent if (candidate / "optimization_engine").is_dir(): return candidate if candidate == candidate.parent: break # Fallback: common paths for path in [ Path("C:/Users/antoi/Atomizer"), Path("/home/papa/repos/Atomizer"), ]: if (path / "optimization_engine").is_dir(): return path logger.warning("Could not find Atomizer root with optimization_engine/") return None def solve(self, trial: TrialInput) -> TrialResult: """Run a single trial through the NX pipeline. Args: trial: Design variable values. Returns: TrialResult with mass, displacement, stress (or failure info). """ self._iteration += 1 start_time = time.time() # Build expression update dict expressions: Dict[str, float] = { EXPR_HALF_CORE_THICKNESS: trial.beam_half_core_thickness, EXPR_FACE_THICKNESS: trial.beam_face_thickness, EXPR_HOLES_DIAMETER: trial.holes_diameter, EXPR_HOLE_COUNT: float(trial.hole_count), # NX expects float in .exp } logger.info( "Trial %d: core=%.2f face=%.2f dia=%.1f count=%d", self._iteration, trial.beam_half_core_thickness, trial.beam_face_thickness, trial.holes_diameter, trial.hole_count, ) try: # Step 1: Create iteration folder with fresh model copies + .exp file if self.use_iteration_folders: iter_dir = self._nx_solver.create_iteration_folder( iterations_base_dir=self.iterations_dir, iteration_number=self._iteration, expression_updates=expressions, ) sim_file = iter_dir / self.sim_file.name prt_file = iter_dir / self.prt_file.name else: iter_dir = self.model_dir sim_file = self.sim_file prt_file = self.prt_file # Step 2: Run NX journal (update expressions + solve) solve_result = self._nx_solver.run_simulation( sim_file=sim_file, working_dir=iter_dir, expression_updates=expressions, ) if not solve_result.get("success", False): errors = solve_result.get("errors", ["Unknown solver error"]) return TrialResult( success=False, solve_time=time.time() - start_time, error_message=f"NX solve failed: {'; '.join(errors)}", iteration_dir=str(iter_dir), ) op2_file = solve_result.get("op2_file") if not op2_file or not Path(op2_file).exists(): return TrialResult( success=False, solve_time=time.time() - start_time, error_message="OP2 file not generated after solve", iteration_dir=str(iter_dir), ) op2_path = Path(op2_file) # Step 3: Extract mass from journal temp file try: mass_kg = self._extract_mass(prt_file, expression_name=EXPR_MASS) except Exception as e: logger.warning("Mass extraction failed: %s", e) mass_kg = float("nan") # Step 4: Extract displacement from OP2 try: disp_result = self._extract_displacement(op2_path) # For cantilever beam, max displacement IS tip displacement tip_displacement = disp_result["max_displacement"] except Exception as e: logger.warning("Displacement extraction failed: %s", e) tip_displacement = float("nan") # Step 5: Extract max von Mises stress from OP2 # Use shell element extraction (CQUAD4 mesh) try: stress_result = self._extract_stress( op2_path, element_type="cquad4", convert_to_mpa=True, # ⚠️ LAC lesson: NX outputs kPa, must convert ) max_vm_stress = stress_result["max_von_mises"] except Exception as e: logger.warning("Stress extraction failed: %s", e) max_vm_stress = float("nan") elapsed = time.time() - start_time logger.info( "Trial %d complete: mass=%.2f kg, disp=%.3f mm, stress=%.1f MPa (%.1fs)", self._iteration, mass_kg, tip_displacement, max_vm_stress, elapsed, ) return TrialResult( success=True, mass=mass_kg, tip_displacement=tip_displacement, max_von_mises=max_vm_stress, solve_time=elapsed, iteration_dir=str(iter_dir), ) except Exception as e: elapsed = time.time() - start_time logger.error("Trial %d failed: %s", self._iteration, e, exc_info=True) return TrialResult( success=False, solve_time=elapsed, error_message=str(e), iteration_dir=str(iter_dir) if 'iter_dir' in locals() else None, ) def close(self) -> None: """Clean up NX solver resources.""" logger.info("AtomizerNXSolver closed. %d iterations completed.", self._iteration) # --------------------------------------------------------------------------- # Stub solver — for development/testing without NX # --------------------------------------------------------------------------- class StubSolver: """Synthetic solver for testing without NX. Generates physically-plausible approximate results based on beam theory. NOT accurate — only for pipeline validation. """ def __init__(self, **kwargs: Any): self._call_count = 0 logger.warning( "Using NX STUB solver — results are synthetic approximations. " "Replace with AtomizerNXSolver (--backend nxopen) for real evaluations." ) def solve(self, trial: TrialInput) -> TrialResult: """Generate approximate results from beam theory. Uses simplified cantilever beam formulas: - Mass ∝ cross-section area × length - hole_volume - Displacement ~ PL³/3EI (Euler-Bernoulli) - Stress ~ Mc/I (nominal) with hole SCF """ self._call_count += 1 import numpy as np # Geometry (mm) L = 5000.0 # beam length h_half = 250.0 # beam half-height (fixed) w_half = 150.0 # beam half-width (fixed) h_core = trial.beam_half_core_thickness t_face = trial.beam_face_thickness d_hole = trial.holes_diameter n_hole = trial.hole_count # Material: AISI 1005 E = 205000.0 # MPa (Young's modulus) rho = 7.3e-6 # kg/mm³ (7.3 g/cm³) # I-beam cross-section second moment of area (approximate) # Full section: 2*w_half × 2*h_half rectangle # Minus core cutouts (simplified) H = 2 * h_half # 500 mm total height W = 2 * w_half # 300 mm total width I_full = W * H**3 / 12 # Subtract inner rectangle (core region without faces) h_web = H - 2 * t_face w_web = W - 2 * h_core # approximate I_inner = max(0, w_web) * max(0, h_web)**3 / 12 I_eff = max(I_full - I_inner, I_full * 0.01) # don't go to zero # Cross-section area (approximate) A_section = W * H - max(0, w_web) * max(0, h_web) # Hole volume removal web_height = H - 2 * t_face hole_area = n_hole * np.pi * (d_hole / 2)**2 # Only remove from web if holes fit if d_hole < web_height: effective_hole_area = min(hole_area, 0.8 * web_height * 4000) else: effective_hole_area = 0 # Mass vol = A_section * L - effective_hole_area * min(h_core * 2, 50) mass = max(rho * vol, 1.0) # Tip displacement: δ = PL³ / 3EI P = 10000 * 9.80665 # 10,000 kgf → N delta = P * L**3 / (3 * E * I_eff) # Stress: σ = M*c/I with SCF from holes M = P * L # max moment at fixed end c = h_half # distance to extreme fiber sigma_nominal = M * c / I_eff / 1000 # kPa → MPa # Stress concentration from holes (simplified) scf = 1.0 + 0.5 * (d_hole / (web_height + 1)) sigma_max = sigma_nominal * scf # Add noise (±5%) to simulate model variability rng = np.random.default_rng(self._call_count) noise = rng.uniform(0.95, 1.05, 3) return TrialResult( success=True, mass=float(mass * noise[0]), tip_displacement=float(delta * noise[1]), max_von_mises=float(sigma_max * noise[2]), solve_time=0.1, ) def close(self) -> None: """Clean up stub solver.""" logger.info("Stub solver closed.")