Major improvements to telescope mirror optimization workflow:

Assembly FEM Workflow (solve_simulation.py):
- Fixed multi-part assembly FEM update sequence
- Use ImportFromFile() for reliable expression updates
- Add DuplicateNodesCheckBuilder with MergeOccurrenceNodes=True
- Switch to Foreground solve mode for multi-subcase solutions
- Add detailed logging and diagnostics for node merge operations

Zernike RMS Calculation:
- CRITICAL FIX: Use correct surface-based RMS formula
  - Global RMS = sqrt(mean(W^2)) from actual WFE values
  - Filtered RMS = sqrt(mean(W_residual^2)) after removing low-order fit
  - This matches zernike_Post_Script_NX.py (optical standard)
  - Previous WRONG formula was: sqrt(sum(coeffs^2))
- Add compute_rms_filter_j1to3() for optician workload metric

Subcase Mapping:
- Fix subcase mapping to match NX model:
  - Subcase 1 = 90 deg (polishing orientation)
  - Subcase 2 = 20 deg (reference)
  - Subcase 3 = 40 deg
  - Subcase 4 = 60 deg

New Study: M1 Mirror Zernike Optimization
- Full optimization config with 11 design variables
- 3 objectives: rel_filtered_rms_40_vs_20, rel_filtered_rms_60_vs_20, mfg_90_optician_workload
- Neural surrogate support for accelerated optimization

Documentation:
- Update ZERNIKE_INTEGRATION.md with correct RMS formula
- Update ASSEMBLY_FEM_WORKFLOW.md with expression import and node merge details
- Add reference scripts from original zernike_Post_Script_NX.py

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
333 lines
12 KiB
Python
333 lines
12 KiB
Python
# nx_post_each_iter.py
|
|
import os, subprocess
|
|
import NXOpen
|
|
from datetime import datetime
|
|
import csv, re
|
|
|
|
# --- SETTINGS ---
# Python interpreter used to run the post script in a separate process.
# Set via env NX_TEST_ENV_PY, else the hard-coded path below is used
# (same override pattern as GEOM_PART_NAME).
TEST_ENV_PY = os.environ.get(
    "NX_TEST_ENV_PY",
    r"C:\Users\antoi\anaconda3\envs\test_env\python.exe",
)
SCRIPT_NAME = "zernike_Post_Script_NX.py"  # your script in the .sim folder
OP2_NAME = "assy_m1_assyfem1_sim1-solution_1.op2"
EXP_NAME = "Iteration_results_expression.exp"
TIMEOUT = None  # seconds passed to subprocess.run; e.g., 900 for 15 min
# Option A: set via env NX_GEOM_PART_NAME, else hardcode your CAD part name here.
GEOM_PART_NAME = os.environ.get("NX_GEOM_PART_NAME", "ASSY_M1_assyfem1")
# ---------------
|
|
|
|
def import_iteration_results_exp(exp_path: str, lw) -> bool:
    """Import an .exp file into the current work part (Replace mode) and update.

    Args:
        exp_path: Path of the expression file to import.
        lw: Object with a ``WriteLine(str)`` method used for logging.

    Returns:
        True when the import and the subsequent update ran without raising;
        False when the file is missing or any step raised.
    """
    session = NXOpen.Session.GetSession()
    part = session.Parts.BaseWork

    if not os.path.isfile(exp_path):
        lw.WriteLine(f"[EXP][ERROR] File not found: {exp_path}")
        return False

    import_mark = session.SetUndoMark(
        NXOpen.Session.MarkVisibility.Visible, "Import Expressions"
    )
    try:
        replace_mode = NXOpen.ExpressionCollection.ImportMode.Replace
        modified, messages = part.Expressions.ImportFromFile(exp_path, replace_mode)

        # Surface any parsing messages; a failure while reporting them must
        # not abort the import itself.
        try:
            for message in messages or ():
                lw.WriteLine(f"[EXP][WARN] {message}")
        except Exception:
            pass

        # Run the update under its own invisible undo mark.
        update_mark = session.SetUndoMark(
            NXOpen.Session.MarkVisibility.Invisible, "NX update"
        )
        nErrs = session.UpdateManager.DoUpdate(update_mark)
        session.DeleteUndoMark(update_mark, "NX update")

        session.SetUndoMarkName(import_mark, "Expressions")
        session.DeleteUndoMark(import_mark, None)

        lw.WriteLine(f"[EXP] Imported OK (modified={modified}, nErrs={nErrs})")
        return True
    except Exception as ex:
        lw.WriteLine(f"[EXP][FATAL] {ex}")
        # Best-effort removal of the import mark; errors here are ignored.
        try:
            session.DeleteUndoMark(import_mark, None)
        except Exception:
            pass
        return False
