Revise spec to reserved-region FEM and add Phase 2 NX sandbox scripts
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
"""
|
||||
Adaptive Isogrid — NX Hands
|
||||
|
||||
NXOpen journal scripts for geometry extraction, AFEM setup, and per-iteration solve.
|
||||
These scripts run inside NX Simcenter via the NXOpen Python API.
|
||||
Reserved-region NXOpen scripts:
|
||||
- extract_sandbox.py: sandbox loop extraction to geometry JSON
|
||||
- import_profile.py: profile reimport and sandbox replacement
|
||||
- solve_and_extract.py: remesh, solve, and result export
|
||||
- run_iteration.py: one-iteration orchestrator
|
||||
"""
|
||||
|
||||
318
tools/adaptive-isogrid/src/nx/extract_sandbox.py
Normal file
318
tools/adaptive-isogrid/src/nx/extract_sandbox.py
Normal file
@@ -0,0 +1,318 @@
|
||||
"""
|
||||
NXOpen script — extract sandbox face geometry for Adaptive Isogrid.
|
||||
|
||||
Finds faces tagged with user attribute:
|
||||
ISOGRID_SANDBOX = sandbox_1, sandbox_2, ...
|
||||
|
||||
For each sandbox face, exports `geometry_<sandbox_id>.json` in the same schema
|
||||
expected by the Python Brain (`outer_boundary`, `holes`, transform metadata, etc.).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Sequence, Tuple
|
||||
|
||||
|
||||
Point3D = Tuple[float, float, float]
|
||||
Point2D = Tuple[float, float]
|
||||
|
||||
|
||||
@dataclass
|
||||
class LocalFrame:
|
||||
origin: Point3D
|
||||
x_axis: Point3D
|
||||
y_axis: Point3D
|
||||
normal: Point3D
|
||||
|
||||
|
||||
def _norm(v: Sequence[float]) -> float:
|
||||
return math.sqrt(sum(c * c for c in v))
|
||||
|
||||
|
||||
def _normalize(v: Sequence[float]) -> Tuple[float, float, float]:
|
||||
n = _norm(v)
|
||||
if n < 1e-12:
|
||||
return (0.0, 0.0, 1.0)
|
||||
return (v[0] / n, v[1] / n, v[2] / n)
|
||||
|
||||
|
||||
def _dot(a: Sequence[float], b: Sequence[float]) -> float:
|
||||
return a[0] * b[0] + a[1] * b[1] + a[2] * b[2]
|
||||
|
||||
|
||||
def _cross(a: Sequence[float], b: Sequence[float]) -> Tuple[float, float, float]:
|
||||
return (
|
||||
a[1] * b[2] - a[2] * b[1],
|
||||
a[2] * b[0] - a[0] * b[2],
|
||||
a[0] * b[1] - a[1] * b[0],
|
||||
)
|
||||
|
||||
|
||||
def _sub(a: Sequence[float], b: Sequence[float]) -> Tuple[float, float, float]:
|
||||
return (a[0] - b[0], a[1] - b[1], a[2] - b[2])
|
||||
|
||||
|
||||
def fit_circle(points: Sequence[Point2D]) -> Tuple[Point2D, float, float]:
|
||||
"""
|
||||
Least-squares circle fit.
|
||||
Returns (center, diameter, rms_error).
|
||||
"""
|
||||
if len(points) < 3:
|
||||
return ((0.0, 0.0), 0.0, float("inf"))
|
||||
|
||||
sx = sy = sxx = syy = sxy = 0.0
|
||||
sxxx = syyy = sxxy = sxyy = 0.0
|
||||
|
||||
for x, y in points:
|
||||
xx = x * x
|
||||
yy = y * y
|
||||
sx += x
|
||||
sy += y
|
||||
sxx += xx
|
||||
syy += yy
|
||||
sxy += x * y
|
||||
sxxx += xx * x
|
||||
syyy += yy * y
|
||||
sxxy += xx * y
|
||||
sxyy += x * yy
|
||||
|
||||
n = float(len(points))
|
||||
c = n * sxx - sx * sx
|
||||
d = n * sxy - sx * sy
|
||||
e = n * (sxxx + sxyy) - (sxx + syy) * sx
|
||||
g = n * syy - sy * sy
|
||||
h = n * (sxxy + syyy) - (sxx + syy) * sy
|
||||
denom = (c * g - d * d)
|
||||
|
||||
if abs(denom) < 1e-12:
|
||||
return ((0.0, 0.0), 0.0, float("inf"))
|
||||
|
||||
a = (h * d - e * g) / denom
|
||||
b = (h * c - e * d) / (d * d - g * c)
|
||||
cx = -a / 2.0
|
||||
cy = -b / 2.0
|
||||
r = math.sqrt(max((a * a + b * b) / 4.0 - (sx * sx + sy * sy - n * (sxx + syy)) / n, 0.0))
|
||||
|
||||
errs = []
|
||||
for x, y in points:
|
||||
errs.append(abs(math.hypot(x - cx, y - cy) - r))
|
||||
rms = math.sqrt(sum(e * e for e in errs) / len(errs)) if errs else float("inf")
|
||||
|
||||
return ((cx, cy), 2.0 * r, rms)
|
||||
|
||||
|
||||
def is_loop_circular(points2d: Sequence[Point2D], tol_mm: float = 0.5) -> Tuple[bool, Point2D, float]:
|
||||
center, dia, rms = fit_circle(points2d)
|
||||
return (rms <= tol_mm, center, dia)
|
||||
|
||||
|
||||
def project_to_2d(points3d: Sequence[Point3D], frame: LocalFrame) -> List[Point2D]:
|
||||
out: List[Point2D] = []
|
||||
for p in points3d:
|
||||
v = _sub(p, frame.origin)
|
||||
out.append((_dot(v, frame.x_axis), _dot(v, frame.y_axis)))
|
||||
return out
|
||||
|
||||
|
||||
def _close_polyline(points: List[Point3D]) -> List[Point3D]:
|
||||
if not points:
|
||||
return points
|
||||
if _norm(_sub(points[0], points[-1])) > 1e-6:
|
||||
points.append(points[0])
|
||||
return points
|
||||
|
||||
|
||||
def _sample_edge_polyline(edge: Any, chord_tol_mm: float) -> List[Point3D]:
|
||||
"""
|
||||
Sample an NX edge as a polyline.
|
||||
NOTE: NX APIs vary by curve type; this helper intentionally keeps fallback logic.
|
||||
"""
|
||||
# Preferred path: use evaluator where available.
|
||||
try:
|
||||
evaluator = edge.CreateEvaluator()
|
||||
t0, t1 = evaluator.GetLimits()
|
||||
length = edge.GetLength()
|
||||
n = max(2, int(length / max(chord_tol_mm, 1e-3)))
|
||||
pts: List[Point3D] = []
|
||||
for i in range(n + 1):
|
||||
t = t0 + (t1 - t0) * (i / n)
|
||||
p, _ = evaluator.Evaluate(t)
|
||||
pts.append((float(p.X), float(p.Y), float(p.Z)))
|
||||
return pts
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fallback: edge vertices only (less accurate, but safe fallback).
|
||||
try:
|
||||
verts = edge.GetVertices()
|
||||
pts = []
|
||||
for v in verts:
|
||||
p = v.Coordinates
|
||||
pts.append((float(p.X), float(p.Y), float(p.Z)))
|
||||
return pts
|
||||
except Exception as exc:
|
||||
raise RuntimeError(f"Could not sample edge polyline: {exc}")
|
||||
|
||||
|
||||
def _face_local_frame(face: Any) -> LocalFrame:
|
||||
"""
|
||||
Build a stable local frame on a face:
|
||||
- origin: first loop first point
|
||||
- normal: face normal near origin
|
||||
- x/y axes: orthonormal basis on tangent plane
|
||||
"""
|
||||
loops = face.GetLoops()
|
||||
first_edge = loops[0].GetEdges()[0]
|
||||
sample = _sample_edge_polyline(first_edge, chord_tol_mm=1.0)[0]
|
||||
|
||||
# Try direct normal from face API.
|
||||
normal = (0.0, 0.0, 1.0)
|
||||
try:
|
||||
n = face.GetFaceNormal(sample[0], sample[1], sample[2])
|
||||
normal = _normalize((float(n.X), float(n.Y), float(n.Z)))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
ref = (1.0, 0.0, 0.0) if abs(normal[0]) < 0.95 else (0.0, 1.0, 0.0)
|
||||
x_axis = _normalize(_cross(ref, normal))
|
||||
y_axis = _normalize(_cross(normal, x_axis))
|
||||
return LocalFrame(origin=sample, x_axis=x_axis, y_axis=y_axis, normal=normal)
|
||||
|
||||
|
||||
def _get_string_attribute(obj: Any, title: str) -> str | None:
|
||||
try:
|
||||
return obj.GetStringUserAttribute(title, -1)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
return obj.GetUserAttributeAsString(title, -1)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def find_sandbox_faces(work_part: Any, attr_name: str = "ISOGRID_SANDBOX") -> List[Tuple[str, Any]]:
|
||||
tagged: List[Tuple[str, Any]] = []
|
||||
for body in getattr(work_part.Bodies, "ToArray", lambda: work_part.Bodies)():
|
||||
for face in body.GetFaces():
|
||||
sandbox_id = _get_string_attribute(face, attr_name)
|
||||
if sandbox_id and sandbox_id.startswith("sandbox_"):
|
||||
tagged.append((sandbox_id, face))
|
||||
return tagged
|
||||
|
||||
|
||||
def _extract_face_loops(face: Any, chord_tol_mm: float, frame: LocalFrame) -> Tuple[List[Point2D], List[Dict[str, Any]]]:
|
||||
outer_2d: List[Point2D] = []
|
||||
holes: List[Dict[str, Any]] = []
|
||||
|
||||
loops = face.GetLoops()
|
||||
for loop_index, loop in enumerate(loops):
|
||||
loop_pts3d: List[Point3D] = []
|
||||
for edge in loop.GetEdges():
|
||||
pts = _sample_edge_polyline(edge, chord_tol_mm)
|
||||
if loop_pts3d and pts:
|
||||
pts = pts[1:] # avoid duplicate joining point
|
||||
loop_pts3d.extend(pts)
|
||||
|
||||
loop_pts3d = _close_polyline(loop_pts3d)
|
||||
loop_pts2d = project_to_2d(loop_pts3d, frame)
|
||||
|
||||
is_outer = False
|
||||
try:
|
||||
is_outer = loop.IsOuter()
|
||||
except Exception:
|
||||
is_outer = (loop_index == 0)
|
||||
|
||||
if is_outer:
|
||||
outer_2d = loop_pts2d
|
||||
continue
|
||||
|
||||
is_circ, center, diameter = is_loop_circular(loop_pts2d)
|
||||
holes.append(
|
||||
{
|
||||
"index": len(holes),
|
||||
"boundary": [[x, y] for x, y in loop_pts2d],
|
||||
"center": [center[0], center[1]] if is_circ else None,
|
||||
"diameter": diameter if is_circ else None,
|
||||
"is_circular": bool(is_circ),
|
||||
"weight": 0.0,
|
||||
}
|
||||
)
|
||||
|
||||
return outer_2d, holes
|
||||
|
||||
|
||||
def extract_sandbox_geometry(face: Any, sandbox_id: str, chord_tol_mm: float = 0.1) -> Dict[str, Any]:
|
||||
frame = _face_local_frame(face)
|
||||
outer, holes = _extract_face_loops(face, chord_tol_mm=chord_tol_mm, frame=frame)
|
||||
|
||||
geom = {
|
||||
"units": "mm",
|
||||
"sandbox_id": sandbox_id,
|
||||
"outer_boundary": [[x, y] for x, y in outer],
|
||||
"holes": holes,
|
||||
"transform": {
|
||||
"origin": list(frame.origin),
|
||||
"x_axis": list(frame.x_axis),
|
||||
"y_axis": list(frame.y_axis),
|
||||
"normal": list(frame.normal),
|
||||
},
|
||||
}
|
||||
|
||||
# Optional thickness hint if available.
|
||||
try:
|
||||
geom["thickness"] = float(face.GetBody().GetThickness())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return geom
|
||||
|
||||
|
||||
def export_sandbox_geometries(output_dir: Path, geometries: Dict[str, Dict[str, Any]]) -> List[Path]:
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
written: List[Path] = []
|
||||
for sandbox_id, payload in geometries.items():
|
||||
out = output_dir / f"geometry_{sandbox_id}.json"
|
||||
out.write_text(json.dumps(payload, indent=2))
|
||||
written.append(out)
|
||||
return written
|
||||
|
||||
|
||||
def run_in_nx(output_dir: Path, chord_tol_mm: float = 0.1) -> List[Path]:
|
||||
import NXOpen # type: ignore
|
||||
|
||||
session = NXOpen.Session.GetSession()
|
||||
work_part = session.Parts.Work
|
||||
if work_part is None:
|
||||
raise RuntimeError("No active NX work part.")
|
||||
|
||||
sandbox_faces = find_sandbox_faces(work_part)
|
||||
if not sandbox_faces:
|
||||
raise RuntimeError("No faces found with ISOGRID_SANDBOX attribute.")
|
||||
|
||||
payloads: Dict[str, Dict[str, Any]] = {}
|
||||
for sandbox_id, face in sandbox_faces:
|
||||
payloads[sandbox_id] = extract_sandbox_geometry(face, sandbox_id, chord_tol_mm=chord_tol_mm)
|
||||
|
||||
return export_sandbox_geometries(output_dir=output_dir, geometries=payloads)
|
||||
|
||||
|
||||
def main(argv: Sequence[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(description="Extract NX sandbox face geometry to JSON")
|
||||
parser.add_argument("--output-dir", default=".", help="Directory for geometry_sandbox_*.json")
|
||||
parser.add_argument("--chord-tol", type=float, default=0.1, help="Edge sampling chord tolerance (mm)")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
out_dir = Path(args.output_dir)
|
||||
written = run_in_nx(output_dir=out_dir, chord_tol_mm=args.chord_tol)
|
||||
for p in written:
|
||||
print(f"[extract_sandbox] wrote: {p}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
163
tools/adaptive-isogrid/src/nx/import_profile.py
Normal file
163
tools/adaptive-isogrid/src/nx/import_profile.py
Normal file
@@ -0,0 +1,163 @@
|
||||
"""
|
||||
NXOpen script — import rib profile JSON and replace sandbox geometry.
|
||||
|
||||
Input:
|
||||
rib_profile_<sandbox_id>.json (or rib_profile.json)
|
||||
|
||||
Responsibilities:
|
||||
- Recreate closed polylines from profile coordinate arrays
|
||||
- Build sheet region for sandbox
|
||||
- Replace sandbox face geometry only
|
||||
- Sew/unite with neighboring reserved faces
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Sequence, Tuple
|
||||
|
||||
|
||||
Point2D = Tuple[float, float]
|
||||
Point3D = Tuple[float, float, float]
|
||||
|
||||
|
||||
def _add(a: Sequence[float], b: Sequence[float]) -> Point3D:
|
||||
return (a[0] + b[0], a[1] + b[1], a[2] + b[2])
|
||||
|
||||
|
||||
def _mul(v: Sequence[float], s: float) -> Point3D:
|
||||
return (v[0] * s, v[1] * s, v[2] * s)
|
||||
|
||||
|
||||
def load_json(path: Path) -> Dict[str, Any]:
|
||||
return json.loads(path.read_text())
|
||||
|
||||
|
||||
def map_2d_to_3d(p: Point2D, transform: Dict[str, List[float]]) -> Point3D:
|
||||
origin = transform["origin"]
|
||||
x_axis = transform["x_axis"]
|
||||
y_axis = transform["y_axis"]
|
||||
return _add(_add(origin, _mul(x_axis, p[0])), _mul(y_axis, p[1]))
|
||||
|
||||
|
||||
def _ensure_closed(coords: List[Point2D]) -> List[Point2D]:
|
||||
if not coords:
|
||||
return coords
|
||||
if coords[0] != coords[-1]:
|
||||
coords.append(coords[0])
|
||||
return coords
|
||||
|
||||
|
||||
def _create_polyline_curve(work_part: Any, pts3d: List[Point3D]) -> Any:
|
||||
"""
|
||||
Create a closed polyline curve in NX.
|
||||
API notes: this can be implemented with StudioSplineBuilderEx, PolygonBuilder,
|
||||
or line segments + composite curve depending on NX version/license.
|
||||
"""
|
||||
# Line-segment fallback (works in all NX versions)
|
||||
curves = []
|
||||
for i in range(len(pts3d) - 1):
|
||||
p1 = work_part.Points.CreatePoint(pts3d[i])
|
||||
p2 = work_part.Points.CreatePoint(pts3d[i + 1])
|
||||
curves.append(work_part.Curves.CreateLine(p1, p2))
|
||||
return curves
|
||||
|
||||
|
||||
def build_profile_curves(work_part: Any, profile: Dict[str, Any], transform: Dict[str, List[float]]) -> Dict[str, List[Any]]:
|
||||
created: Dict[str, List[Any]] = {"outer": [], "pockets": [], "holes": []}
|
||||
|
||||
outer = _ensure_closed([(float(x), float(y)) for x, y in profile["outer_boundary"]])
|
||||
outer_3d = [map_2d_to_3d(p, transform) for p in outer]
|
||||
created["outer"] = _create_polyline_curve(work_part, outer_3d)
|
||||
|
||||
for pocket in profile.get("pockets", []):
|
||||
coords = _ensure_closed([(float(x), float(y)) for x, y in pocket])
|
||||
pts3d = [map_2d_to_3d(p, transform) for p in coords]
|
||||
created["pockets"].extend(_create_polyline_curve(work_part, pts3d))
|
||||
|
||||
for hole in profile.get("hole_boundaries", []):
|
||||
coords = _ensure_closed([(float(x), float(y)) for x, y in hole])
|
||||
pts3d = [map_2d_to_3d(p, transform) for p in coords]
|
||||
created["holes"].extend(_create_polyline_curve(work_part, pts3d))
|
||||
|
||||
return created
|
||||
|
||||
|
||||
def _find_sandbox_face(work_part: Any, sandbox_id: str) -> Any:
|
||||
for body in getattr(work_part.Bodies, "ToArray", lambda: work_part.Bodies)():
|
||||
for face in body.GetFaces():
|
||||
try:
|
||||
tag = face.GetStringUserAttribute("ISOGRID_SANDBOX", -1)
|
||||
except Exception:
|
||||
tag = None
|
||||
if tag == sandbox_id:
|
||||
return face
|
||||
raise RuntimeError(f"Sandbox face not found for id={sandbox_id}")
|
||||
|
||||
|
||||
def replace_sandbox_face_geometry(work_part: Any, sandbox_face: Any, created_curves: Dict[str, List[Any]]) -> None:
|
||||
"""
|
||||
Replace sandbox surface region from generated profile curves.
|
||||
|
||||
This operation depends on the model topology and NX license package.
|
||||
Typical implementation:
|
||||
1) Build bounded plane/sheet from outer and inner loops
|
||||
2) Trim/split host face by new boundaries
|
||||
3) Delete old sandbox patch
|
||||
4) Sew new patch with reserved neighboring faces
|
||||
5) Unite if multiple sheet bodies are produced
|
||||
"""
|
||||
# Recommended implementation hook points.
|
||||
# - Through Curve Mesh / Bounded Plane builders in NXOpen.Features
|
||||
# - SewBuilder in NXOpen.Features
|
||||
# - DeleteFace + ReplaceFace in synchronous modeling toolkit
|
||||
raise NotImplementedError(
|
||||
"Sandbox face replacement is model-specific. Implement with NXOpen feature builders "
|
||||
"(bounded sheet + replace face + sew/unite) in target NX environment."
|
||||
)
|
||||
|
||||
|
||||
def run_in_nx(
|
||||
profile_path: Path,
|
||||
geometry_path: Path,
|
||||
sandbox_id: str,
|
||||
) -> None:
|
||||
import NXOpen # type: ignore
|
||||
|
||||
session = NXOpen.Session.GetSession()
|
||||
work_part = session.Parts.Work
|
||||
if work_part is None:
|
||||
raise RuntimeError("No active NX work part.")
|
||||
|
||||
profile = load_json(profile_path)
|
||||
geometry = load_json(geometry_path)
|
||||
transform = geometry.get("transform")
|
||||
if not transform:
|
||||
raise ValueError(f"Missing transform in {geometry_path}")
|
||||
|
||||
sandbox_face = _find_sandbox_face(work_part, sandbox_id)
|
||||
created_curves = build_profile_curves(work_part, profile, transform)
|
||||
replace_sandbox_face_geometry(work_part, sandbox_face, created_curves)
|
||||
|
||||
print(f"[import_profile] Imported profile for {sandbox_id}: {profile_path}")
|
||||
|
||||
|
||||
def main(argv: Sequence[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(description="Import rib profile JSON into NX sandbox face")
|
||||
parser.add_argument("--profile", required=True, help="Path to rib_profile json")
|
||||
parser.add_argument("--geometry", required=True, help="Path to geometry_sandbox json")
|
||||
parser.add_argument("--sandbox-id", required=True, help="Sandbox id (e.g. sandbox_1)")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
run_in_nx(
|
||||
profile_path=Path(args.profile),
|
||||
geometry_path=Path(args.geometry),
|
||||
sandbox_id=args.sandbox_id,
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
120
tools/adaptive-isogrid/src/nx/run_iteration.py
Normal file
120
tools/adaptive-isogrid/src/nx/run_iteration.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""
|
||||
NXOpen orchestrator — single Adaptive Isogrid iteration.
|
||||
|
||||
Pipeline (inside NX batch or GUI session):
|
||||
1) extract_sandbox.py
|
||||
2) external Python Brain call
|
||||
3) import_profile.py
|
||||
4) solve_and_extract.py
|
||||
|
||||
This script is designed to be launched from external Python with subprocess,
|
||||
for example via run_journal.exe / NX batch mode.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import List, Sequence
|
||||
|
||||
try:
|
||||
from .extract_sandbox import run_in_nx as run_extract_sandbox
|
||||
from .import_profile import run_in_nx as run_import_profile
|
||||
from .solve_and_extract import run_in_nx as run_solve_and_extract
|
||||
except ImportError:
|
||||
# NX run_journal often executes files as scripts (no package context).
|
||||
from extract_sandbox import run_in_nx as run_extract_sandbox # type: ignore
|
||||
from import_profile import run_in_nx as run_import_profile # type: ignore
|
||||
from solve_and_extract import run_in_nx as run_solve_and_extract # type: ignore
|
||||
|
||||
|
||||
def _detect_sandbox_ids_from_files(work_dir: Path) -> List[str]:
|
||||
ids = []
|
||||
for p in sorted(work_dir.glob("geometry_sandbox_*.json")):
|
||||
# geometry_sandbox_1.json -> sandbox_1
|
||||
suffix = p.stem.replace("geometry_", "", 1)
|
||||
ids.append(suffix)
|
||||
return ids
|
||||
|
||||
|
||||
def call_python_brain(brain_cmd: str, work_dir: Path, sandbox_ids: List[str]) -> None:
|
||||
"""
|
||||
Executes external Python Brain command.
|
||||
|
||||
The command should read geometry_sandbox_*.json from work_dir and write
|
||||
rib_profile_sandbox_*.json files in the same folder.
|
||||
"""
|
||||
env = None
|
||||
cmd = [brain_cmd, "--work-dir", str(work_dir)]
|
||||
for sid in sandbox_ids:
|
||||
cmd += ["--sandbox-id", sid]
|
||||
|
||||
result = subprocess.run(cmd, cwd=str(work_dir), capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(
|
||||
"Python Brain call failed\n"
|
||||
f"cmd: {' '.join(cmd)}\n"
|
||||
f"stdout:\n{result.stdout}\n"
|
||||
f"stderr:\n{result.stderr}"
|
||||
)
|
||||
|
||||
|
||||
def run_iteration(
|
||||
work_dir: Path,
|
||||
brain_cmd: str,
|
||||
solution_name: str | None = None,
|
||||
csv_fallback: bool = False,
|
||||
) -> Path:
|
||||
work_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 1) Extract all sandbox geometries from active NX model.
|
||||
run_extract_sandbox(output_dir=work_dir, chord_tol_mm=0.1)
|
||||
sandbox_ids = _detect_sandbox_ids_from_files(work_dir)
|
||||
if not sandbox_ids:
|
||||
raise RuntimeError("No geometry_sandbox_*.json files generated.")
|
||||
|
||||
# 2) Call external Brain.
|
||||
call_python_brain(brain_cmd=brain_cmd, work_dir=work_dir, sandbox_ids=sandbox_ids)
|
||||
|
||||
# 3) Import each rib profile back into NX.
|
||||
for sid in sandbox_ids:
|
||||
geom_path = work_dir / f"geometry_{sid}.json"
|
||||
profile_path = work_dir / f"rib_profile_{sid}.json"
|
||||
if not profile_path.exists():
|
||||
# fallback name used by some scripts
|
||||
profile_path = work_dir / "rib_profile.json"
|
||||
run_import_profile(profile_path=profile_path, geometry_path=geom_path, sandbox_id=sid)
|
||||
|
||||
# 4) Remesh + solve + extract results.
|
||||
results_path = work_dir / "results.json"
|
||||
run_solve_and_extract(
|
||||
work_dir=work_dir,
|
||||
result_path=results_path,
|
||||
sandbox_ids=sandbox_ids,
|
||||
solution_name=solution_name,
|
||||
use_csv_fallback=csv_fallback,
|
||||
)
|
||||
return results_path
|
||||
|
||||
|
||||
def main(argv: Sequence[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(description="Run one adaptive-isogrid NX iteration")
|
||||
parser.add_argument("--work-dir", required=True, help="Iteration working directory")
|
||||
parser.add_argument("--brain-cmd", required=True, help="Executable/command for external Python Brain")
|
||||
parser.add_argument("--solution", default=None, help="Optional NX solution name")
|
||||
parser.add_argument("--csv-fallback", action="store_true", help="Use CSV fallback for results extraction")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
results_path = run_iteration(
|
||||
work_dir=Path(args.work_dir),
|
||||
brain_cmd=args.brain_cmd,
|
||||
solution_name=args.solution,
|
||||
csv_fallback=args.csv_fallback,
|
||||
)
|
||||
print(f"[run_iteration] completed -> {results_path}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
166
tools/adaptive-isogrid/src/nx/solve_and_extract.py
Normal file
166
tools/adaptive-isogrid/src/nx/solve_and_extract.py
Normal file
@@ -0,0 +1,166 @@
|
||||
"""
|
||||
NXOpen script — remesh full plate, solve, and export results.json.
|
||||
|
||||
Outputs:
|
||||
{
|
||||
"nodes_xy": [[x, y], ...],
|
||||
"stress_values": [...],
|
||||
"disp_values": [...],
|
||||
"strain_values": [...],
|
||||
"mass": 0.0
|
||||
}
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Sequence, Tuple
|
||||
|
||||
|
||||
def remesh_full_plate(sim_part: Any) -> None:
|
||||
"""
|
||||
Trigger mesh regeneration for all FEM collectors associated with the plate.
|
||||
"""
|
||||
# This hook should iterate mesh managers/collectors in your SIM/FEM template.
|
||||
# API often used: FEModel.UpdateFemodel(), mesh manager GenerateMesh/Update.
|
||||
fe_model = sim_part.FindObject("FEModel")
|
||||
fe_model.UpdateFemodel()
|
||||
|
||||
|
||||
def solve_active_solution(sim_part: Any, solution_name: str | None = None) -> Any:
|
||||
"""
|
||||
Solve the requested solution (or first available).
|
||||
"""
|
||||
import NXOpen # type: ignore
|
||||
import NXOpen.CAE # type: ignore
|
||||
|
||||
simulation = sim_part.FindObject("Simulation")
|
||||
|
||||
target_solution = None
|
||||
if solution_name:
|
||||
try:
|
||||
target_solution = simulation.FindObject(f"Solution[{solution_name}]")
|
||||
except Exception:
|
||||
target_solution = None
|
||||
|
||||
if target_solution is None:
|
||||
target_solution = simulation.FindObject("Solution[Solution 1]")
|
||||
|
||||
solve_mgr = NXOpen.CAE.SimSolveManager.GetSimSolveManager(NXOpen.Session.GetSession())
|
||||
solve_mgr.SubmitSolves([target_solution])
|
||||
return target_solution
|
||||
|
||||
|
||||
def _parse_csv_results(path: Path) -> Tuple[List[List[float]], List[float]]:
|
||||
"""
|
||||
Parse generic CSV with columns: x,y,value (header-insensitive).
|
||||
"""
|
||||
coords: List[List[float]] = []
|
||||
values: List[float] = []
|
||||
with path.open(newline="") as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
keys = {k.lower(): k for k in row.keys()}
|
||||
x = float(row[keys.get("x", keys.get("x_mm", "x"))])
|
||||
y = float(row[keys.get("y", keys.get("y_mm", "y"))])
|
||||
v = float(row[keys.get("value", keys.get("von_mises", "value"))])
|
||||
coords.append([x, y])
|
||||
values.append(v)
|
||||
return coords, values
|
||||
|
||||
|
||||
def extract_results_nxopen(sim_part: Any, sandbox_ids: List[str]) -> Dict[str, Any]:
|
||||
"""
|
||||
Preferred extractor: NXOpen post-processing API.
|
||||
|
||||
NOTE: Result object names/components vary by template. Keep this function as the
|
||||
primary integration point for project-specific API wiring.
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
"Wire NXOpen post-processing calls here (nodal stress/displacement, elemental strain, mass)."
|
||||
)
|
||||
|
||||
|
||||
def extract_results_csv_fallback(work_dir: Path) -> Dict[str, Any]:
|
||||
"""
|
||||
Fallback extractor: parse Simcenter-exported CSV files in work_dir.
|
||||
|
||||
Expected files:
|
||||
- nodal_stress.csv
|
||||
- nodal_disp.csv
|
||||
- elemental_strain.csv
|
||||
- mass.json (optional: {"mass": ...})
|
||||
"""
|
||||
stress_coords, stress_vals = _parse_csv_results(work_dir / "nodal_stress.csv")
|
||||
disp_coords, disp_vals = _parse_csv_results(work_dir / "nodal_disp.csv")
|
||||
_, strain_vals = _parse_csv_results(work_dir / "elemental_strain.csv")
|
||||
|
||||
# Use stress nodal coordinates as canonical nodes_xy
|
||||
nodes_xy = stress_coords if stress_coords else disp_coords
|
||||
|
||||
mass = 0.0
|
||||
mass_file = work_dir / "mass.json"
|
||||
if mass_file.exists():
|
||||
mass = float(json.loads(mass_file.read_text()).get("mass", 0.0))
|
||||
|
||||
return {
|
||||
"nodes_xy": nodes_xy,
|
||||
"stress_values": stress_vals,
|
||||
"disp_values": disp_vals,
|
||||
"strain_values": strain_vals,
|
||||
"mass": mass,
|
||||
}
|
||||
|
||||
|
||||
def run_in_nx(
|
||||
work_dir: Path,
|
||||
result_path: Path,
|
||||
sandbox_ids: List[str],
|
||||
solution_name: str | None = None,
|
||||
use_csv_fallback: bool = False,
|
||||
) -> Dict[str, Any]:
|
||||
import NXOpen # type: ignore
|
||||
|
||||
session = NXOpen.Session.GetSession()
|
||||
sim_part = session.Parts.BaseWork
|
||||
if sim_part is None:
|
||||
raise RuntimeError("No active NX SIM/FEM work part.")
|
||||
|
||||
remesh_full_plate(sim_part)
|
||||
solve_active_solution(sim_part, solution_name=solution_name)
|
||||
|
||||
if use_csv_fallback:
|
||||
results = extract_results_csv_fallback(work_dir)
|
||||
else:
|
||||
results = extract_results_nxopen(sim_part, sandbox_ids)
|
||||
|
||||
result_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
result_path.write_text(json.dumps(results, indent=2))
|
||||
print(f"[solve_and_extract] wrote {result_path}")
|
||||
return results
|
||||
|
||||
|
||||
def main(argv: Sequence[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(description="Remesh, solve, and export results.json")
|
||||
parser.add_argument("--work-dir", default=".", help="Working directory for CSV fallback artifacts")
|
||||
parser.add_argument("--results", default="results.json", help="Output JSON path")
|
||||
parser.add_argument("--sandbox-id", action="append", default=[], help="Sandbox id filter (repeatable)")
|
||||
parser.add_argument("--solution", default=None, help="NX solution name (defaults to Solution 1)")
|
||||
parser.add_argument("--csv-fallback", action="store_true", help="Parse CSV files instead of NXOpen post API")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
run_in_nx(
|
||||
work_dir=Path(args.work_dir),
|
||||
result_path=Path(args.results),
|
||||
sandbox_ids=args.sandbox_id,
|
||||
solution_name=args.solution,
|
||||
use_csv_fallback=args.csv_fallback,
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user