# nx_post_each_iter.py import os, subprocess import NXOpen from datetime import datetime import csv, re # --- SETTINGS --- TEST_ENV_PY = r"C:\Users\antoi\anaconda3\envs\test_env\python.exe" SCRIPT_NAME = "zernike_Post_Script_NX.py" # your script in the .sim folder OP2_NAME = "assy_m1_assyfem1_sim1-solution_1.op2" EXP_NAME = "Iteration_results_expression.exp" TIMEOUT = None # e.g., 900 for 15 min # Option A: set via env NX_GEOM_PART_NAME, else hardcode your CAD part name here. GEOM_PART_NAME = os.environ.get("NX_GEOM_PART_NAME", "ASSY_M1_assyfem1") # --------------- def import_iteration_results_exp(exp_path: str, lw) -> bool: """Import EXP into current Work part (Replace) and update.""" theSession = NXOpen.Session.GetSession() workPart = theSession.Parts.BaseWork if not os.path.isfile(exp_path): lw.WriteLine(f"[EXP][ERROR] File not found: {exp_path}") return False mark_import = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Import Expressions") try: modified, err_msgs = workPart.Expressions.ImportFromFile( exp_path, NXOpen.ExpressionCollection.ImportMode.Replace ) # surface any parsing messages try: if err_msgs: for m in err_msgs: lw.WriteLine(f"[EXP][WARN] {m}") except Exception: pass mark_update = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Invisible, "NX update") nErrs = theSession.UpdateManager.DoUpdate(mark_update) theSession.DeleteUndoMark(mark_update, "NX update") theSession.SetUndoMarkName(mark_import, "Expressions") theSession.DeleteUndoMark(mark_import, None) lw.WriteLine(f"[EXP] Imported OK (modified={modified}, nErrs={nErrs})") return True except Exception as ex: lw.WriteLine(f"[EXP][FATAL] {ex}") try: theSession.DeleteUndoMark(mark_import, None) except Exception: pass return False def export_all_named_expressions_to_exp(workPart, out_path, lw): """ Export expressions to an .exp file using the 3-arg signature: ExportToFile(, , ) Works across NX versions where enums live under either: NXOpen.ExpressionCollection.ExportMode / SortType or NXOpen.ExpressionCollectionExportMode / ExpressionCollectionSortType """ try: if not out_path.lower().endswith(".exp"): out_path += ".exp" mode_cls = getattr(NXOpen.ExpressionCollection, "ExportMode", getattr(NXOpen, "ExpressionCollectionExportMode", None)) sort_cls = getattr(NXOpen.ExpressionCollection, "SortType", getattr(NXOpen, "ExpressionCollectionSortType", None)) if mode_cls is None or sort_cls is None: raise RuntimeError("Unsupported NX/Open version: ExportMode/SortType enums not found") workPart.Expressions.ExportToFile(mode_cls.WorkPart, out_path, sort_cls.AlphaNum) lw.WriteLine(f"[EXP-EXPORT] Wrote: {out_path}") return True except Exception as ex: lw.WriteLine(f"[EXP-EXPORT][ERROR] {ex}") return False def parse_exp_file_to_dict(exp_path): """ Parse NX .exp lines like: // comments [MilliMeter]SomeName=0.001234 SomeOther=42 into { 'SomeName': numeric_or_str, ... }. """ out = {} num = re.compile(r'^[+-]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?$') with open(exp_path, 'r', encoding='utf-8', errors='ignore') as f: for line in f: s = line.strip() if not s or s.startswith('//'): continue if '=' not in s: continue left, right = s.split('=', 1) # strip optional [Unit] prefixes on left side left = re.sub(r'\[[^\]]*\]\s*', '', left).strip() key = left val = right.strip() # try numeric v = val if num.match(val): try: v = float(val) except Exception: pass out[key] = v return out def append_named_exprs_row(results_dir, run_id, run_dt, expr_dict, lw, source, part_name): """ Appends one row to Results/NX_named_expressions_log.csv Columns auto-extend for new expression names. Adds metadata: RunID, RunDateTimeLocal, Source ('SIM'|'PART'), PartName. """ log_csv = os.path.join(results_dir, "NX_named_expressions_log.csv") meta = { "RunID": run_id, "RunDateTimeLocal": run_dt.strftime("%Y-%m-%d %H:%M:%S"), "Source": source, "PartName": part_name, } row = {**meta, **expr_dict} # Create or extend header as needed if not os.path.exists(log_csv): fieldnames = list(meta.keys()) + sorted(expr_dict.keys()) with open(log_csv, "w", newline="", encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=fieldnames) w.writeheader() w.writerow({k: row.get(k, "") for k in fieldnames}) lw.WriteLine(f"[EXP-EXPORT] Created CSV log: {log_csv}") return with open(log_csv, "r", newline="", encoding="utf-8") as f: r = csv.reader(f) existing = list(r) if not existing: fieldnames = list(meta.keys()) + sorted(expr_dict.keys()) with open(log_csv, "w", newline="", encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=fieldnames) w.writeheader() w.writerow({k: row.get(k, "") for k in fieldnames}) lw.WriteLine(f"[EXP-EXPORT] Rebuilt CSV log: {log_csv}") return header = existing[0] known = set(header) new_cols = [c for c in meta.keys() if c not in known] + \ sorted([k for k in expr_dict.keys() if k not in known]) if new_cols: header = header + new_cols with open(log_csv, "w", newline="", encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=header) w.writeheader() # Rewrite old rows (padding any new columns) for data in existing[1:]: old_row = {h: (data[i] if i < len(data) else "") for i, h in enumerate(existing[0])} for c in new_cols: old_row.setdefault(c, "") w.writerow({k: old_row.get(k, "") for k in header}) # Append new row w.writerow({k: row.get(k, "") for k in header}) lw.WriteLine(f"[EXP-EXPORT] Appended CSV log: {log_csv}") def export_geometry_named_expressions(sim_part, results_dir, run_id, lw): """ Switch display to the geometry part (like in your journal), export expressions, then restore. GEOM_PART_NAME must be resolvable via Session.Parts.FindObject. """ theSession = NXOpen.Session.GetSession() original_display = theSession.Parts.BaseDisplay original_work = theSession.Parts.BaseWork try: if not GEOM_PART_NAME: lw.WriteLine("[EXP-EXPORT][WARN] GEOM_PART_NAME not set; skipping geometry export.") return False, None, None try: part1 = theSession.Parts.FindObject(GEOM_PART_NAME) except Exception: lw.WriteLine(f"[EXP-EXPORT][WARN] Geometry part not found by name: {GEOM_PART_NAME}") return False, None, None mark = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Change Displayed Part") status, pls = theSession.Parts.SetActiveDisplay( part1, NXOpen.DisplayPartOption.AllowAdditional, NXOpen.PartDisplayPartWorkPartOption.UseLast ) # Switch to Modeling, like your journal try: theSession.ApplicationSwitchImmediate("UG_APP_MODELING") except Exception: pass workPart = theSession.Parts.Work out_exp = os.path.join(results_dir, f"NamedExpressions_PART_{run_id}.exp") ok = export_all_named_expressions_to_exp(workPart, out_exp, lw) if pls is not None: pls.Dispose() theSession.DeleteUndoMark(mark, None) # Part name for logging part_name = os.path.splitext(os.path.basename(workPart.FullPath))[0] if workPart and workPart.FullPath else GEOM_PART_NAME return ok, out_exp if ok else None, part_name except Exception as ex: lw.WriteLine(f"[EXP-EXPORT][ERROR] Geometry export failed: {ex}") return False, None, None finally: # Try to restore prior display/work part and CAE app try: if original_display is not None: theSession.Parts.SetActiveDisplay( original_display, NXOpen.DisplayPartOption.AllowAdditional, NXOpen.PartDisplayPartWorkPartOption.UseLast ) except Exception: pass try: theSession.ApplicationSwitchImmediate("UG_APP_SFEM") # back to CAE if applicable except Exception: pass def run_post(sim_dir, lw, run_id, results_dir): post_script = os.path.join(sim_dir, SCRIPT_NAME) op2 = os.path.join(sim_dir, OP2_NAME) if not os.path.exists(TEST_ENV_PY): lw.WriteLine(f"[ERROR] test_env python not found: {TEST_ENV_PY}") return 3 if not os.path.exists(post_script): lw.WriteLine(f"[ERROR] Post script not found: {post_script}") return 4 if not os.path.exists(op2): lw.WriteLine(f"[ERROR] OP2 not found: {op2}") return 2 cmd = [TEST_ENV_PY, post_script, "--op2", op2] lw.WriteLine("[POST] " + " ".join(cmd)) lw.WriteLine(f"[POST] cwd={sim_dir}") env = os.environ.copy() env["ZERNIKE_RUN_ID"] = run_id env["ZERNIKE_RESULTS_DIR"] = results_dir proc = subprocess.run( cmd, cwd=sim_dir, capture_output=True, text=True, shell=False, timeout=TIMEOUT, env=env ) if proc.stdout: lw.WriteLine(proc.stdout) if proc.stderr: lw.WriteLine("[STDERR]\n" + proc.stderr) lw.WriteLine(f"[INFO] Post finished (rc={proc.returncode})") return proc.returncode def main(): s = NXOpen.Session.GetSession() lw = s.ListingWindow; lw.Open() sim_part = s.Parts.BaseWork sim_dir = os.path.dirname(sim_part.FullPath) # --- New: Results folder + a run id/timestamp we can also hand to Zernike --- results_dir = os.path.join(sim_dir, "Results") os.makedirs(results_dir, exist_ok=True) run_dt = datetime.now() run_id = run_dt.strftime("%Y%m%d_%H%M%S") # --- Run the Zernike post (hand it the same run id & results dir via env) --- rc = run_post(sim_dir, lw, run_id, results_dir) if rc != 0: lw.WriteLine(f"[POST] Zernike post failed (rc={rc}). Skipping EXP import and NX expr logging.") return # or 'pass' if you prefer to continue anyway # Import EXP if it exists — prefer Results/, then fall back to the sim folder exp_candidates = [ os.path.join(results_dir, EXP_NAME), os.path.join(sim_dir, EXP_NAME), ] for exp_path in exp_candidates: if os.path.isfile(exp_path): import_iteration_results_exp(exp_path, lw) break else: lw.WriteLine(f"[EXP] Skipped: not found → {exp_candidates[0]}") # --- Export SIM (work CAE part) expressions and append log --- sim_part_name = os.path.splitext(os.path.basename(sim_part.FullPath))[0] if sim_part and sim_part.FullPath else "SIM" named_exp_sim = os.path.join(results_dir, f"NamedExpressions_SIM_{run_id}.exp") if export_all_named_expressions_to_exp(sim_part, named_exp_sim, lw): exprs_sim = parse_exp_file_to_dict(named_exp_sim) append_named_exprs_row(results_dir, run_id, run_dt, exprs_sim, lw, source="SIM", part_name=sim_part_name) # --- Export GEOMETRY (modeling) part expressions like your journal, and append log --- ok_part, part_exp_path, part_name = export_geometry_named_expressions(sim_part, results_dir, run_id, lw) if ok_part and part_exp_path: exprs_part = parse_exp_file_to_dict(part_exp_path) append_named_exprs_row(results_dir, run_id, run_dt, exprs_part, lw, source="PART", part_name=part_name) if __name__ == "__main__": main()