"""
NX Model Introspection Service - Real Implementation

Discovers expressions, solver types, dependent files, and actual result data
from NX model files. Uses PyNastran for OP2 result parsing.

Used by the Canvas Builder to help users configure optimization workflows.
"""

import json
import os
import re
import struct
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import logging

logger = logging.getLogger(__name__)

# Path to studies root: five directory levels above this file, then "studies".
_file_path = os.path.abspath(__file__)
ATOMIZER_ROOT = Path(os.path.normpath(os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.dirname(os.path.dirname(_file_path))
)))))
STUDIES_ROOT = ATOMIZER_ROOT / "studies"

# Try to import PyNastran for OP2 parsing
try:
    from pyNastran.op2.op2 import OP2
    HAS_PYNASTRAN = True
except ImportError:
    HAS_PYNASTRAN = False
    logger.warning("PyNastran not available - OP2 parsing disabled")


class NXIntrospector:
    """Introspect NX model files to discover expressions, dependencies, and solver info."""

    # Hard upper bound on the number of OP2 files reported by _find_op2_files().
    MAX_OP2_FILES = 10

    # Element card names counted by _analyze_bdf().
    _ELEMENT_CARDS = ('CTETRA', 'CHEXA', 'CPENTA', 'CTRIA3', 'CQUAD4', 'CBAR', 'CBEAM')

    # Solution numbers looked for in a deck, in priority order.
    _KNOWN_SOLUTIONS = ('101', '103', '111')

    # Placeholder expressions keyed by a substring of the study directory path.
    # The first keyword found wins; _GENERIC_TEMPLATE is used when none matches.
    _TEMPLATE_EXPRESSIONS = (
        ("mirror", (
            ("flatback_thickness", 30.0),
            ("rib_height", 40.0),
            ("rib_width", 8.0),
            ("fillet_radius", 5.0),
        )),
        ("bracket", (
            ("thickness", 5.0),
            ("width", 50.0),
            ("height", 30.0),
            ("fillet_radius", 3.0),
        )),
        ("beam", (
            ("height", 100.0),
            ("width", 50.0),
            ("web_thickness", 5.0),
        )),
    )
    _GENERIC_TEMPLATE = (
        ("thickness", 10.0),
        ("length", 100.0),
        ("width", 50.0),
    )

    def __init__(self, file_path: str):
        """
        Initialize introspector with a file path.

        Args:
            file_path: Relative path from studies root (e.g., "M1_Mirror/study_v1/model.sim")
        """
        # NOTE(review): the path is joined onto STUDIES_ROOT without checking
        # for ".." segments or absolute paths - confirm callers validate it.
        self.relative_path = file_path.replace("\\", "/")
        self.file_path = STUDIES_ROOT / self.relative_path
        self.file_type = self.file_path.suffix.lower()
        self.parent_dir = self.file_path.parent
        self.study_dir = self._find_study_dir()

    @staticmethod
    def _rel_path(path: Path) -> str:
        """Return *path* relative to STUDIES_ROOT using forward slashes.

        Falls back to ``str(path)`` unchanged when the path does not live
        under STUDIES_ROOT, so callers never see a ValueError.
        """
        try:
            return str(path.relative_to(STUDIES_ROOT)).replace("\\", "/")
        except ValueError:
            return str(path)

    @staticmethod
    def _trial_sort_key(folder: Path) -> Tuple[int, str]:
        """Sort key ordering trial folders by their trailing number.

        A plain name sort would put "trial_9" after "trial_10"; comparing the
        numeric suffix first avoids that. Folders without a numeric suffix
        sort before all numbered ones.
        """
        suffix = re.search(r'(\d+)$', folder.name)
        number = int(suffix.group(1)) if suffix else -1
        return (number, folder.name)

    @staticmethod
    def _first_deck_in(directory: Path) -> Optional[Path]:
        """Return the first ``*.dat`` (preferred) or ``*.bdf`` file in *directory*.

        Candidates are sorted by path so the choice is deterministic.
        """
        for ext in ('.dat', '.bdf'):
            for deck in sorted(directory.glob(f"*{ext}")):
                return deck
        return None

    def _find_study_dir(self) -> Path:
        """Find the study root directory.

        Walks up from the file's parent directory (at most 5 levels, stopping
        once STUDIES_ROOT has been examined) looking for a directory that
        contains ``optimization_config.json``, ``3_results`` or ``1_model``.

        Returns:
            The first directory containing a marker, or the file's parent
            directory when no marker is found.
        """
        markers = ("optimization_config.json", "3_results", "1_model")
        current = self.parent_dir
        for _ in range(5):  # Max 5 levels up
            if any((current / marker).exists() for marker in markers):
                return current
            if current == STUDIES_ROOT:
                break
            current = current.parent
        return self.parent_dir

    def introspect(self) -> Dict[str, Any]:
        """
        Full introspection of the model file.

        Returns:
            Dict with expressions, solver_type, dependent_files,
            extractors_available, mesh_info, result_files, warnings.
            ``op2_analysis``, ``bdf_analysis`` and ``mass_from_bdf`` are added
            only when the corresponding data could be obtained. Errors raised
            by the discovery steps are logged and reported under ``warnings``
            instead of being raised.
        """
        result: Dict[str, Any] = {
            "file_path": self.relative_path,
            "file_type": self.file_type,
            "expressions": [],
            "solver_type": None,
            "dependent_files": [],
            "result_files": [],
            "mesh_info": None,
            "extractors_available": [],
            "warnings": [],
            # Only reported when a study root distinct from the file's own
            # directory was found.
            "study_dir": self._rel_path(self.study_dir)
            if self.study_dir != self.parent_dir else None,
        }

        if not self.file_path.exists():
            result["warnings"].append(f"File not found: {self.file_path}")
            return result

        try:
            # Step 1: Discover related files
            result["dependent_files"] = self._discover_related_files()

            # Step 2: Detect solver type from files
            result["solver_type"] = self._detect_solver_type()

            # Step 3: Find and analyze OP2 result files
            op2_files = self._find_op2_files()
            if op2_files:
                result["result_files"] = op2_files
                # Analyze the first OP2 file for available result types
                op2_analysis = self._analyze_op2(op2_files[0]["path"]) if HAS_PYNASTRAN else None
                if op2_analysis:
                    result["op2_analysis"] = op2_analysis

            # Step 4: Try to get mesh info from FEM files
            fem_file = self._find_fem_file()
            if fem_file:
                mesh_info = self._analyze_fem(fem_file)
                if mesh_info:
                    result["mesh_info"] = mesh_info

            # Step 5: Parse BDF for actual parameter values
            bdf_file = self._find_bdf_file()
            if bdf_file:
                bdf_analysis = self._analyze_bdf(bdf_file)
                if bdf_analysis:
                    result["bdf_analysis"] = bdf_analysis
                    if bdf_analysis.get("mass"):
                        result["mass_from_bdf"] = bdf_analysis["mass"]

            # Step 6: Expressions - config first, then study history, then
            # generic templates as the last resort.
            result["expressions"] = (
                self._load_expressions_from_config()
                or self._discover_from_study_history()
                or self._discover_common_expressions()
            )

        except Exception as e:
            logger.error("Introspection error: %s", e, exc_info=True)
            result["warnings"].append(str(e))

        # Suggest extractors based on solver type and available data
        result["extractors_available"] = self._suggest_extractors(
            result.get("solver_type"),
            result.get("result_files", []),
            result.get("op2_analysis")
        )

        return result

    def _discover_related_files(self) -> List[Dict[str, Any]]:
        """Find all related NX files by naming convention.

        The base name is the file stem with trailing ``_sim<N>``, ``_fem<N>``
        and ``_i`` suffixes removed. The file's directory, the study
        directory and its ``1_model`` subfolder are searched.

        Returns:
            One dict per unique match with name, path, type, category, size.
        """
        # Get base name without _sim1, _fem1, _i suffixes
        base_name = self.file_path.stem
        for suffix_pattern in (r'_sim\d*$', r'_fem\d*$', r'_i$'):
            base_name = re.sub(suffix_pattern, '', base_name)

        # Search directories
        search_dirs = [self.parent_dir]
        if self.study_dir != self.parent_dir:
            search_dirs.append(self.study_dir)
        # Also check 1_model subfolder
        model_dir = self.study_dir / "1_model"
        if model_dir.exists():
            search_dirs.append(model_dir)

        # Glob patterns and the category reported for each
        patterns = [
            (f"{base_name}.prt", "geometry"),
            (f"{base_name}_i.prt", "idealized"),
            (f"{base_name}_fem*.fem", "fem"),
            (f"{base_name}_fem*_i.prt", "idealized_fem"),
            (f"{base_name}_sim*.sim", "simulation"),
            (f"{base_name}.afem", "assembled_fem"),
        ]

        related: List[Dict[str, Any]] = []
        seen_paths = set()
        for search_dir in search_dirs:
            if not search_dir.exists():
                continue
            for pattern, file_category in patterns:
                for match in search_dir.glob(pattern):
                    key = str(match)
                    if key in seen_paths or not match.exists():
                        continue
                    seen_paths.add(key)
                    related.append({
                        "name": match.name,
                        "path": self._rel_path(match),
                        "type": match.suffix[1:].lower(),
                        "category": file_category,
                        "size": match.stat().st_size,
                    })

        return related

    def _find_op2_files(self) -> List[Dict[str, Any]]:
        """Find OP2 result files in the study.

        Recursively searches ``2_iterations``, ``3_results`` and the file's
        own directory. Each file is reported once even when search roots
        overlap, and at most ``MAX_OP2_FILES`` entries are returned in total.

        Returns:
            One dict per file with name, path, full_path, size, trial_folder.
        """
        op2_files: List[Dict[str, Any]] = []
        seen_paths = set()

        # Search in iterations/results folders
        search_dirs = [
            self.study_dir / "2_iterations",
            self.study_dir / "3_results",
            self.parent_dir,
        ]

        for search_dir in search_dirs:
            if not search_dir.exists():
                continue

            # rglob has no depth limit; the cap below bounds the work instead.
            for op2_path in search_dir.rglob("*.op2"):
                key = str(op2_path)
                if key in seen_paths:
                    continue
                seen_paths.add(key)

                parent_name = op2_path.parent.name
                op2_files.append({
                    "name": op2_path.name,
                    "path": self._rel_path(op2_path),
                    "full_path": key,
                    "size": op2_path.stat().st_size,
                    "trial_folder": parent_name if "trial_" in parent_name else None,
                })

                # Stop entirely once the cap is hit; a plain ``break`` would
                # let every remaining search directory add one more file.
                if len(op2_files) >= self.MAX_OP2_FILES:
                    return op2_files

        return op2_files

    def _analyze_op2(self, op2_path: str) -> Optional[Dict[str, Any]]:
        """Analyze an OP2 file to discover available result types.

        Args:
            op2_path: Path relative to STUDIES_ROOT (an absolute path is also
                accepted, since joining it onto STUDIES_ROOT yields itself).

        Returns:
            None when PyNastran is unavailable or the file is missing, a dict
            of result flags on success, or ``{"error": ...}`` when reading
            the file fails.
        """
        if not HAS_PYNASTRAN:
            return None

        full_path = STUDIES_ROOT / op2_path
        if not full_path.exists():
            return None

        try:
            op2 = OP2()
            op2.set_results_to_include({
                'displacements': True,
                'eigenvectors': True,
                'solid_stress': True,
                'plate_stress': True,
            })
            op2.read_op2(str(full_path), build_dataframe=False)

            analysis: Dict[str, Any] = {
                "subcases": list(op2.displacements.keys()) if op2.displacements else [],
                "has_displacements": bool(op2.displacements),
                "has_eigenvectors": bool(op2.eigenvectors),
                "has_solid_stress": bool(getattr(op2, 'solid_stress', None)),
                "has_plate_stress": bool(getattr(op2, 'plate_stress', None)),
            }

            # Get node count from displacement results
            if op2.displacements:
                first_disp = next(iter(op2.displacements.values()))
                analysis["node_count"] = len(first_disp.node_gridtype)

            # Get eigenvalue info if modal analysis
            if op2.eigenvectors:
                first_modes = next(iter(op2.eigenvectors.values()))
                if hasattr(first_modes, 'eigrs'):
                    # Convert eigenvalues to frequencies: f = sqrt(|eigr|) / (2*pi)
                    import numpy as np
                    eigenvalues = first_modes.eigrs
                    frequencies = np.sqrt(np.abs(eigenvalues)) / (2 * np.pi)
                    analysis["frequencies_hz"] = frequencies[:10].tolist()  # First 10 modes
                    analysis["num_modes"] = len(eigenvalues)

            return analysis

        except Exception as e:
            logger.warning("OP2 analysis failed: %s", e)
            return {"error": str(e)}

    def _find_fem_file(self) -> Optional[Path]:
        """Find the FEM file for this model.

        Tries ``<base>.fem``, ``<base>_fem1.fem`` and ``<base>_fem.fem`` in
        the file's directory and then in the study's ``1_model`` folder.

        Returns:
            The first existing candidate, or None.
        """
        base_name = self.file_path.stem
        base_name = re.sub(r'_sim\d*$', '', base_name)
        base_name = re.sub(r'_i$', '', base_name)

        candidates = [
            f"{base_name}.fem",
            f"{base_name}_fem1.fem",
            f"{base_name}_fem.fem",
        ]

        for search_dir in (self.parent_dir, self.study_dir / "1_model"):
            if not search_dir.exists():
                continue
            for candidate in candidates:
                fem_path = search_dir / candidate
                if fem_path.exists():
                    return fem_path

        return None

    def _analyze_fem(self, fem_path: Path) -> Optional[Dict[str, Any]]:
        """Analyze FEM file for mesh statistics.

        Reports the file path and size; when a sibling ``.dat`` file exists
        its analysis (see _analyze_dat_file) is merged in.

        Returns:
            The stats dict, or None if an error occurs.
        """
        try:
            # The FEM file itself is not parsed - only its size is reported.
            stats: Dict[str, Any] = {
                "path": self._rel_path(fem_path),
                "size_mb": round(fem_path.stat().st_size / 1024 / 1024, 2),
            }

            # Try to find corresponding .dat file for actual mesh info
            dat_path = fem_path.with_suffix('.dat')
            if dat_path.exists():
                dat_analysis = self._analyze_dat_file(dat_path)
                if dat_analysis:
                    stats.update(dat_analysis)

            return stats

        except Exception as e:
            logger.warning("FEM analysis failed: %s", e)
            return None

    def _find_bdf_file(self) -> Optional[Path]:
        """Find BDF/DAT file in the study.

        Looks in the three highest-numbered ``trial_*`` folders under
        ``2_iterations`` first, then in the file's directory and the study's
        ``1_model`` folder. ``.dat`` files are preferred over ``.bdf``.

        Returns:
            The first deck found, or None.
        """
        # Check iterations folder first (most recent analysis)
        iterations_dir = self.study_dir / "2_iterations"
        if iterations_dir.exists():
            trial_folders = sorted(
                (d for d in iterations_dir.iterdir()
                 if d.is_dir() and d.name.startswith("trial_")),
                key=self._trial_sort_key,
                reverse=True,
            )
            for trial in trial_folders[:3]:  # Check last 3 trials
                deck = self._first_deck_in(trial)
                if deck is not None:
                    return deck

        # Check model directory
        for search_dir in (self.parent_dir, self.study_dir / "1_model"):
            if search_dir.exists():
                deck = self._first_deck_in(search_dir)
                if deck is not None:
                    return deck

        return None

    def _analyze_bdf(self, bdf_path: Path) -> Optional[Dict[str, Any]]:
        """Analyze BDF/DAT file for mass and other properties.

        Returns:
            Dict with ``path`` and, when found, ``mass``, ``grid_count``,
            ``elements``, ``total_elements`` and ``solver``; None if the file
            cannot be read.
        """
        try:
            analysis: Dict[str, Any] = {"path": self._rel_path(bdf_path)}

            with open(bdf_path, 'r', errors='ignore') as f:
                content = f.read()

            # Look for a "MASS = <number>" / "mass: <number>" style annotation.
            mass_match = re.search(r'(?:MASS|mass)\s*[=:]\s*([\d.eE+-]+)', content)
            if mass_match:
                # The character class also matches non-numbers such as "-" or
                # "e"; skip those rather than discarding the whole analysis.
                try:
                    analysis["mass"] = float(mass_match.group(1))
                except ValueError:
                    logger.debug("Ignoring malformed mass value: %r", mass_match.group(1))

            # Count grid points
            grid_count = len(re.findall(r'^GRID[\s,]', content, re.MULTILINE))
            if grid_count > 0:
                analysis["grid_count"] = grid_count

            # Count elements by type
            element_counts = {}
            for elem_type in self._ELEMENT_CARDS:
                count = len(re.findall(rf'^{elem_type}[\s,]', content, re.MULTILINE))
                if count > 0:
                    element_counts[elem_type.lower()] = count

            if element_counts:
                analysis["elements"] = element_counts
                analysis["total_elements"] = sum(element_counts.values())

            # Detect solver type from the first known "SOL <n>" found
            for number in self._KNOWN_SOLUTIONS:
                if f'SOL {number}' in content or f'SOL{number}' in content:
                    analysis["solver"] = f"SOL{number}"
                    break

            return analysis

        except Exception as e:
            logger.warning("BDF analysis failed: %s", e)
            return None

    def _analyze_dat_file(self, dat_path: Path) -> Optional[Dict[str, Any]]:
        """Analyze .dat file for mesh/model info.

        Only the first 10000 characters are read, so the GRID count reflects
        that prefix of the file rather than the whole deck.

        Returns:
            ``{"node_count": n}`` when GRID cards are found, otherwise None
            (also None when the file cannot be read).
        """
        try:
            with open(dat_path, 'r', errors='ignore') as f:
                # Read first 10000 chars for efficiency
                content = f.read(10000)

            # Count grid points
            grid_count = len(re.findall(r'^GRID[\s,]', content, re.MULTILINE))
            if grid_count > 0:
                return {"node_count": grid_count}
            return None

        except Exception as e:
            logger.warning("DAT analysis failed: %s", e)
            return None

    def _detect_solver_type(self) -> Optional[str]:
        """Detect solver type from files and naming.

        Prefers the solver found in a BDF/DAT deck; otherwise infers it from
        keywords in the file name and study directory path.

        Returns:
            A solver identifier such as 'SOL101'; defaults to 'SOL101'.
        """
        # First check BDF file
        bdf_file = self._find_bdf_file()
        if bdf_file:
            analysis = self._analyze_bdf(bdf_file)
            if analysis and analysis.get("solver"):
                return analysis["solver"]

        # Infer from naming conventions
        name_lower = self.file_path.name.lower()
        parent_lower = str(self.study_dir).lower()

        if 'modal' in name_lower or 'freq' in name_lower or 'modal' in parent_lower:
            return 'SOL103'
        if 'static' in name_lower or 'stress' in name_lower:
            return 'SOL101'
        if 'thermal' in name_lower or 'heat' in name_lower:
            return 'SOL153'
        if 'dynamic' in name_lower:
            return 'SOL111'

        # 'mirror' / 'wfe' studies and everything else fall back to SOL101.
        return 'SOL101'

    def _load_expressions_from_config(self) -> List[Dict[str, Any]]:
        """Load expressions from optimization_config.json if it exists.

        Candidate locations are tried in order; the first config that parses
        successfully determines the result, even when it declares no design
        variables. A config that fails to load is skipped with a warning and
        contributes nothing.

        Returns:
            One dict per design variable, with ``value`` set to the midpoint
            of ``min`` and ``max`` (defaults 0 and 100); empty when no config
            could be loaded.
        """
        config_paths = [
            self.study_dir / "optimization_config.json",
            self.study_dir / "1_config" / "optimization_config.json",
            self.parent_dir / "optimization_config.json",
        ]

        for config_path in config_paths:
            if not config_path.exists():
                continue
            try:
                with open(config_path, 'r') as f:
                    config = json.load(f)

                # Built per config so a failure part-way through cannot leak
                # partial entries into the next candidate's result.
                expressions = []
                for dv in config.get("design_variables", []):
                    expr_min = dv.get("min", 0)
                    expr_max = dv.get("max", 100)
                    expressions.append({
                        "name": dv.get("name", dv.get("expression", "unknown")),
                        "value": (expr_min + expr_max) / 2,
                        "min": expr_min,
                        "max": expr_max,
                        "unit": dv.get("unit", "mm"),
                        "type": "design_variable",
                        "source": "config",
                    })
                return expressions
            except Exception as e:
                logger.warning("Failed to load config: %s", e)

        return []

    def _discover_from_study_history(self) -> List[Dict[str, Any]]:
        """Try to discover expressions from study database or previous trials.

        Reads up to 20 distinct (name, value) rows from the ``trial_params``
        table of ``3_results/study.db`` and summarises each parameter by the
        mean, min and max of its numeric values.

        Returns:
            One dict per parameter; empty when the database is missing or
            the query fails.
        """
        # Check study.db for parameter history
        db_path = self.study_dir / "3_results" / "study.db"
        if not db_path.exists():
            return []

        import sqlite3

        try:
            conn = sqlite3.connect(str(db_path))
            try:
                # Try Optuna schema first
                rows = conn.execute("""
                    SELECT DISTINCT param_name, param_value
                    FROM trial_params
                    ORDER BY trial_id DESC
                    LIMIT 20
                """).fetchall()
            finally:
                # Always release the handle, including when the query fails.
                conn.close()
        except Exception as e:
            logger.debug("Database query failed: %s", e)
            return []

        param_values: Dict[str, List[float]] = {}
        for name, value in rows:
            values = param_values.setdefault(name, [])
            try:
                values.append(float(value))
            except (ValueError, TypeError):
                pass  # non-numeric parameter values are ignored

        return [
            {
                "name": name,
                "value": sum(values) / len(values),
                "min": min(values),
                "max": max(values),
                "unit": "mm",
                "type": "design_variable",
                "source": "database",
            }
            for name, values in param_values.items()
            if values
        ]

    def _discover_common_expressions(self) -> List[Dict[str, Any]]:
        """Discover common expressions based on study type.

        Returns hard-coded template expressions chosen by the first of
        'mirror', 'bracket' or 'beam' found in the lower-cased study
        directory path, or a generic set when none is present.
        """
        parent_lower = str(self.study_dir).lower()

        template = self._GENERIC_TEMPLATE
        for keyword, candidate in self._TEMPLATE_EXPRESSIONS:
            if keyword in parent_lower:
                template = candidate
                break

        return [
            {"name": name, "value": value, "unit": "mm", "type": "dimension", "source": "template"}
            for name, value in template
        ]

    def _suggest_extractors(
        self,
        solver_type: Optional[str],
        result_files: List[Dict[str, Any]],
        op2_analysis: Optional[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Suggest extractors based on solver type and available data.

        Args:
            solver_type: Solver identifier such as 'SOL101', or None.
            result_files: Result file entries; only emptiness is checked.
            op2_analysis: Output of _analyze_op2, or None.

        Returns:
            Extractor descriptors with id, name, description, always,
            available. The two mass extractors are always included.
        """
        extractors = [
            {"id": "E4", "name": "Mass (BDF)", "description": "Extract mass from BDF file", "always": True, "available": True},
            {"id": "E5", "name": "Mass (Expression)", "description": "Extract mass from NX expression", "always": True, "available": True},
        ]

        # Determine availability based on OP2 analysis (missing => all False)
        analysis = op2_analysis or {}
        has_displacements = analysis.get("has_displacements", False)
        has_eigenvectors = analysis.get("has_eigenvectors", False)
        has_stress = analysis.get("has_solid_stress", False) or analysis.get("has_plate_stress", False)
        has_results = len(result_files) > 0

        if solver_type == 'SOL101' or has_displacements:
            extractors.extend([
                {
                    "id": "E1",
                    "name": "Displacement",
                    "description": "Max displacement from static analysis",
                    "always": False,
                    "available": has_displacements or has_results
                },
                {
                    "id": "E3",
                    "name": "Stress",
                    "description": "Von Mises stress from static analysis",
                    "always": False,
                    "available": has_stress or has_results
                },
            ])

        if solver_type == 'SOL103' or has_eigenvectors:
            extractors.append({
                "id": "E2",
                "name": "Frequency",
                "description": "Natural frequencies from modal analysis",
                "always": False,
                "available": has_eigenvectors or has_results
            })

        # Mirror-specific extractors
        parent_lower = str(self.study_dir).lower()
        if 'mirror' in parent_lower or 'wfe' in parent_lower:
            extractors.extend([
                {"id": "E8", "name": "Zernike Coefficients", "description": "Zernike polynomial coefficients from OP2", "always": False, "available": has_displacements},
                {"id": "E9", "name": "Zernike CSV", "description": "Zernike from CSV export", "always": False, "available": True},
                {"id": "E10", "name": "Zernike RMS WFE", "description": "RMS wavefront error calculation", "always": False, "available": True},
            ])

        return extractors