|
|
|
|
def export_all_named_expressions_to_exp(workPart, out_path, lw):
    """
    Write the expressions of ``workPart`` to an .exp file.

    Uses the 3-argument call ``ExportToFile(<mode>, <filepath>, <sort_type>)``
    with the ``WorkPart`` mode and ``AlphaNum`` sort type. The enum classes are
    looked up under either
        NXOpen.ExpressionCollection.ExportMode / SortType
    or
        NXOpen.ExpressionCollectionExportMode / ExpressionCollectionSortType
    so the call works whichever of the two layouts the NXOpen module exposes.

    ``.exp`` is appended to ``out_path`` when it does not already end with it
    (case-insensitive). Returns True on success; any exception is logged via
    ``lw.WriteLine`` and False is returned.
    """

    def lookup_enum(nested_name, flat_name):
        # The nested attribute wins; the flat module-level name is the fallback.
        flat = getattr(NXOpen, flat_name, None)
        return getattr(NXOpen.ExpressionCollection, nested_name, flat)

    try:
        target = out_path if out_path.lower().endswith(".exp") else out_path + ".exp"

        export_modes = lookup_enum("ExportMode", "ExpressionCollectionExportMode")
        sort_types = lookup_enum("SortType", "ExpressionCollectionSortType")
        if export_modes is None or sort_types is None:
            raise RuntimeError("Unsupported NX/Open version: ExportMode/SortType enums not found")

        workPart.Expressions.ExportToFile(export_modes.WorkPart, target, sort_types.AlphaNum)
        lw.WriteLine(f"[EXP-EXPORT] Wrote: {target}")
        return True
    except Exception as ex:
        lw.WriteLine(f"[EXP-EXPORT][ERROR] {ex}")
        return False
|
|
|
|
|
|
|
|
|
|
|
|
def parse_exp_file_to_dict(exp_path):
    """
    Parse NX .exp lines like:
        // comments
        [MilliMeter]SomeName=0.001234
        SomeOther=42
    into { 'SomeName': numeric_or_str, ... }.

    Rules applied per line:
      * blank lines, lines starting with ``//`` and lines without ``=`` are skipped;
      * the line is split on the first ``=``; ``[...]`` groups are removed from
        the left side and the remainder is the expression name;
      * lines whose name is empty after stripping are skipped;
      * a right side that is a plain decimal literal (``42``, ``-1.5e-3``,
        ``.5``, ``5.``) is converted to ``float``; anything else is kept as the
        stripped string;
      * if a name occurs more than once, the last occurrence wins.

    The file is decoded as UTF-8 with undecodable bytes ignored. A leading
    byte-order mark is dropped so it cannot end up inside the first name.
    """
    # Accepts digits with optional fraction ("1", "1.", "1.5") or a bare
    # fraction (".5"), each with an optional exponent.
    numeric = re.compile(r'[+-]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][+-]?\d+)?')
    unit_prefix = re.compile(r'\[[^\]]*\]\s*')

    parsed = {}
    # utf-8-sig decodes exactly like utf-8 but strips a leading BOM.
    with open(exp_path, 'r', encoding='utf-8-sig', errors='ignore') as f:
        for line in f:
            text = line.strip()
            if not text or text.startswith('//'):
                continue
            name, sep, value = text.partition('=')
            if not sep:
                continue
            # strip optional [Unit] prefixes on left side
            name = unit_prefix.sub('', name).strip()
            if not name:
                continue
            value = value.strip()
            parsed[name] = float(value) if numeric.fullmatch(value) else value
    return parsed
|
|
|
|
def _csv_ends_with_newline(path):
    """Return True when *path* is empty or its last byte is a line terminator."""
    with open(path, "rb") as fh:
        fh.seek(0, os.SEEK_END)
        if fh.tell() == 0:
            return True
        fh.seek(-1, os.SEEK_END)
        return fh.read(1) in (b"\n", b"\r")


