- Each iteration gets: params.json, results.json, OP2, F06, mass files - Model directory stays clean (no solver output buildup) - Study folder is self-contained with full trial history
536 lines
20 KiB
Python
536 lines
20 KiB
Python
"""NX automation interface for Hydrotech Beam optimization.
|
||
|
||
Integrates with the existing Atomizer optimization_engine:
|
||
- NXSolver for journal-based solve (run_journal.exe → solve_simulation.py)
|
||
- pyNastran OP2 extractors for displacement + stress
|
||
- Expression-based mass extraction via journal temp file
|
||
|
||
The proven SAT3 pipeline: write .exp → NX journal updates + solves → pyNastran reads OP2.
|
||
|
||
NX Expression Names (confirmed via binary introspection — CONTEXT.md):
|
||
Design Variables:
|
||
- beam_half_core_thickness (mm, continuous)
|
||
- beam_face_thickness (mm, continuous)
|
||
- holes_diameter (mm, continuous)
|
||
- hole_count (integer, links to Pattern_p7)
|
||
Outputs:
|
||
- p173 (mass in kg, body_property147.mass)
|
||
Fixed:
|
||
- beam_lenght (⚠️ TYPO in NX — no 'h', 5000 mm)
|
||
- beam_half_height (250 mm)
|
||
- beam_half_width (150 mm)
|
||
|
||
References:
|
||
CONTEXT.md — Full expression map
|
||
optimization_engine/nx/solver.py — NXSolver class
|
||
optimization_engine/extractors/ — pyNastran-based result extractors
|
||
studies/M1_Mirror/SAT3_Trajectory_V7/run_optimization.py — proven pattern
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Any, Dict, Optional, Protocol
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Data types
|
||
# ---------------------------------------------------------------------------
|
||
@dataclass(frozen=True)
|
||
class TrialInput:
|
||
"""Design variable values for a single trial."""
|
||
|
||
beam_half_core_thickness: float # mm — DV1
|
||
beam_face_thickness: float # mm — DV2
|
||
holes_diameter: float # mm — DV3
|
||
hole_count: int # — DV4
|
||
|
||
|
||
@dataclass
|
||
class TrialResult:
|
||
"""Results extracted from NX after a trial solve.
|
||
|
||
All values populated after a successful SOL 101 solve.
|
||
On failure, success=False and error_message explains the failure.
|
||
"""
|
||
|
||
success: bool
|
||
mass: float = float("nan") # kg — from expression `p173`
|
||
tip_displacement: float = float("nan") # mm — from SOL 101 results
|
||
max_von_mises: float = float("nan") # MPa — from SOL 101 results
|
||
solve_time: float = 0.0 # seconds
|
||
error_message: str = ""
|
||
iteration_dir: Optional[str] = None # path to iteration folder
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# NX expression name constants
|
||
# ---------------------------------------------------------------------------
|
||
# ⚠️ These are EXACT NX expression names from binary introspection.
|
||
# Do NOT change spelling — `beam_lenght` has a typo (no 'h') in NX.
|
||
EXPR_HALF_CORE_THICKNESS = "beam_half_core_thickness"
|
||
EXPR_FACE_THICKNESS = "beam_face_thickness"
|
||
EXPR_HOLES_DIAMETER = "holes_diameter"
|
||
EXPR_HOLE_COUNT = "hole_count"
|
||
EXPR_MASS = "p173" # body_property147.mass, kg
|
||
EXPR_BEAM_LENGTH = "beam_lenght" # ⚠️ TYPO IN NX — intentional
|
||
|
||
# Unit mapping for .exp file generation (NXSolver._write_expression_file)
|
||
UNIT_MAPPING = {
|
||
EXPR_HALF_CORE_THICKNESS: "mm",
|
||
EXPR_FACE_THICKNESS: "mm",
|
||
EXPR_HOLES_DIAMETER: "mm",
|
||
EXPR_HOLE_COUNT: "Constant", # integer — no unit
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Interface protocol
|
||
# ---------------------------------------------------------------------------
|
||
class NXSolverInterface(Protocol):
|
||
"""Protocol for NX solver backends (enables stub/real swap)."""
|
||
|
||
def solve(self, trial: TrialInput) -> TrialResult: ...
|
||
def close(self) -> None: ...
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Factory
|
||
# ---------------------------------------------------------------------------
|
||
def create_solver(backend: str = "stub", **kwargs: Any) -> NXSolverInterface:
|
||
"""Create the appropriate solver backend.
|
||
|
||
Args:
|
||
backend: "stub" for testing, "nxopen" for real NX runs.
|
||
**kwargs: Passed to solver constructor.
|
||
For "nxopen":
|
||
model_dir: Path to NX model files (required)
|
||
nx_version: NX version string (default: "2412")
|
||
timeout: Max solve time in seconds (default: 600)
|
||
use_iteration_folders: HEEDS-style per-trial folders (default: True)
|
||
|
||
Returns:
|
||
Solver instance implementing NXSolverInterface.
|
||
"""
|
||
if backend == "stub":
|
||
return StubSolver(**kwargs)
|
||
elif backend == "nxopen":
|
||
return AtomizerNXSolver(**kwargs)
|
||
else:
|
||
raise ValueError(f"Unknown backend: {backend!r}. Use 'stub' or 'nxopen'.")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Real solver — wraps optimization_engine
|
||
# ---------------------------------------------------------------------------
|
||
class AtomizerNXSolver:
|
||
"""Production solver using Atomizer's optimization_engine.
|
||
|
||
Pipeline (proven in SAT3):
|
||
1. Create iteration folder with fresh model copies
|
||
2. Write .exp file with updated expression values
|
||
3. NX journal: open .sim → import .exp → update geometry → solve SOL 101
|
||
4. Journal writes mass to _temp_mass.txt
|
||
5. pyNastran reads .op2 → extract displacement + stress
|
||
6. Return results
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
model_dir: str | Path = ".",
|
||
nx_version: str = "2412",
|
||
timeout: int = 600,
|
||
use_iteration_folders: bool = False, # Disabled: copied NX files break internal references
|
||
):
|
||
model_dir = Path(model_dir)
|
||
if not model_dir.exists():
|
||
raise FileNotFoundError(f"Model directory not found: {model_dir}")
|
||
|
||
self.model_dir = model_dir.resolve()
|
||
self.nx_version = nx_version
|
||
self.timeout = timeout
|
||
self.use_iteration_folders = use_iteration_folders
|
||
self._iteration = 0
|
||
|
||
# Iteration outputs go inside the study folder, not models/
|
||
# study_dir = the directory where run_doe.py lives
|
||
self.study_dir = Path(__file__).parent.resolve()
|
||
self.iterations_dir = self.study_dir / "iterations"
|
||
self.iterations_dir.mkdir(parents=True, exist_ok=True)
|
||
logger.info("Iterations dir: %s", self.iterations_dir)
|
||
|
||
# Find the .sim file
|
||
sim_files = list(model_dir.glob("*.sim"))
|
||
if not sim_files:
|
||
raise FileNotFoundError(f"No .sim file found in {model_dir}")
|
||
self.sim_file = sim_files[0]
|
||
logger.info("SIM file: %s", self.sim_file.name)
|
||
|
||
# Find the .prt file (for mass extraction)
|
||
prt_files = [f for f in model_dir.glob("*.prt") if "_i." not in f.name]
|
||
if not prt_files:
|
||
raise FileNotFoundError(f"No .prt file found in {model_dir}")
|
||
self.prt_file = prt_files[0]
|
||
logger.info("PRT file: %s", self.prt_file.name)
|
||
|
||
# Add Atomizer root to path for imports
|
||
atomizer_root = self._find_atomizer_root()
|
||
if atomizer_root and str(atomizer_root) not in sys.path:
|
||
sys.path.insert(0, str(atomizer_root))
|
||
logger.info("Added Atomizer root to path: %s", atomizer_root)
|
||
|
||
# Import optimization_engine components
|
||
try:
|
||
from optimization_engine.nx.solver import NXSolver
|
||
self._nx_solver = NXSolver(
|
||
nastran_version=nx_version,
|
||
timeout=timeout,
|
||
use_journal=True,
|
||
study_name="hydrotech_beam_doe",
|
||
use_iteration_folders=False, # Direct model: avoid broken NX file references
|
||
master_model_dir=model_dir,
|
||
)
|
||
logger.info(
|
||
"NXSolver initialized (NX %s, timeout=%ds, journal mode)",
|
||
nx_version, timeout,
|
||
)
|
||
except ImportError as e:
|
||
raise ImportError(
|
||
f"Could not import optimization_engine. "
|
||
f"Ensure Atomizer repo root is on PYTHONPATH.\n"
|
||
f"Error: {e}"
|
||
) from e
|
||
|
||
# Import extractors
|
||
try:
|
||
from optimization_engine.extractors.extract_displacement import (
|
||
extract_displacement,
|
||
)
|
||
from optimization_engine.extractors.extract_von_mises_stress import (
|
||
extract_solid_stress,
|
||
)
|
||
from optimization_engine.extractors.extract_mass_from_expression import (
|
||
extract_mass_from_expression,
|
||
)
|
||
self._extract_displacement = extract_displacement
|
||
self._extract_stress = extract_solid_stress
|
||
self._extract_mass = extract_mass_from_expression
|
||
logger.info("Extractors loaded: displacement, von_mises, mass")
|
||
except ImportError as e:
|
||
raise ImportError(
|
||
f"Could not import extractors from optimization_engine.\n"
|
||
f"Error: {e}"
|
||
) from e
|
||
|
||
def _find_atomizer_root(self) -> Optional[Path]:
|
||
"""Walk up from model_dir to find the Atomizer repo root."""
|
||
# Look for optimization_engine directory
|
||
candidate = self.model_dir
|
||
for _ in range(10):
|
||
candidate = candidate.parent
|
||
if (candidate / "optimization_engine").is_dir():
|
||
return candidate
|
||
if candidate == candidate.parent:
|
||
break
|
||
|
||
# Fallback: common paths
|
||
for path in [
|
||
Path("C:/Users/antoi/Atomizer"),
|
||
Path("/home/papa/repos/Atomizer"),
|
||
]:
|
||
if (path / "optimization_engine").is_dir():
|
||
return path
|
||
|
||
logger.warning("Could not find Atomizer root with optimization_engine/")
|
||
return None
|
||
|
||
def solve(self, trial: TrialInput) -> TrialResult:
|
||
"""Run a single trial through the NX pipeline.
|
||
|
||
Args:
|
||
trial: Design variable values.
|
||
|
||
Returns:
|
||
TrialResult with mass, displacement, stress (or failure info).
|
||
"""
|
||
self._iteration += 1
|
||
start_time = time.time()
|
||
|
||
# Build expression update dict
|
||
expressions: Dict[str, float] = {
|
||
EXPR_HALF_CORE_THICKNESS: trial.beam_half_core_thickness,
|
||
EXPR_FACE_THICKNESS: trial.beam_face_thickness,
|
||
EXPR_HOLES_DIAMETER: trial.holes_diameter,
|
||
EXPR_HOLE_COUNT: float(trial.hole_count), # NX expects float in .exp
|
||
}
|
||
|
||
logger.info(
|
||
"Trial %d: core=%.2f face=%.2f dia=%.1f count=%d",
|
||
self._iteration,
|
||
trial.beam_half_core_thickness,
|
||
trial.beam_face_thickness,
|
||
trial.holes_diameter,
|
||
trial.hole_count,
|
||
)
|
||
|
||
# Create iteration output folder inside the study
|
||
iter_dir = self.iterations_dir / f"iter{self._iteration:03d}"
|
||
iter_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
try:
|
||
# Step 1: Solve directly on master model (no file copying)
|
||
# NX file references stay intact — expressions updated in-place by journal
|
||
sim_file = self.sim_file
|
||
prt_file = self.prt_file
|
||
|
||
# Save trial params to iteration folder
|
||
import json
|
||
params_file = iter_dir / "params.json"
|
||
params_file.write_text(json.dumps({
|
||
"iteration": self._iteration,
|
||
"expressions": expressions,
|
||
"trial_input": {
|
||
"beam_half_core_thickness": trial.beam_half_core_thickness,
|
||
"beam_face_thickness": trial.beam_face_thickness,
|
||
"holes_diameter": trial.holes_diameter,
|
||
"hole_count": trial.hole_count,
|
||
},
|
||
}, indent=2))
|
||
|
||
# Step 2: Run NX journal (update expressions + solve) on master model
|
||
solve_result = self._nx_solver.run_simulation(
|
||
sim_file=sim_file,
|
||
working_dir=self.model_dir,
|
||
expression_updates=expressions,
|
||
)
|
||
|
||
if not solve_result.get("success", False):
|
||
errors = solve_result.get("errors", ["Unknown solver error"])
|
||
return TrialResult(
|
||
success=False,
|
||
solve_time=time.time() - start_time,
|
||
error_message=f"NX solve failed: {'; '.join(errors)}",
|
||
iteration_dir=str(iter_dir),
|
||
)
|
||
|
||
op2_file = solve_result.get("op2_file")
|
||
if not op2_file or not Path(op2_file).exists():
|
||
return TrialResult(
|
||
success=False,
|
||
solve_time=time.time() - start_time,
|
||
error_message="OP2 file not generated after solve",
|
||
iteration_dir=str(iter_dir),
|
||
)
|
||
|
||
op2_path = Path(op2_file)
|
||
|
||
# Step 3: Extract mass from journal temp file
|
||
try:
|
||
mass_kg = self._extract_mass(prt_file, expression_name=EXPR_MASS)
|
||
except Exception as e:
|
||
logger.warning("Mass extraction failed: %s", e)
|
||
mass_kg = float("nan")
|
||
|
||
# Step 4: Extract displacement from OP2
|
||
try:
|
||
disp_result = self._extract_displacement(op2_path)
|
||
# For cantilever beam, max displacement IS tip displacement
|
||
tip_displacement = disp_result["max_displacement"]
|
||
except Exception as e:
|
||
logger.warning("Displacement extraction failed: %s", e)
|
||
tip_displacement = float("nan")
|
||
|
||
# Step 5: Extract max von Mises stress from OP2
|
||
# Use shell element extraction (CQUAD4 mesh)
|
||
try:
|
||
stress_result = self._extract_stress(
|
||
op2_path,
|
||
element_type="cquad4",
|
||
convert_to_mpa=True, # ⚠️ LAC lesson: NX outputs kPa, must convert
|
||
)
|
||
max_vm_stress = stress_result["max_von_mises"]
|
||
except Exception as e:
|
||
logger.warning("Stress extraction failed: %s", e)
|
||
max_vm_stress = float("nan")
|
||
|
||
# Step 6: Copy solver outputs to iteration folder for archival
|
||
self._archive_iteration(iter_dir, op2_path, mass_kg, tip_displacement, max_vm_stress)
|
||
|
||
elapsed = time.time() - start_time
|
||
logger.info(
|
||
"Trial %d complete: mass=%.2f kg, disp=%.3f mm, stress=%.1f MPa (%.1fs)",
|
||
self._iteration, mass_kg, tip_displacement, max_vm_stress, elapsed,
|
||
)
|
||
|
||
return TrialResult(
|
||
success=True,
|
||
mass=mass_kg,
|
||
tip_displacement=tip_displacement,
|
||
max_von_mises=max_vm_stress,
|
||
solve_time=elapsed,
|
||
iteration_dir=str(iter_dir),
|
||
)
|
||
|
||
except Exception as e:
|
||
elapsed = time.time() - start_time
|
||
logger.error("Trial %d failed: %s", self._iteration, e, exc_info=True)
|
||
return TrialResult(
|
||
success=False,
|
||
solve_time=elapsed,
|
||
error_message=str(e),
|
||
iteration_dir=str(iter_dir) if 'iter_dir' in locals() else None,
|
||
)
|
||
|
||
def _archive_iteration(
|
||
self,
|
||
iter_dir: Path,
|
||
op2_path: Path,
|
||
mass: float,
|
||
displacement: float,
|
||
stress: float,
|
||
) -> None:
|
||
"""Copy solver outputs to iteration folder for archival.
|
||
|
||
Keeps the models/ directory clean — solver outputs go to the study's
|
||
iterations/ folder. Each iteration gets: OP2, F06, mass file, and
|
||
a results summary JSON.
|
||
"""
|
||
import json
|
||
import shutil
|
||
|
||
# Copy OP2 and F06 files
|
||
for suffix in [".op2", ".f06", ".log"]:
|
||
src = op2_path.with_suffix(suffix)
|
||
if src.exists():
|
||
try:
|
||
shutil.copy2(src, iter_dir / src.name)
|
||
except Exception as e:
|
||
logger.warning("Could not copy %s: %s", src.name, e)
|
||
|
||
# Copy mass temp file if it exists
|
||
for fname in ["_temp_mass.txt", "_temp_part_properties.json"]:
|
||
src = self.model_dir / fname
|
||
if src.exists():
|
||
try:
|
||
shutil.copy2(src, iter_dir / fname)
|
||
except Exception as e:
|
||
logger.warning("Could not copy %s: %s", fname, e)
|
||
|
||
# Write results summary
|
||
results_file = iter_dir / "results.json"
|
||
results_file.write_text(json.dumps({
|
||
"mass_kg": mass,
|
||
"tip_displacement_mm": displacement,
|
||
"max_von_mises_mpa": stress,
|
||
"op2_file": op2_path.name,
|
||
}, indent=2))
|
||
|
||
logger.info("Archived iteration %d to %s", self._iteration, iter_dir.name)
|
||
|
||
def close(self) -> None:
|
||
"""Clean up NX solver resources."""
|
||
logger.info("AtomizerNXSolver closed. %d iterations completed.", self._iteration)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Stub solver — for development/testing without NX
|
||
# ---------------------------------------------------------------------------
|
||
class StubSolver:
|
||
"""Synthetic solver for testing without NX.
|
||
|
||
Generates physically-plausible approximate results based on
|
||
beam theory. NOT accurate — only for pipeline validation.
|
||
"""
|
||
|
||
def __init__(self, **kwargs: Any):
|
||
self._call_count = 0
|
||
logger.warning(
|
||
"Using NX STUB solver — results are synthetic approximations. "
|
||
"Replace with AtomizerNXSolver (--backend nxopen) for real evaluations."
|
||
)
|
||
|
||
def solve(self, trial: TrialInput) -> TrialResult:
|
||
"""Generate approximate results from beam theory.
|
||
|
||
Uses simplified cantilever beam formulas:
|
||
- Mass ∝ cross-section area × length - hole_volume
|
||
- Displacement ~ PL³/3EI (Euler-Bernoulli)
|
||
- Stress ~ Mc/I (nominal) with hole SCF
|
||
"""
|
||
self._call_count += 1
|
||
import numpy as np
|
||
|
||
# Geometry (mm)
|
||
L = 5000.0 # beam length
|
||
h_half = 250.0 # beam half-height (fixed)
|
||
w_half = 150.0 # beam half-width (fixed)
|
||
h_core = trial.beam_half_core_thickness
|
||
t_face = trial.beam_face_thickness
|
||
d_hole = trial.holes_diameter
|
||
n_hole = trial.hole_count
|
||
|
||
# Material: AISI 1005
|
||
E = 205000.0 # MPa (Young's modulus)
|
||
rho = 7.3e-6 # kg/mm³ (7.3 g/cm³)
|
||
|
||
# I-beam cross-section second moment of area (approximate)
|
||
# Full section: 2*w_half × 2*h_half rectangle
|
||
# Minus core cutouts (simplified)
|
||
H = 2 * h_half # 500 mm total height
|
||
W = 2 * w_half # 300 mm total width
|
||
I_full = W * H**3 / 12
|
||
# Subtract inner rectangle (core region without faces)
|
||
h_web = H - 2 * t_face
|
||
w_web = W - 2 * h_core # approximate
|
||
I_inner = max(0, w_web) * max(0, h_web)**3 / 12
|
||
I_eff = max(I_full - I_inner, I_full * 0.01) # don't go to zero
|
||
|
||
# Cross-section area (approximate)
|
||
A_section = W * H - max(0, w_web) * max(0, h_web)
|
||
# Hole volume removal
|
||
web_height = H - 2 * t_face
|
||
hole_area = n_hole * np.pi * (d_hole / 2)**2
|
||
# Only remove from web if holes fit
|
||
if d_hole < web_height:
|
||
effective_hole_area = min(hole_area, 0.8 * web_height * 4000)
|
||
else:
|
||
effective_hole_area = 0
|
||
# Mass
|
||
vol = A_section * L - effective_hole_area * min(h_core * 2, 50)
|
||
mass = max(rho * vol, 1.0)
|
||
|
||
# Tip displacement: δ = PL³ / 3EI
|
||
P = 10000 * 9.80665 # 10,000 kgf → N
|
||
delta = P * L**3 / (3 * E * I_eff)
|
||
|
||
# Stress: σ = M*c/I with SCF from holes
|
||
M = P * L # max moment at fixed end
|
||
c = h_half # distance to extreme fiber
|
||
sigma_nominal = M * c / I_eff / 1000 # kPa → MPa
|
||
# Stress concentration from holes (simplified)
|
||
scf = 1.0 + 0.5 * (d_hole / (web_height + 1))
|
||
sigma_max = sigma_nominal * scf
|
||
|
||
# Add noise (±5%) to simulate model variability
|
||
rng = np.random.default_rng(self._call_count)
|
||
noise = rng.uniform(0.95, 1.05, 3)
|
||
|
||
return TrialResult(
|
||
success=True,
|
||
mass=float(mass * noise[0]),
|
||
tip_displacement=float(delta * noise[1]),
|
||
max_von_mises=float(sigma_max * noise[2]),
|
||
solve_time=0.1,
|
||
)
|
||
|
||
def close(self) -> None:
|
||
"""Clean up stub solver."""
|
||
logger.info("Stub solver closed.")
|