"""
NX Model Introspection Service

Discovers expressions, solver types, and dependent files from NX model files.
Used by the Canvas Builder to help users configure optimization workflows.
"""

import json
import logging
import os
import re
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

logger = logging.getLogger(__name__)

# Path to studies root: "studies" lives five directory levels above this file.
_file_path = os.path.abspath(__file__)
ATOMIZER_ROOT = Path(os.path.normpath(os.path.dirname(os.path.dirname(os.path.dirname(
    os.path.dirname(os.path.dirname(_file_path))
)))))
STUDIES_ROOT = ATOMIZER_ROOT / "studies"


class NXIntrospector:
    """Introspect NX model files to discover expressions, dependencies, and solver info."""

    # Directory names whose parent directory is searched for related files too.
    _SETUP_DIR_NAMES = ("1_config", "1_setup", "config", "setup")

    # (keyword, ((expression name, nominal value), ...)) pairs, checked in
    # order against the study-relative directory path; first match wins.
    _INFERRED_DIMENSIONS = (
        ("mirror", (
            ("flatback_thickness", 30.0),
            ("rib_height", 40.0),
            ("rib_width", 8.0),
            ("fillet_radius", 5.0),
            ("web_thickness", 4.0),
        )),
        ("bracket", (
            ("thickness", 5.0),
            ("width", 50.0),
            ("height", 30.0),
            ("fillet_radius", 3.0),
            ("hole_diameter", 8.0),
        )),
        ("beam", (
            ("height", 100.0),
            ("width", 50.0),
            ("web_thickness", 5.0),
            ("flange_thickness", 8.0),
        )),
    )

    # Fallback used when no keyword above matches.
    _GENERIC_DIMENSIONS = (
        ("thickness", 10.0),
        ("length", 100.0),
        ("width", 50.0),
        ("height", 25.0),
        ("fillet_radius", 3.0),
    )

    def __init__(self, file_path: str):
        """
        Initialize introspector with a file path.

        Args:
            file_path: Relative path from studies root
                (e.g., "M1_Mirror/study_v1/model.sim"). Backslashes are
                normalized to forward slashes.
        """
        self.relative_path = file_path.replace("\\", "/")
        # NOTE(review): the path is joined without a containment check, so an
        # absolute path or ".." segments can point outside STUDIES_ROOT --
        # confirm that callers validate user-supplied paths.
        self.file_path = STUDIES_ROOT / self.relative_path
        self.file_type = self.file_path.suffix.lower()
        self.parent_dir = self.file_path.parent

    def introspect(self) -> Dict[str, Any]:
        """
        Full introspection of the model file.

        Returns:
            Dict with file_path, file_type, expressions, solver_type,
            dependent_files, extractors_available, warnings. When the file
            does not exist, only a "File not found" warning is filled in and
            no extractors are suggested.
        """
        result = {
            "file_path": self.relative_path,
            "file_type": self.file_type,
            "expressions": [],
            "solver_type": None,
            "dependent_files": [],
            "extractors_available": [],
            "warnings": [],
        }

        if not self.file_path.exists():
            result["warnings"].append(f"File not found: {self.file_path}")
            return result

        try:
            if self.file_type == '.sim':
                result.update(self._introspect_sim())
            elif self.file_type == '.prt':
                result.update(self._introspect_prt())
            elif self.file_type in ('.fem', '.afem'):
                result.update(self._introspect_fem())

            # Expressions from optimization_config.json take precedence.
            config_expressions = self._load_expressions_from_config()
            if config_expressions:
                result["expressions"] = config_expressions

            # If still no expressions, fall back to inferred defaults.
            if not result["expressions"]:
                result["expressions"] = self._discover_common_expressions()
        except Exception as e:
            # Best-effort boundary: report the failure as a warning and still
            # return whatever was collected before it.
            logger.exception("Introspection error: %s", e)
            result["warnings"].append(str(e))

        # Suggest extractors based on solver type
        result["extractors_available"] = self._suggest_extractors(result.get("solver_type"))

        return result

    def _study_context(self) -> str:
        """Return the lower-cased parent directory, relative to the studies root.

        Keyword matching uses this instead of the absolute path so that the
        location of the installation (e.g. a checkout under ".../mirror_lab/")
        cannot influence study classification. Falls back to the full parent
        path when the file is not located under STUDIES_ROOT.
        """
        try:
            relative_parent = self.parent_dir.relative_to(STUDIES_ROOT)
        except ValueError:
            relative_parent = self.parent_dir
        return str(relative_parent).lower()

    def _search_dirs(self) -> List[Path]:
        """Return the directories searched for related files.

        Always the file's own directory; additionally its parent when the
        file sits in a setup/config sub-directory.
        """
        search_dirs = [self.parent_dir]
        if self.parent_dir.name in self._SETUP_DIR_NAMES:
            search_dirs.append(self.parent_dir.parent)
        return search_dirs

    @staticmethod
    def _dependent_entry(path: Path, file_type: str) -> Dict[str, str]:
        """Build one dependent-file record with a studies-relative, '/'-separated path."""
        return {
            "path": str(path.relative_to(STUDIES_ROOT)).replace("\\", "/"),
            "type": file_type,
            "name": path.name,
        }

    def _related_in_dir(
        self,
        search_dir: Path,
        stems: Iterable[str],
        extensions: Iterable[str],
    ) -> List[Dict[str, str]]:
        """Return records for existing files named ``stem + extension`` in *search_dir*.

        Stems are de-duplicated (order preserved) so that name variants which
        collapse to the same string do not list the same file more than once.
        Empty stems are skipped.
        """
        unique_stems = [stem for stem in dict.fromkeys(stems) if stem]
        found = []
        for ext in extensions:
            for stem in unique_stems:
                candidate = search_dir / f"{stem}{ext}"
                if candidate.exists():
                    found.append(self._dependent_entry(candidate, ext[1:]))
        return found

    def _introspect_sim(self) -> Dict[str, Any]:
        """Introspect .sim file.

        Returns:
            Dict with "solver_type" and "dependent_files" (related .prt/.fem/
            .afem files plus any idealized parts found in the search dirs).
        """
        base_name = self.file_path.stem
        # Name variations: same stem, stem without "_sim1", stem with "_fem1".
        stems = (
            base_name,
            base_name.replace('_sim1', ''),
            base_name.replace('_sim1', '_fem1'),
        )

        dependent_files = []
        for search_dir in self._search_dirs():
            if not search_dir.exists():
                continue

            dependent_files.extend(
                self._related_in_dir(search_dir, stems, ('.prt', '.fem', '.afem'))
            )

            # Find idealized part (*_i.prt) - critical for mesh updates.
            # Sorted so the output order does not depend on the filesystem.
            for idealized in sorted(search_dir.glob("*_i.prt")):
                dependent_files.append(self._dependent_entry(idealized, "idealized_prt"))

        return {
            "solver_type": self._detect_solver_type(),
            "dependent_files": dependent_files,
        }

    def _introspect_prt(self) -> Dict[str, Any]:
        """Introspect .prt file.

        Returns:
            Dict with "dependent_files": associated .sim/.fem/.afem files.
        """
        base_name = self.file_path.stem
        stems = (base_name, f"{base_name}_sim1", f"{base_name}_fem1")

        dependent_files = []
        for search_dir in self._search_dirs():
            if not search_dir.exists():
                continue
            dependent_files.extend(
                self._related_in_dir(search_dir, stems, ('.sim', '.fem', '.afem'))
            )

        return {"dependent_files": dependent_files}

    def _introspect_fem(self) -> Dict[str, Any]:
        """Introspect .fem or .afem file.

        Only the file's own directory is searched.

        Returns:
            Dict with "dependent_files": associated .prt/.sim files.
        """
        base_name = self.file_path.stem
        stems = (
            base_name,
            base_name.replace('_fem1', ''),
            base_name.replace('_fem1', '_sim1'),
        )
        return {
            "dependent_files": self._related_in_dir(self.parent_dir, stems, ('.prt', '.sim')),
        }

    def _detect_solver_type(self) -> Optional[str]:
        """Guess the solver type from naming conventions.

        Only the file name and the study-relative directory path are
        inspected; the file contents are not read.
        """
        name_lower = self.file_path.name.lower()
        context = self._study_context()

        if 'modal' in name_lower or 'freq' in name_lower or 'modal' in context:
            return 'SOL103'  # Modal analysis
        if 'static' in name_lower or 'stress' in name_lower:
            return 'SOL101'  # Static analysis
        if 'thermal' in name_lower or 'heat' in name_lower:
            return 'SOL153'  # Thermal
        if 'dynamic' in name_lower:
            return 'SOL111'  # Frequency response

        # Mirror/WFE studies usually use static analysis, which is also the
        # default for anything unrecognized.
        return 'SOL101'

    @staticmethod
    def _expression_from_design_var(dv: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one "design_variables" config entry into an expression record.

        The nominal value is the midpoint of the bounds; a missing or null
        bound falls back to 0 (min) or 100 (max) for that computation only.
        """
        lower = dv.get("min")
        upper = dv.get("max")
        midpoint = ((0 if lower is None else lower) + (100 if upper is None else upper)) / 2
        return {
            "name": dv.get("name", dv.get("expression", "unknown")),
            "value": midpoint,
            "min": lower,
            "max": upper,
            "unit": dv.get("unit", "mm"),
            "type": "design_variable",
            "source": "config",
        }

    def _load_expressions_from_config(self) -> List[Dict[str, Any]]:
        """Load expressions from optimization_config.json if it exists.

        Candidate locations are tried in order; the first config that parses
        successfully determines the result (even if it declares no design
        variables). A config that fails to parse is logged and skipped
        without contributing partial results.
        """
        # Look for config file in study directory
        config_paths = [
            self.parent_dir / "optimization_config.json",
            self.parent_dir / "1_config" / "optimization_config.json",
            self.parent_dir / "1_setup" / "optimization_config.json",
            self.parent_dir.parent / "optimization_config.json",
            self.parent_dir.parent / "1_config" / "optimization_config.json",
        ]

        for config_path in config_paths:
            if not config_path.is_file():
                continue
            try:
                with open(config_path, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                design_vars = config.get("design_variables") or []
                return [self._expression_from_design_var(dv) for dv in design_vars]
            except (OSError, ValueError, TypeError, AttributeError) as e:
                # Unreadable, malformed, or unexpectedly shaped config:
                # move on to the next candidate location.
                logger.warning("Failed to load config: %s", e)

        return []

    def _discover_common_expressions(self) -> List[Dict[str, Any]]:
        """Return placeholder expressions inferred from the study directory name.

        The values are fixed defaults chosen by keyword ("mirror", "bracket",
        "beam", else generic); they are not read from the model.
        """
        context = self._study_context()

        dimensions = self._GENERIC_DIMENSIONS
        for keyword, candidates in self._INFERRED_DIMENSIONS:
            if keyword in context:
                dimensions = candidates
                break

        return [
            {"name": name, "value": value, "unit": "mm", "type": "dimension", "source": "inferred"}
            for name, value in dimensions
        ]

    def _suggest_extractors(self, solver_type: Optional[str]) -> List[Dict[str, Any]]:
        """Suggest extractors based on solver type.

        Mass extractors are always offered; static and modal solvers add
        their result extractors, and mirror/WFE studies add Zernike ones.
        """
        extractors = [
            {"id": "E4", "name": "Mass (BDF)", "description": "Extract mass from BDF file", "always": True},
            {"id": "E5", "name": "Mass (Expression)", "description": "Extract mass from NX expression", "always": True},
        ]

        if solver_type == 'SOL101':
            extractors.extend([
                {"id": "E1", "name": "Displacement", "description": "Max displacement from static analysis", "always": False},
                {"id": "E3", "name": "Stress", "description": "Von Mises stress from static analysis", "always": False},
            ])
        elif solver_type == 'SOL103':
            extractors.extend([
                {"id": "E2", "name": "Frequency", "description": "Natural frequencies from modal analysis", "always": False},
            ])

        # Check if study appears to be mirror-related
        context = self._study_context()
        if 'mirror' in context or 'wfe' in context:
            extractors.extend([
                {"id": "E8", "name": "Zernike Coefficients", "description": "Zernike polynomial coefficients", "always": False},
                {"id": "E9", "name": "Zernike RMS", "description": "RMS wavefront error", "always": False},
                {"id": "E10", "name": "Zernike WFE", "description": "Weighted WFE metric", "always": False},
            ])

        return extractors