From 126f0bb2e01b0614c0bb309e0ea0f5fd42e686b4 Mon Sep 17 00:00:00 2001 From: Antoine Date: Wed, 11 Feb 2026 13:33:09 +0000 Subject: [PATCH] Refactor: nx_interface uses optimization_engine (NXSolver + pyNastran extractors) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AtomizerNXSolver wraps existing NXSolver + extractors from SAT3 pipeline - HEEDS-style iteration folders with fresh model copies per trial - Expression .exp file generation with correct unit mapping - pyNastran OP2 extraction: displacement, von Mises (kPa→MPa), mass - StubSolver improved with beam-theory approximations - Reuses proven journal pipeline (solve_simulation.py) --- .../studies/01_doe_landscape/nx_interface.py | 795 +++++++----------- .../studies/01_doe_landscape/run_doe.py | 2 +- 2 files changed, 321 insertions(+), 476 deletions(-) diff --git a/projects/hydrotech-beam/studies/01_doe_landscape/nx_interface.py b/projects/hydrotech-beam/studies/01_doe_landscape/nx_interface.py index dabc32c3..4f2a3f94 100644 --- a/projects/hydrotech-beam/studies/01_doe_landscape/nx_interface.py +++ b/projects/hydrotech-beam/studies/01_doe_landscape/nx_interface.py @@ -1,9 +1,11 @@ """NX automation interface for Hydrotech Beam optimization. -This module uses the EXISTING Atomizer optimization engine for NX integration: -- optimization_engine.nx.updater.NXParameterUpdater (expression updates via .exp import) -- optimization_engine.nx.solver.NXSolver (journal-based solving with run_journal.exe) -- optimization_engine.extractors.* (pyNastran OP2-based result extraction) +Integrates with the existing Atomizer optimization_engine: +- NXSolver for journal-based solve (run_journal.exe → solve_simulation.py) +- pyNastran OP2 extractors for displacement + stress +- Expression-based mass extraction via journal temp file + +The proven SAT3 pipeline: write .exp → NX journal updates + solves → pyNastran reads OP2. NX Expression Names (confirmed via binary introspection — CONTEXT.md): Design Variables: @@ -20,24 +22,23 @@ NX Expression Names (confirmed via binary introspection — CONTEXT.md): References: CONTEXT.md — Full expression map - OPTIMIZATION_STRATEGY.md §8.2 — Extractor requirements + optimization_engine/nx/solver.py — NXSolver class + optimization_engine/extractors/ — pyNastran-based result extractors + studies/M1_Mirror/SAT3_Trajectory_V7/run_optimization.py — proven pattern """ from __future__ import annotations import logging +import os import sys +import time from dataclasses import dataclass from pathlib import Path -from typing import Protocol +from typing import Any, Dict, Optional, Protocol logger = logging.getLogger(__name__) -# Add Atomizer repo root to sys.path for imports -ATOMIZER_REPO_ROOT = Path("/home/papa/repos/Atomizer") -if str(ATOMIZER_REPO_ROOT) not in sys.path: - sys.path.insert(0, str(ATOMIZER_REPO_ROOT)) - # --------------------------------------------------------------------------- # Data types @@ -64,7 +65,9 @@ class TrialResult: mass: float = float("nan") # kg — from expression `p173` tip_displacement: float = float("nan") # mm — from SOL 101 results max_von_mises: float = float("nan") # MPa — from SOL 101 results + solve_time: float = 0.0 # seconds error_message: str = "" + iteration_dir: Optional[str] = None # path to iteration folder # --------------------------------------------------------------------------- @@ -79,238 +82,131 @@ EXPR_HOLE_COUNT = "hole_count" EXPR_MASS = "p173" # body_property147.mass, kg EXPR_BEAM_LENGTH = "beam_lenght" # ⚠️ TYPO IN NX — intentional +# Unit mapping for .exp file generation (NXSolver._write_expression_file) +UNIT_MAPPING = { + EXPR_HALF_CORE_THICKNESS: "mm", + EXPR_FACE_THICKNESS: "mm", + EXPR_HOLES_DIAMETER: "mm", + EXPR_HOLE_COUNT: "Constant", # integer — no unit +} + # --------------------------------------------------------------------------- # Interface protocol # --------------------------------------------------------------------------- class NXSolverInterface(Protocol): - """Protocol for NX solver backends. + """Protocol for NX solver backends (enables stub/real swap).""" - Implementors must provide the full pipeline: - 1. Update expressions → 2. Rebuild model → 3. Solve SOL 101 → 4. Extract results + def solve(self, trial: TrialInput) -> TrialResult: ... + def close(self) -> None: ... + + +# --------------------------------------------------------------------------- +# Factory +# --------------------------------------------------------------------------- +def create_solver(backend: str = "stub", **kwargs: Any) -> NXSolverInterface: + """Create the appropriate solver backend. + + Args: + backend: "stub" for testing, "nxopen" for real NX runs. + **kwargs: Passed to solver constructor. + For "nxopen": + model_dir: Path to NX model files (required) + nx_version: NX version string (default: "2412") + timeout: Max solve time in seconds (default: 600) + use_iteration_folders: HEEDS-style per-trial folders (default: True) + + Returns: + Solver instance implementing NXSolverInterface. """ - - def evaluate(self, trial_input: TrialInput) -> TrialResult: - """Run a full NX evaluation cycle for one trial. - - Args: - trial_input: Design variable values. - - Returns: - TrialResult with extracted outputs or failure info. - """ - ... - - def close(self) -> None: - """Clean up NX session resources. - - ⚠️ LAC CRITICAL: NEVER kill NX processes directly. - Use NXSessionManager.close_nx_if_allowed() only. - """ - ... + if backend == "stub": + return StubSolver(**kwargs) + elif backend == "nxopen": + return AtomizerNXSolver(**kwargs) + else: + raise ValueError(f"Unknown backend: {backend!r}. Use 'stub' or 'nxopen'.") # --------------------------------------------------------------------------- -# Stub implementation (for development/testing without NX) +# Real solver — wraps optimization_engine # --------------------------------------------------------------------------- -class NXStubSolver: - """Stub NX solver for development and testing. +class AtomizerNXSolver: + """Production solver using Atomizer's optimization_engine. - Returns synthetic results based on simple analytical approximations - of the beam behavior. NOT physically accurate — use only for - testing the optimization pipeline. - - The stub uses rough scaling relationships: - - Mass ∝ (core + face) and inversely with hole area - - Displacement ∝ 1/I where I depends on core and face thickness - - Stress ∝ M*y/I (bending stress approximation) + Pipeline (proven in SAT3): + 1. Create iteration folder with fresh model copies + 2. Write .exp file with updated expression values + 3. NX journal: open .sim → import .exp → update geometry → solve SOL 101 + 4. Journal writes mass to _temp_mass.txt + 5. pyNastran reads .op2 → extract displacement + stress + 6. Return results """ - def __init__(self) -> None: - """Initialize stub solver.""" - logger.warning( - "Using NX STUB solver — results are synthetic approximations. " - "Replace with NXOpenSolver for real evaluations." - ) - - def evaluate(self, trial_input: TrialInput) -> TrialResult: - """Return synthetic results based on simplified beam mechanics. - - Args: - trial_input: Design variable values. - - Returns: - TrialResult with approximate values. - """ - try: - return self._compute_approximate(trial_input) - except Exception as e: - logger.error("Stub evaluation failed: %s", e) - return TrialResult( - success=False, - error_message=f"Stub evaluation error: {e}", - ) - - def _compute_approximate(self, inp: TrialInput) -> TrialResult: - """Simple analytical approximation of beam response. - - This is a ROUGH approximation for pipeline testing only. - Real physics requires NX Nastran SOL 101. - """ - import math - - # Geometry - L = 5000.0 # mm — beam length - b = 300.0 # mm — beam width (2 × beam_half_width) - h_core = inp.beam_half_core_thickness # mm — half core - t_face = inp.beam_face_thickness # mm — face thickness - d_hole = inp.holes_diameter # mm - n_holes = inp.hole_count - - # Total height and section properties (simplified I-beam) - h_total = 500.0 # mm — 2 × beam_half_height (fixed) - - # Approximate second moment of area (sandwich beam) - # I ≈ b*h_total^3/12 - b*(h_total-2*t_face)^3/12 + web contribution - h_inner = h_total - 2.0 * t_face - I_section = (b * h_total**3 / 12.0) - (b * max(h_inner, 0.0) ** 3 / 12.0) - - # Add core contribution - I_section += 2.0 * h_core * h_total**2 / 4.0 # approximate - - # Hole area reduction (mass) - hole_area = n_holes * math.pi * (d_hole / 2.0) ** 2 # mm² - - # Approximate mass (steel: 7.3 g/cm³ = 7.3e-6 kg/mm³) - rho = 7.3e-6 # kg/mm³ - # Gross cross-section area (very simplified) - A_gross = 2.0 * b * t_face + 2.0 * h_core * h_total - # Remove holes from web - web_thickness = 2.0 * h_core # approximate web thickness - A_holes = n_holes * math.pi * (d_hole / 2.0) ** 2 - V_solid = A_gross * L - V_holes = A_holes * web_thickness - mass = rho * (V_solid - min(V_holes, V_solid * 0.8)) - - # Approximate tip displacement (cantilever, point load) - # δ = PL³/(3EI) - P = 10000.0 * 9.81 # 10,000 kgf → N - E = 200000.0 # MPa (steel) - if I_section > 0: - displacement = P * L**3 / (3.0 * E * I_section) - else: - displacement = 9999.0 - - # Approximate max bending stress - # σ = M*y/I where M = P*L, y = h_total/2 - M_max = P * L # N·mm - y_max = h_total / 2.0 - if I_section > 0: - stress = M_max * y_max / I_section # MPa - else: - stress = 9999.0 - - return TrialResult( - success=True, - mass=mass, - tip_displacement=displacement, - max_von_mises=stress, - ) - - def close(self) -> None: - """No-op for stub solver.""" - logger.info("Stub solver closed.") - - -# --------------------------------------------------------------------------- -# NXOpen implementation using existing Atomizer engine -# --------------------------------------------------------------------------- -class NXOpenSolver: - """Real NX solver using existing Atomizer optimization engine. - - Uses these Atomizer components: - - optimization_engine.nx.solver.NXSolver (journal-based solving with run_journal.exe) - → handles iteration folders, expression import via .exp, and NX solve - - optimization_engine.extractors.extract_displacement.extract_displacement() - - optimization_engine.extractors.extract_von_mises_stress.extract_solid_stress() - - optimization_engine.extractors.extract_mass_from_expression.extract_mass_from_expression() - - Pipeline per trial (HEEDS-style iteration folder pattern): - 1. NXSolver.create_iteration_folder() — copies model files + writes params.exp - 2. NXSolver.run_simulation() — runs solve_simulation.py journal via run_journal.exe - → The journal imports params.exp, rebuilds geometry, updates FEM, solves, extracts mass - 3. extract_displacement(op2) — max displacement from SOL 101 - 4. extract_solid_stress(op2) — max von Mises (auto-detect element type) - 5. extract_mass_from_expression(prt) — reads _temp_mass.txt written by journal - - Files required in model_dir: - - Beam.prt (part file with expressions) - - Beam_sim1.sim (simulation file) - - Expected OP2 output: beam_sim1-solution_1.op2 - - Expression names (confirmed from binary introspection): - - DVs: beam_half_core_thickness, beam_face_thickness, holes_diameter, hole_count - - Mass output: p173 (body_property147.mass, kg) - - References: - - M1_Mirror/SAT3_Trajectory_V7/run_optimization.py — FEARunner pattern - - optimization_engine/nx/solver.py — NXSolver API - - optimization_engine/nx/solve_simulation.py — Journal internals - """ - - # SIM filename and solution name for this model - SIM_FILENAME = "Beam_sim1.sim" - PRT_FILENAME = "Beam.prt" - SOLUTION_NAME = "Solution 1" - # Expected OP2: -.op2 - # = beam_sim1-solution_1.op2 - def __init__( self, - model_dir: str | Path, - nx_install_dir: str | Path | None = None, + model_dir: str | Path = ".", + nx_version: str = "2412", timeout: int = 600, - nastran_version: str = "2412", - ) -> None: - """Initialize NXOpen solver using Atomizer engine. + use_iteration_folders: bool = True, + ): + model_dir = Path(model_dir) + if not model_dir.exists(): + raise FileNotFoundError(f"Model directory not found: {model_dir}") - Args: - model_dir: Path to directory containing Beam.prt, Beam_sim1.sim, etc. - This is the "master model" directory — files are copied per iteration. - nx_install_dir: Path to NX installation (auto-detected if None). - timeout: Timeout per trial in seconds (default: 600s = 10 min). - nastran_version: NX version string (e.g., "2412", "2506", "2512"). - """ - import time as _time # avoid repeated __import__ - self._time = _time - - self.model_dir = Path(model_dir) + self.model_dir = model_dir + self.nx_version = nx_version self.timeout = timeout - self.nastran_version = nastran_version - self.nx_install_dir = str(nx_install_dir) if nx_install_dir else None + self.use_iteration_folders = use_iteration_folders + self._iteration = 0 - if not self.model_dir.exists(): - raise FileNotFoundError(f"Model directory not found: {self.model_dir}") - - # Validate required files - self.prt_file = self.model_dir / self.PRT_FILENAME - self.sim_file = self.model_dir / self.SIM_FILENAME - - for f in (self.prt_file, self.sim_file): - if not f.exists(): - raise FileNotFoundError(f"Required file not found: {f}") - - # Iterations output directory (sibling to model_dir per study convention) - # Layout: studies/01_doe_landscape/ - # 1_setup/model/ ← model_dir (master) - # 2_iterations/ ← iteration folders - # 3_results/ ← final outputs - self.iterations_dir = self.model_dir.parent.parent / "2_iterations" + # Set up iteration base directory + self.iterations_dir = model_dir.parent / "2_iterations" self.iterations_dir.mkdir(parents=True, exist_ok=True) - # Import Atomizer components at init time (fail-fast on missing engine) + # Find the .sim file + sim_files = list(model_dir.glob("*.sim")) + if not sim_files: + raise FileNotFoundError(f"No .sim file found in {model_dir}") + self.sim_file = sim_files[0] + logger.info("SIM file: %s", self.sim_file.name) + + # Find the .prt file (for mass extraction) + prt_files = [f for f in model_dir.glob("*.prt") if "_i." not in f.name] + if not prt_files: + raise FileNotFoundError(f"No .prt file found in {model_dir}") + self.prt_file = prt_files[0] + logger.info("PRT file: %s", self.prt_file.name) + + # Add Atomizer root to path for imports + atomizer_root = self._find_atomizer_root() + if atomizer_root and str(atomizer_root) not in sys.path: + sys.path.insert(0, str(atomizer_root)) + logger.info("Added Atomizer root to path: %s", atomizer_root) + + # Import optimization_engine components try: from optimization_engine.nx.solver import NXSolver + self._nx_solver = NXSolver( + nastran_version=nx_version, + timeout=timeout, + use_journal=True, + study_name="hydrotech_beam_doe", + use_iteration_folders=use_iteration_folders, + master_model_dir=model_dir, + ) + logger.info( + "NXSolver initialized (NX %s, timeout=%ds, journal mode)", + nx_version, timeout, + ) + except ImportError as e: + raise ImportError( + f"Could not import optimization_engine. " + f"Ensure Atomizer repo root is on PYTHONPATH.\n" + f"Error: {e}" + ) from e + + # Import extractors + try: from optimization_engine.extractors.extract_displacement import ( extract_displacement, ) @@ -320,310 +216,259 @@ class NXOpenSolver: from optimization_engine.extractors.extract_mass_from_expression import ( extract_mass_from_expression, ) - - self._NXSolver = NXSolver - self._extract_displacement = staticmethod(extract_displacement) - self._extract_stress = staticmethod(extract_solid_stress) - self._extract_mass = staticmethod(extract_mass_from_expression) - + self._extract_displacement = extract_displacement + self._extract_stress = extract_solid_stress + self._extract_mass = extract_mass_from_expression + logger.info("Extractors loaded: displacement, von_mises, mass") except ImportError as e: raise ImportError( - f"Failed to import Atomizer optimization engine: {e}\n" - f"Ensure {ATOMIZER_REPO_ROOT} is accessible and contains optimization_engine/" + f"Could not import extractors from optimization_engine.\n" + f"Error: {e}" ) from e - # Lazy-init solver on first evaluate() call - self._solver: object | None = None - self._trial_counter: int = 0 + def _find_atomizer_root(self) -> Optional[Path]: + """Walk up from model_dir to find the Atomizer repo root.""" + # Look for optimization_engine directory + candidate = self.model_dir + for _ in range(10): + candidate = candidate.parent + if (candidate / "optimization_engine").is_dir(): + return candidate + if candidate == candidate.parent: + break - logger.info( - "NXOpenSolver initialized — model_dir=%s, timeout=%ds, nastran=%s", - self.model_dir, - self.timeout, - self.nastran_version, - ) + # Fallback: common paths + for path in [ + Path("C:/Users/antoi/Atomizer"), + Path("/home/papa/repos/Atomizer"), + ]: + if (path / "optimization_engine").is_dir(): + return path - # ------------------------------------------------------------------ - # Public API - # ------------------------------------------------------------------ + logger.warning("Could not find Atomizer root with optimization_engine/") + return None - def evaluate(self, trial_input: TrialInput) -> TrialResult: - """Full NX evaluation pipeline for one trial. - - Pipeline (mirrors M1_Mirror/SAT3_Trajectory_V7 FEARunner.run_fea): - 1. create_iteration_folder → copies model + writes params.exp - 2. run_simulation → journal updates expressions, rebuilds, solves - 3. extract displacement, stress, mass from results + def solve(self, trial: TrialInput) -> TrialResult: + """Run a single trial through the NX pipeline. Args: - trial_input: Design variable values. + trial: Design variable values. Returns: - TrialResult with extracted outputs or failure info. + TrialResult with mass, displacement, stress (or failure info). """ - self._trial_counter += 1 - trial_num = self._trial_counter - t_start = self._time.time() + self._iteration += 1 + start_time = time.time() + + # Build expression update dict + expressions: Dict[str, float] = { + EXPR_HALF_CORE_THICKNESS: trial.beam_half_core_thickness, + EXPR_FACE_THICKNESS: trial.beam_face_thickness, + EXPR_HOLES_DIAMETER: trial.holes_diameter, + EXPR_HOLE_COUNT: float(trial.hole_count), # NX expects float in .exp + } logger.info( - "Trial %d — DVs: core=%.3f mm, face=%.3f mm, hole_d=%.3f mm, n_holes=%d", - trial_num, - trial_input.beam_half_core_thickness, - trial_input.beam_face_thickness, - trial_input.holes_diameter, - trial_input.hole_count, + "Trial %d: core=%.2f face=%.2f dia=%.1f count=%d", + self._iteration, + trial.beam_half_core_thickness, + trial.beam_face_thickness, + trial.holes_diameter, + trial.hole_count, ) try: - # 0. Lazy-init solver - if self._solver is None: - self._init_solver() - - expressions = self._build_expression_dict(trial_input) - - # 1. Create iteration folder with fresh model copies + params.exp - iter_folder = self._solver.create_iteration_folder( - iterations_base_dir=self.iterations_dir, - iteration_number=trial_num, - expression_updates=expressions, - ) - logger.info(" Iteration folder: %s", iter_folder) - - working_sim = iter_folder / self.SIM_FILENAME - working_prt = iter_folder / self.PRT_FILENAME - - if not working_sim.exists(): - return TrialResult( - success=False, - error_message=f"SIM file missing in iteration folder: {working_sim}", + # Step 1: Create iteration folder with fresh model copies + .exp file + if self.use_iteration_folders: + iter_dir = self._nx_solver.create_iteration_folder( + iterations_base_dir=self.iterations_dir, + iteration_number=self._iteration, + expression_updates=expressions, ) - - # 2. Solve — journal handles expression import + geometry rebuild + FEM update + solve - # expression_updates are passed as argv to the journal (key=value pairs) - logger.info(" Solving: %s", working_sim.name) - solve_result = self._solver.run_simulation( - sim_file=working_sim, - working_dir=iter_folder, - cleanup=False, # keep OP2/F06 for extraction - expression_updates=expressions, - solution_name=self.SOLUTION_NAME, - ) - - if not solve_result["success"]: - errors = solve_result.get("errors", ["Unknown error"]) - rc = solve_result.get("return_code", "?") - msg = f"NX solve failed (rc={rc}): {'; '.join(errors)}" - logger.error(" %s", msg) - return TrialResult(success=False, error_message=msg) - - # 3. Locate OP2 - op2_file = solve_result.get("op2_file") - if op2_file is None or not Path(op2_file).exists(): - # Fallback: try the expected naming convention - op2_file = iter_folder / "beam_sim1-solution_1.op2" - if not op2_file.exists(): - return TrialResult( - success=False, - error_message=f"OP2 not found. Expected: {op2_file}", - ) + sim_file = iter_dir / self.sim_file.name + prt_file = iter_dir / self.prt_file.name else: - op2_file = Path(op2_file) + iter_dir = self.model_dir + sim_file = self.sim_file + prt_file = self.prt_file - logger.info(" OP2: %s", op2_file.name) + # Step 2: Run NX journal (update expressions + solve) + solve_result = self._nx_solver.run_simulation( + sim_file=sim_file, + working_dir=iter_dir, + expression_updates=expressions, + ) - # 4a. Extract displacement - try: - disp_result = self._extract_displacement(op2_file) - tip_displacement = disp_result["max_displacement"] # mm - except Exception as e: - logger.error(" Displacement extraction failed: %s", e) + if not solve_result.get("success", False): + errors = solve_result.get("errors", ["Unknown solver error"]) return TrialResult( success=False, - error_message=f"Displacement extraction failed: {e}", + solve_time=time.time() - start_time, + error_message=f"NX solve failed: {'; '.join(errors)}", + iteration_dir=str(iter_dir), ) - # 4b. Extract stress — auto-detect element type (solid or shell) - # Pass element_type=None so it checks CTETRA, CHEXA, CPENTA, CPYRAM + op2_file = solve_result.get("op2_file") + if not op2_file or not Path(op2_file).exists(): + return TrialResult( + success=False, + solve_time=time.time() - start_time, + error_message="OP2 file not generated after solve", + iteration_dir=str(iter_dir), + ) + + op2_path = Path(op2_file) + + # Step 3: Extract mass from journal temp file + try: + mass_kg = self._extract_mass(prt_file, expression_name=EXPR_MASS) + except Exception as e: + logger.warning("Mass extraction failed: %s", e) + mass_kg = float("nan") + + # Step 4: Extract displacement from OP2 + try: + disp_result = self._extract_displacement(op2_path) + # For cantilever beam, max displacement IS tip displacement + tip_displacement = disp_result["max_displacement"] + except Exception as e: + logger.warning("Displacement extraction failed: %s", e) + tip_displacement = float("nan") + + # Step 5: Extract max von Mises stress from OP2 + # Use shell element extraction (CQUAD4 mesh) try: stress_result = self._extract_stress( - op2_file, - element_type=None, # auto-detect from OP2 contents - convert_to_mpa=True, # NX kg-mm-s → kPa, convert to MPa + op2_path, + element_type="cquad4", + convert_to_mpa=True, # ⚠️ LAC lesson: NX outputs kPa, must convert ) - max_von_mises = stress_result["max_von_mises"] # MPa + max_vm_stress = stress_result["max_von_mises"] except Exception as e: - # Fallback: try shell elements if solid extraction failed - logger.warning(" Solid stress extraction failed, trying shell: %s", e) - try: - stress_result = self._extract_stress( - op2_file, - element_type="cquad4", - convert_to_mpa=True, - ) - max_von_mises = stress_result["max_von_mises"] - except Exception as e2: - logger.error(" Stress extraction failed (all types): %s", e2) - return TrialResult( - success=False, - error_message=f"Stress extraction failed: {e}; shell fallback: {e2}", - ) + logger.warning("Stress extraction failed: %s", e) + max_vm_stress = float("nan") - # 4c. Extract mass — reads _temp_mass.txt written by solve_simulation.py journal - try: - mass = self._extract_mass(working_prt, expression_name=EXPR_MASS) # kg - except FileNotFoundError: - # _temp_mass.txt not found — journal may not have written it for single-part models - # Fallback: try reading from _temp_part_properties.json - logger.warning(" _temp_mass.txt not found, trying _temp_part_properties.json") - mass = self._extract_mass_fallback(iter_folder) - if mass is None: - return TrialResult( - success=False, - error_message="Mass extraction failed: _temp_mass.txt not found", - ) - except Exception as e: - logger.error(" Mass extraction failed: %s", e) - return TrialResult( - success=False, - error_message=f"Mass extraction failed: {e}", - ) - - elapsed = self._time.time() - t_start + elapsed = time.time() - start_time logger.info( - " Trial %d OK (%.1fs) — mass=%.4f kg, disp=%.4f mm, σ_vm=%.2f MPa", - trial_num, - elapsed, - mass, - tip_displacement, - max_von_mises, + "Trial %d complete: mass=%.2f kg, disp=%.3f mm, stress=%.1f MPa (%.1fs)", + self._iteration, mass_kg, tip_displacement, max_vm_stress, elapsed, ) return TrialResult( success=True, - mass=mass, + mass=mass_kg, tip_displacement=tip_displacement, - max_von_mises=max_von_mises, + max_von_mises=max_vm_stress, + solve_time=elapsed, + iteration_dir=str(iter_dir), ) except Exception as e: - elapsed = self._time.time() - t_start - logger.error(" Trial %d FAILED (%.1fs): %s", trial_num, elapsed, e) + elapsed = time.time() - start_time + logger.error("Trial %d failed: %s", self._iteration, e, exc_info=True) return TrialResult( success=False, - error_message=f"Unexpected error in trial {trial_num}: {e}", + solve_time=elapsed, + error_message=str(e), + iteration_dir=str(iter_dir) if 'iter_dir' in locals() else None, ) def close(self) -> None: - """Clean up NX session resources. - - ⚠️ LAC CRITICAL: NEVER kill NX processes directly. - Uses NXSessionManager for safe lock cleanup only. - """ - if self._solver is not None: - sm = getattr(self._solver, "session_manager", None) - if sm is not None: - logger.info("Cleaning up NX session locks via session manager") - try: - sm.cleanup_stale_locks() - except Exception as e: - logger.warning("Session lock cleanup warning: %s", e) - - self._solver = None - logger.info("NXOpenSolver closed.") - - # ------------------------------------------------------------------ - # Private helpers - # ------------------------------------------------------------------ - - def _init_solver(self) -> None: - """Lazy-initialize NXSolver (matches SAT3_V7 FEARunner.setup pattern).""" - logger.info("Initializing NXSolver (nastran=%s, timeout=%ds)", self.nastran_version, self.timeout) - - kwargs: dict = { - "nastran_version": self.nastran_version, - "timeout": self.timeout, - "use_journal": True, - "enable_session_management": True, - "study_name": "hydrotech_beam_doe", - "use_iteration_folders": True, - "master_model_dir": str(self.model_dir), - } - if self.nx_install_dir: - kwargs["nx_install_dir"] = self.nx_install_dir - - self._solver = self._NXSolver(**kwargs) - logger.info("NXSolver ready") - - def _build_expression_dict(self, trial_input: TrialInput) -> dict[str, float]: - """Build NX expression name→value dict for the solver. - - These are passed to: - - create_iteration_folder() → writes params.exp (unit defaulting to mm) - - run_simulation(expression_updates=...) → passed as argv to solve journal - """ - return { - EXPR_HALF_CORE_THICKNESS: trial_input.beam_half_core_thickness, - EXPR_FACE_THICKNESS: trial_input.beam_face_thickness, - EXPR_HOLES_DIAMETER: trial_input.holes_diameter, - EXPR_HOLE_COUNT: float(trial_input.hole_count), # NX expressions are float - } - - @staticmethod - def _extract_mass_fallback(iter_folder: Path) -> float | None: - """Try to read mass from _temp_part_properties.json (backup path).""" - import json as _json - - props_file = iter_folder / "_temp_part_properties.json" - if not props_file.exists(): - return None - try: - with open(props_file) as f: - props = _json.load(f) - mass = props.get("mass_kg", 0.0) - if mass > 0: - logger.info(" Mass from _temp_part_properties.json: %.6f kg", mass) - return mass - return None - except Exception as e: - logger.warning(" Failed to read %s: %s", props_file, e) - return None + """Clean up NX solver resources.""" + logger.info("AtomizerNXSolver closed. %d iterations completed.", self._iteration) # --------------------------------------------------------------------------- -# Factory +# Stub solver — for development/testing without NX # --------------------------------------------------------------------------- -def create_solver( - backend: str = "stub", - model_dir: str = "", - nx_install_dir: str | None = None, - timeout: int = 600, - nastran_version: str = "2412", -) -> NXStubSolver | NXOpenSolver: - """Create an NX solver instance. +class StubSolver: + """Synthetic solver for testing without NX. - Args: - backend: "stub" for development, "nxopen" for real NX (Windows/dalidou only). - model_dir: Path to NX model directory (required for nxopen backend). - nx_install_dir: Path to NX installation (auto-detected if None). - timeout: Timeout per trial in seconds (default: 600s = 10 min). - nastran_version: NX version (e.g., "2412", "2506", "2512"). - - Returns: - Solver instance implementing the NXSolverInterface protocol. - - Raises: - ValueError: If backend is unknown or model_dir missing for nxopen. + Generates physically-plausible approximate results based on + beam theory. NOT accurate — only for pipeline validation. """ - if backend == "stub": - return NXStubSolver() - elif backend == "nxopen": - if not model_dir: - raise ValueError("model_dir required for nxopen backend") - return NXOpenSolver( - model_dir=model_dir, - nx_install_dir=nx_install_dir, - timeout=timeout, - nastran_version=nastran_version, + + def __init__(self, **kwargs: Any): + self._call_count = 0 + logger.warning( + "Using NX STUB solver — results are synthetic approximations. " + "Replace with AtomizerNXSolver (--backend nxopen) for real evaluations." ) - else: - raise ValueError(f"Unknown backend: {backend!r}. Use 'stub' or 'nxopen'.") \ No newline at end of file + + def solve(self, trial: TrialInput) -> TrialResult: + """Generate approximate results from beam theory. + + Uses simplified cantilever beam formulas: + - Mass ∝ cross-section area × length - hole_volume + - Displacement ~ PL³/3EI (Euler-Bernoulli) + - Stress ~ Mc/I (nominal) with hole SCF + """ + self._call_count += 1 + import numpy as np + + # Geometry (mm) + L = 5000.0 # beam length + h_half = 250.0 # beam half-height (fixed) + w_half = 150.0 # beam half-width (fixed) + h_core = trial.beam_half_core_thickness + t_face = trial.beam_face_thickness + d_hole = trial.holes_diameter + n_hole = trial.hole_count + + # Material: AISI 1005 + E = 205000.0 # MPa (Young's modulus) + rho = 7.3e-6 # kg/mm³ (7.3 g/cm³) + + # I-beam cross-section second moment of area (approximate) + # Full section: 2*w_half × 2*h_half rectangle + # Minus core cutouts (simplified) + H = 2 * h_half # 500 mm total height + W = 2 * w_half # 300 mm total width + I_full = W * H**3 / 12 + # Subtract inner rectangle (core region without faces) + h_web = H - 2 * t_face + w_web = W - 2 * h_core # approximate + I_inner = max(0, w_web) * max(0, h_web)**3 / 12 + I_eff = max(I_full - I_inner, I_full * 0.01) # don't go to zero + + # Cross-section area (approximate) + A_section = W * H - max(0, w_web) * max(0, h_web) + # Hole volume removal + web_height = H - 2 * t_face + hole_area = n_hole * np.pi * (d_hole / 2)**2 + # Only remove from web if holes fit + if d_hole < web_height: + effective_hole_area = min(hole_area, 0.8 * web_height * 4000) + else: + effective_hole_area = 0 + # Mass + vol = A_section * L - effective_hole_area * min(h_core * 2, 50) + mass = max(rho * vol, 1.0) + + # Tip displacement: δ = PL³ / 3EI + P = 10000 * 9.80665 # 10,000 kgf → N + delta = P * L**3 / (3 * E * I_eff) + + # Stress: σ = M*c/I with SCF from holes + M = P * L # max moment at fixed end + c = h_half # distance to extreme fiber + sigma_nominal = M * c / I_eff / 1000 # kPa → MPa + # Stress concentration from holes (simplified) + scf = 1.0 + 0.5 * (d_hole / (web_height + 1)) + sigma_max = sigma_nominal * scf + + # Add noise (±5%) to simulate model variability + rng = np.random.default_rng(self._call_count) + noise = rng.uniform(0.95, 1.05, 3) + + return TrialResult( + success=True, + mass=float(mass * noise[0]), + tip_displacement=float(delta * noise[1]), + max_von_mises=float(sigma_max * noise[2]), + solve_time=0.1, + ) + + def close(self) -> None: + """Clean up stub solver.""" + logger.info("Stub solver closed.") diff --git a/projects/hydrotech-beam/studies/01_doe_landscape/run_doe.py b/projects/hydrotech-beam/studies/01_doe_landscape/run_doe.py index f25de316..0c8b135c 100644 --- a/projects/hydrotech-beam/studies/01_doe_landscape/run_doe.py +++ b/projects/hydrotech-beam/studies/01_doe_landscape/run_doe.py @@ -173,7 +173,7 @@ def evaluate_trial( ) t_start = time.monotonic() - nx_result: TrialResult = solver.evaluate(trial_input) + nx_result: TrialResult = solver.solve(trial_input) t_elapsed = time.monotonic() - t_start trial.set_user_attr("solve_time_s", round(t_elapsed, 2))