def _write_expr_log(path, fieldnames, rows):
    """(Re)write the CSV at *path* with a header and the given dict rows."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for r in rows:
            w.writerow({k: r.get(k, "") for k in fieldnames})


def append_named_exprs_row(results_dir, run_id, run_dt, expr_dict, lw, source, part_name):
    """
    Appends one row to Results/NX_named_expressions_log.csv
    Columns auto-extend for new expression names.
    Adds metadata: RunID, RunDateTimeLocal, Source ('SIM'|'PART'), PartName.

    Args:
        results_dir: Directory that holds (or will hold) the CSV log.
        run_id: Value written to the ``RunID`` column.
        run_dt: ``datetime`` formatted as ``%Y-%m-%d %H:%M:%S`` for
            ``RunDateTimeLocal``.
        expr_dict: Mapping of expression name -> value for this row.
        lw: Object with a ``WriteLine(str)`` method used for logging.
        source: Value written to the ``Source`` column.
        part_name: Value written to the ``PartName`` column.

    Behavior:
        * Missing or empty log: the file is written from scratch with the
          metadata columns followed by the sorted expression names.
        * Existing log whose header already covers every column of this row:
          the row is appended in place; earlier rows are not rewritten.
        * Existing log that needs new columns (or does not end with a line
          terminator): the file is rewritten with the extended header and the
          old rows padded with empty cells.
    """
    log_csv = os.path.join(results_dir, "NX_named_expressions_log.csv")
    meta = {
        "RunID": run_id,
        "RunDateTimeLocal": run_dt.strftime("%Y-%m-%d %H:%M:%S"),
        "Source": source,
        "PartName": part_name,
    }
    row = {**meta, **expr_dict}

    log_existed = os.path.exists(log_csv)
    existing = []
    if log_existed:
        with open(log_csv, "r", newline="", encoding="utf-8") as f:
            existing = list(csv.reader(f))

    # Create (or rebuild an empty file) with a fresh header.
    if not existing:
        fieldnames = list(meta.keys()) + sorted(expr_dict.keys())
        _write_expr_log(log_csv, fieldnames, [row])
        verb = "Rebuilt" if log_existed else "Created"
        lw.WriteLine(f"[EXP-EXPORT] {verb} CSV log: {log_csv}")
        return

    old_header = existing[0]
    known = set(old_header)
    new_cols = [c for c in meta.keys() if c not in known] + \
        sorted([k for k in expr_dict.keys() if k not in known])

    if not new_cols and _csv_ends_with_newline(log_csv):
        # Header unchanged: append only the new row so the existing log is
        # never truncated and the cost per call does not grow with its size.
        with open(log_csv, "a", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=old_header)
            w.writerow({k: row.get(k, "") for k in old_header})
    else:
        # Header grows (or the file lacks a final line terminator): rewrite
        # old rows, padding any new columns, then add the new row.
        header = old_header + new_cols
        old_rows = [
            {h: (data[i] if i < len(data) else "") for i, h in enumerate(old_header)}
            for data in existing[1:]
        ]
        _write_expr_log(log_csv, header, old_rows + [row])

    lw.WriteLine(f"[EXP-EXPORT] Appended CSV log: {log_csv}")
|
|
|
|
def export_geometry_named_expressions(sim_part, results_dir, run_id, lw):
    """
    Switch display to the geometry part (like in your journal), export expressions, then restore.
    GEOM_PART_NAME must be resolvable via Session.Parts.FindObject.

    Args:
        sim_part: Not used by this function; kept for the caller's signature.
        results_dir: Directory that receives ``NamedExpressions_PART_<run_id>.exp``.
        run_id: Identifier embedded in the exported file name.
        lw: Object with a ``WriteLine(str)`` method used for logging.

    Returns:
        ``(ok, exp_path, part_name)``. ``exp_path`` is None when the export
        failed. ``(False, None, None)`` is returned when GEOM_PART_NAME is
        empty, the part cannot be found, or any step raises.

    Whatever the outcome, the function afterwards disposes the part-load
    status objects it received, deletes its undo mark, tries to make the
    originally displayed part active again and tries to switch back to the
    "UG_APP_SFEM" application.
    """
    theSession = NXOpen.Session.GetSession()
    original_display = theSession.Parts.BaseDisplay
    mark = None
    load_status = None

    try:
        if not GEOM_PART_NAME:
            lw.WriteLine("[EXP-EXPORT][WARN] GEOM_PART_NAME not set; skipping geometry export.")
            return False, None, None

        try:
            part1 = theSession.Parts.FindObject(GEOM_PART_NAME)
        except Exception:
            lw.WriteLine(f"[EXP-EXPORT][WARN] Geometry part not found by name: {GEOM_PART_NAME}")
            return False, None, None

        mark = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Change Displayed Part")
        _status, load_status = theSession.Parts.SetActiveDisplay(
            part1,
            NXOpen.DisplayPartOption.AllowAdditional,
            NXOpen.PartDisplayPartWorkPartOption.UseLast,
        )
        # Switch to Modeling, like your journal (best effort)
        try:
            theSession.ApplicationSwitchImmediate("UG_APP_MODELING")
        except Exception:
            pass

        workPart = theSession.Parts.Work
        out_exp = os.path.join(results_dir, f"NamedExpressions_PART_{run_id}.exp")
        ok = export_all_named_expressions_to_exp(workPart, out_exp, lw)

        # Part name for logging
        part_name = os.path.splitext(os.path.basename(workPart.FullPath))[0] if workPart and workPart.FullPath else GEOM_PART_NAME
        return ok, out_exp if ok else None, part_name

    except Exception as ex:
        lw.WriteLine(f"[EXP-EXPORT][ERROR] Geometry export failed: {ex}")
        return False, None, None

    finally:
        # Release what the display switch acquired, on success and on error
        # alike, before touching the display again.
        if load_status is not None:
            try:
                load_status.Dispose()
            except Exception:
                pass
        if mark is not None:
            try:
                theSession.DeleteUndoMark(mark, None)
            except Exception:
                pass

        # Try to restore prior display/work part and CAE app
        try:
            if original_display is not None:
                _status, restore_status = theSession.Parts.SetActiveDisplay(
                    original_display,
                    NXOpen.DisplayPartOption.AllowAdditional,
                    NXOpen.PartDisplayPartWorkPartOption.UseLast,
                )
                if restore_status is not None:
                    restore_status.Dispose()
        except Exception as ex:
            lw.WriteLine(f"[EXP-EXPORT][WARN] Could not restore displayed part: {ex}")
        try:
            theSession.ApplicationSwitchImmediate("UG_APP_SFEM")  # back to CAE if applicable
        except Exception:
            pass
|
|
|
|
|
|
|
|
def run_post(sim_dir, lw, run_id, results_dir):
    """Run the post-processing script in a separate Python process.

    The command is ``TEST_ENV_PY <sim_dir>/SCRIPT_NAME --op2 <sim_dir>/OP2_NAME``,
    executed with ``sim_dir`` as working directory. ``run_id`` and
    ``results_dir`` are handed to the child through the environment variables
    ``ZERNIKE_RUN_ID`` and ``ZERNIKE_RESULTS_DIR``. Captured stdout/stderr are
    echoed to ``lw``.

    Args:
        sim_dir: Directory containing the post script and the OP2 file.
        lw: Object with a ``WriteLine(str)`` method used for logging.
        run_id: Run identifier exported to the child process.
        results_dir: Results directory exported to the child process.

    Returns:
        The child's return code, or one of:
        3 - interpreter ``TEST_ENV_PY`` not found,
        4 - post script not found,
        2 - OP2 file not found,
        5 - the child exceeded ``TIMEOUT`` seconds,
        6 - the child process could not be started.
    """
    post_script = os.path.join(sim_dir, SCRIPT_NAME)
    op2 = os.path.join(sim_dir, OP2_NAME)

    if not os.path.exists(TEST_ENV_PY):
        lw.WriteLine(f"[ERROR] test_env python not found: {TEST_ENV_PY}")
        return 3
    if not os.path.exists(post_script):
        lw.WriteLine(f"[ERROR] Post script not found: {post_script}")
        return 4
    if not os.path.exists(op2):
        lw.WriteLine(f"[ERROR] OP2 not found: {op2}")
        return 2

    cmd = [TEST_ENV_PY, post_script, "--op2", op2]
    lw.WriteLine("[POST] " + " ".join(cmd))
    lw.WriteLine(f"[POST] cwd={sim_dir}")

    env = os.environ.copy()
    env["ZERNIKE_RUN_ID"] = run_id
    env["ZERNIKE_RESULTS_DIR"] = results_dir

    try:
        proc = subprocess.run(
            cmd, cwd=sim_dir, capture_output=True, text=True,
            shell=False, timeout=TIMEOUT, env=env
        )
    except subprocess.TimeoutExpired:
        # Only reachable when TIMEOUT is not None; report instead of letting
        # the exception abort the calling journal.
        lw.WriteLine(f"[ERROR] Post timed out after {TIMEOUT} s")
        return 5
    except OSError as ex:
        lw.WriteLine(f"[ERROR] Could not start post process: {ex}")
        return 6

    if proc.stdout:
        lw.WriteLine(proc.stdout)
    if proc.stderr:
        lw.WriteLine("[STDERR]\n" + proc.stderr)
    lw.WriteLine(f"[INFO] Post finished (rc={proc.returncode})")
    return proc.returncode
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Journal entry point: run the post script, then import and log expressions.

    Steps, in order:
      1. Open the listing window and take the session's work part as the SIM
         part; stop with a logged error when no work part is available.
      2. Create ``<sim folder>/Results`` and derive a run id from the current
         local time (``YYYYMMDD_HHMMSS``).
      3. Run the Zernike post script; on a non-zero return code, log it and
         stop without importing or logging expressions.
      4. Import ``EXP_NAME`` from ``Results/`` or, failing that, from the sim
         folder; log a skip message when neither file exists.
      5. Export the SIM part's expressions to an .exp file and append them to
         the CSV log.
      6. Export the geometry part's expressions and append them to the CSV log.
    """
    s = NXOpen.Session.GetSession()
    lw = s.ListingWindow
    lw.Open()
    sim_part = s.Parts.BaseWork
    if sim_part is None:
        # Without a work part there is no folder to resolve inputs from.
        lw.WriteLine("[ERROR] No work part is open; nothing to post-process.")
        return
    sim_dir = os.path.dirname(sim_part.FullPath)

    # --- New: Results folder + a run id/timestamp we can also hand to Zernike ---
    results_dir = os.path.join(sim_dir, "Results")
    os.makedirs(results_dir, exist_ok=True)
    run_dt = datetime.now()
    run_id = run_dt.strftime("%Y%m%d_%H%M%S")

    # --- Run the Zernike post (hand it the same run id & results dir via env) ---
    rc = run_post(sim_dir, lw, run_id, results_dir)
    if rc != 0:
        lw.WriteLine(f"[POST] Zernike post failed (rc={rc}). Skipping EXP import and NX expr logging.")
        return

    # Import EXP if it exists — prefer Results/, then fall back to the sim folder
    exp_candidates = [
        os.path.join(results_dir, EXP_NAME),
        os.path.join(sim_dir, EXP_NAME),
    ]
    exp_path = next((p for p in exp_candidates if os.path.isfile(p)), None)
    if exp_path is not None:
        import_iteration_results_exp(exp_path, lw)
    else:
        lw.WriteLine(f"[EXP] Skipped: not found → {exp_candidates[0]}")

    # --- Export SIM (work CAE part) expressions and append log ---
    sim_part_name = os.path.splitext(os.path.basename(sim_part.FullPath))[0] if sim_part and sim_part.FullPath else "SIM"
    named_exp_sim = os.path.join(results_dir, f"NamedExpressions_SIM_{run_id}.exp")
    if export_all_named_expressions_to_exp(sim_part, named_exp_sim, lw):
        exprs_sim = parse_exp_file_to_dict(named_exp_sim)
        append_named_exprs_row(results_dir, run_id, run_dt, exprs_sim, lw, source="SIM", part_name=sim_part_name)

    # --- Export GEOMETRY (modeling) part expressions like your journal, and append log ---
    ok_part, part_exp_path, part_name = export_geometry_named_expressions(sim_part, results_dir, run_id, lw)
    if ok_part and part_exp_path:
        exprs_part = parse_exp_file_to_dict(part_exp_path)
        append_named_exprs_row(results_dir, run_id, run_dt, exprs_part, lw, source="PART", part_name=part_name)


if __name__ == "__main__":
    main()
|