Files
Atomizer/atomizer-dashboard/backend/api/services/nx_introspection.py
Anto01 ac5e9b4054 docs: Comprehensive documentation update for Dashboard V3 and Canvas
## Documentation Updates
- DASHBOARD.md: Updated to V3.0 with Canvas V3 features, file browser, introspection
- DASHBOARD_IMPLEMENTATION_STATUS.md: Marked Canvas V3 features as COMPLETE
- CANVAS.md: New comprehensive guide for Canvas Builder V3 with all features
- CLAUDE.md: Added dashboard quick reference and Canvas V3 features

## Canvas V3 Features Documented
- File Browser: Browse studies directory for model files
- Model Introspection: Auto-discover expressions, solver type, dependencies
- One-Click Add: Add expressions as design variables instantly
- Claude Bug Fixes: WebSocket reconnection, SQL errors resolved
- Health Check: /api/health endpoint for monitoring

## Backend Services
- NX introspection service with expression discovery
- File browser API with type filtering
- Claude session management improvements
- Context builder enhancements

## Frontend Components
- FileBrowser: Modal for file selection with search
- IntrospectionPanel: View discovered model information
- ExpressionSelector: Dropdown for design variable configuration
- Improved chat hooks with reconnection logic

## Plan Documents
- Added RALPH_LOOP_CANVAS_V2/V3 implementation records
- Added ATOMIZER_DASHBOARD_V2_MASTER_PLAN
- Added investigation and sync documentation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 20:48:58 -05:00

626 lines
24 KiB
Python

"""
NX Model Introspection Service - Real Implementation
Discovers expressions, solver types, dependent files, and actual result data
from NX model files. Uses PyNastran for OP2 result parsing.
Used by the Canvas Builder to help users configure optimization workflows.
"""
import json
import os
import re
import struct
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import logging
logger = logging.getLogger(__name__)

# Locate the Atomizer repository root. This module lives five levels below it
# (atomizer-dashboard/backend/api/services/nx_introspection.py), so walk up
# five directory levels from this file's absolute path.
_root = os.path.abspath(__file__)
for _ in range(5):
    _root = os.path.dirname(_root)
ATOMIZER_ROOT = Path(os.path.normpath(_root))
STUDIES_ROOT = ATOMIZER_ROOT / "studies"

# OP2 result parsing is optional: PyNastran may not be installed in every
# deployment, and introspection degrades gracefully without it (result files
# are still listed, just not analyzed).
try:
    from pyNastran.op2.op2 import OP2
    HAS_PYNASTRAN = True
except ImportError:
    HAS_PYNASTRAN = False
    logger.warning("PyNastran not available - OP2 parsing disabled")
class NXIntrospector:
    """Introspect NX model files to discover expressions, dependencies, and solver info."""

    def __init__(self, file_path: str):
        """
        Initialize introspector with a file path.

        Args:
            file_path: Relative path from studies root (e.g., "M1_Mirror/study_v1/model.sim")
        """
        # Normalize Windows separators so the same relative path works on any OS.
        normalized = file_path.replace("\\", "/")
        self.relative_path = normalized
        self.file_path = STUDIES_ROOT / normalized
        # Lower-cased extension (e.g. ".sim") drives type-specific handling.
        self.file_type = self.file_path.suffix.lower()
        self.parent_dir = self.file_path.parent
        # Study root (may equal parent_dir if no markers are found upward).
        self.study_dir = self._find_study_dir()
def _find_study_dir(self) -> Path:
    """Walk upward from the file's directory looking for study-root markers."""
    # A directory is a study root if it contains any of these markers.
    markers = ("optimization_config.json", "3_results", "1_model")
    candidate = self.parent_dir
    for _ in range(5):  # bounded walk: at most 5 levels up
        if any((candidate / marker).exists() for marker in markers):
            return candidate
        if candidate == STUDIES_ROOT:
            break
        candidate = candidate.parent
    # No marker found: fall back to the file's own directory.
    return self.parent_dir
def introspect(self) -> Dict[str, Any]:
    """
    Run full introspection of the model file.

    Returns:
        Dict with expressions, solver_type, dependent_files, extractors_available,
        mesh_info, result_files, warnings
    """
    if self.study_dir != self.parent_dir:
        study_rel = str(self.study_dir.relative_to(STUDIES_ROOT)).replace("\\", "/")
    else:
        study_rel = None
    report: Dict[str, Any] = {
        "file_path": self.relative_path,
        "file_type": self.file_type,
        "expressions": [],
        "solver_type": None,
        "dependent_files": [],
        "result_files": [],
        "mesh_info": None,
        "extractors_available": [],
        "warnings": [],
        "study_dir": study_rel,
    }
    if not self.file_path.exists():
        report["warnings"].append(f"File not found: {self.file_path}")
        return report
    try:
        # Related NX files (prt/fem/sim/afem) located by naming convention.
        report["dependent_files"] = self._discover_related_files()
        # Solver type from deck content or naming heuristics.
        report["solver_type"] = self._detect_solver_type()
        # OP2 result files; analyze the first one when PyNastran is available.
        op2_files = self._find_op2_files()
        if op2_files:
            report["result_files"] = op2_files
            op2_analysis = self._analyze_op2(op2_files[0]["path"]) if HAS_PYNASTRAN else None
            if op2_analysis:
                report["op2_analysis"] = op2_analysis
        # Mesh statistics from the FEM file, when one exists.
        fem_file = self._find_fem_file()
        if fem_file:
            mesh_info = self._analyze_fem(fem_file)
            if mesh_info:
                report["mesh_info"] = mesh_info
        # Parameter values (mass, grid/element counts) parsed from the BDF/DAT deck.
        bdf_file = self._find_bdf_file()
        if bdf_file:
            bdf_analysis = self._analyze_bdf(bdf_file)
            if bdf_analysis:
                report["bdf_analysis"] = bdf_analysis
                if bdf_analysis.get("mass"):
                    report["mass_from_bdf"] = bdf_analysis["mass"]
        # Expressions: config file first, then study history, then templates.
        report["expressions"] = (
            self._load_expressions_from_config()
            or self._discover_from_study_history()
            or self._discover_common_expressions()
        )
    except Exception as e:
        logger.error(f"Introspection error: {e}", exc_info=True)
        report["warnings"].append(str(e))
    # Extractor suggestions run even if an earlier step failed.
    report["extractors_available"] = self._suggest_extractors(
        report.get("solver_type"),
        report.get("result_files", []),
        report.get("op2_analysis"),
    )
    return report
def _discover_related_files(self) -> List[Dict[str, Any]]:
    """Find all related NX files (geometry/FEM/sim) by naming convention."""
    # Strip NX suffixes (_simN, _femN, _i) to recover the base model name.
    stem = self.file_path.stem
    for suffix_pattern in (r'_sim\d*$', r'_fem\d*$', r'_i$'):
        stem = re.sub(suffix_pattern, '', stem)
    # Candidate directories: the file's folder, the study root, and 1_model.
    search_dirs = [self.parent_dir]
    if self.study_dir != self.parent_dir:
        search_dirs.append(self.study_dir)
    # Also check 1_model subfolder
    model_dir = self.study_dir / "1_model"
    if model_dir.exists():
        search_dirs.append(model_dir)
    patterns = [
        (f"{stem}.prt", "geometry"),
        (f"{stem}_i.prt", "idealized"),
        (f"{stem}_fem*.fem", "fem"),
        (f"{stem}_fem*_i.prt", "idealized_fem"),
        (f"{stem}_sim*.sim", "simulation"),
        (f"{stem}.afem", "assembled_fem"),
    ]
    related: List[Dict[str, Any]] = []
    seen_paths: set = set()
    for directory in search_dirs:
        if not directory.exists():
            continue
        for pattern, file_category in patterns:
            for match in directory.glob(pattern):
                key = str(match)
                # De-duplicate across the (possibly overlapping) search dirs.
                if not match.exists() or key in seen_paths:
                    continue
                seen_paths.add(key)
                try:
                    rel_path = str(match.relative_to(STUDIES_ROOT)).replace("\\", "/")
                except ValueError:
                    # Outside the studies tree: report the absolute path.
                    rel_path = key
                related.append({
                    "name": match.name,
                    "path": rel_path,
                    "type": match.suffix[1:].lower(),
                    "category": file_category,
                    "size": match.stat().st_size,
                })
    return related
def _find_op2_files(self) -> List[Dict[str, Any]]:
    """Find OP2 result files in the study, capped at 10 for performance.

    Returns:
        List of dicts with name, path (relative to studies root when
        possible), full_path, size, and trial_folder (the containing
        optimizer trial folder name, or None).
    """
    max_files = 10
    op2_files: List[Dict[str, Any]] = []
    # Most likely locations first: iteration folders, results, model folder.
    search_dirs = [
        self.study_dir / "2_iterations",
        self.study_dir / "3_results",
        self.parent_dir,
    ]
    for search_dir in search_dirs:
        if not search_dir.exists():
            continue
        for op2_path in search_dir.rglob("*.op2"):
            try:
                rel_path = str(op2_path.relative_to(STUDIES_ROOT)).replace("\\", "/")
            except ValueError:
                rel_path = str(op2_path)
            parent_name = op2_path.parent.name
            op2_files.append({
                "name": op2_path.name,
                "path": rel_path,
                "full_path": str(op2_path),
                "size": op2_path.stat().st_size,
                # Tag files that sit inside an optimizer trial folder.
                "trial_folder": parent_name if "trial_" in parent_name else None,
            })
            # Enforce the cap globally. The previous `break` only exited the
            # inner rglob loop, so the scan continued in later directories
            # and the 10-file limit could be exceeded.
            if len(op2_files) >= max_files:
                return op2_files
    return op2_files
def _analyze_op2(self, op2_path: str) -> Optional[Dict[str, Any]]:
    """Analyze an OP2 file to discover available result types.

    Args:
        op2_path: Forward-slash path relative to the studies root (as
            produced by _find_op2_files).

    Returns:
        Summary dict (subcase ids, result-type flags, node/mode counts);
        a dict with an "error" key if PyNastran failed to read the file;
        or None when PyNastran is unavailable or the file does not exist.
    """
    if not HAS_PYNASTRAN:
        return None
    full_path = STUDIES_ROOT / op2_path
    if not full_path.exists():
        return None
    try:
        op2 = OP2()
        # Restrict parsing to the tables we report on, keeping large OP2
        # files fast to read.
        op2.set_results_to_include({
            'displacements': True,
            'eigenvectors': True,
            'solid_stress': True,
            'plate_stress': True,
        })
        op2.read_op2(str(full_path), build_dataframe=False)
        analysis = {
            "subcases": list(op2.displacements.keys()) if op2.displacements else [],
            "has_displacements": bool(op2.displacements),
            "has_eigenvectors": bool(op2.eigenvectors),
            # NOTE(review): pyNastran usually exposes per-element stress dicts
            # (e.g. op2.ctetra_stress / op2.cquad4_stress); confirm that
            # 'solid_stress'/'plate_stress' attributes exist on this pyNastran
            # version, otherwise these flags are always False.
            "has_solid_stress": bool(getattr(op2, 'solid_stress', None)),
            "has_plate_stress": bool(getattr(op2, 'plate_stress', None)),
        }
        # Node count from the first displacement subcase.
        if op2.displacements:
            first_subcase = list(op2.displacements.values())[0]
            analysis["node_count"] = len(first_subcase.node_gridtype)
        # Modal results: convert eigenvalues to frequencies, f = sqrt(|lambda|)/(2*pi).
        if op2.eigenvectors:
            first_subcase = list(op2.eigenvectors.values())[0]
            if hasattr(first_subcase, 'eigrs'):
                # Convert eigenvalues to frequencies
                import numpy as np
                eigenvalues = first_subcase.eigrs
                frequencies = np.sqrt(np.abs(eigenvalues)) / (2 * np.pi)
                analysis["frequencies_hz"] = frequencies[:10].tolist()  # First 10 modes
                analysis["num_modes"] = len(eigenvalues)
        return analysis
    except Exception as e:
        logger.warning(f"OP2 analysis failed: {e}")
        return {"error": str(e)}
def _find_fem_file(self) -> Optional[Path]:
"""Find the FEM file for this model."""
base_name = self.file_path.stem
base_name = re.sub(r'_sim\d*$', '', base_name)
base_name = re.sub(r'_i$', '', base_name)
patterns = [
f"{base_name}.fem",
f"{base_name}_fem1.fem",
f"{base_name}_fem.fem",
]
for search_dir in [self.parent_dir, self.study_dir / "1_model"]:
if not search_dir.exists():
continue
for pattern in patterns:
fem_path = search_dir / pattern
if fem_path.exists():
return fem_path
return None
def _analyze_fem(self, fem_path: Path) -> Optional[Dict[str, Any]]:
    """Summarize a FEM file: path, size, plus mesh counts from a sibling .dat."""
    try:
        # FEM files are proprietary binaries; without the NX Open API we can
        # only report file size. Mesh details come from a companion .dat export.
        info: Dict[str, Any] = {
            "path": str(fem_path.relative_to(STUDIES_ROOT)).replace("\\", "/"),
            "size_mb": round(fem_path.stat().st_size / 1024 / 1024, 2),
        }
        dat_path = fem_path.with_suffix('.dat')
        if dat_path.exists():
            dat_analysis = self._analyze_dat_file(dat_path)
            if dat_analysis:
                info.update(dat_analysis)
        return info
    except Exception as e:
        logger.warning(f"FEM analysis failed: {e}")
        return None
def _find_bdf_file(self) -> Optional[Path]:
"""Find BDF/DAT file in the study."""
# Check iterations folder first (most recent analysis)
iterations_dir = self.study_dir / "2_iterations"
if iterations_dir.exists():
# Look in the most recent trial folder
trial_folders = sorted(
[d for d in iterations_dir.iterdir() if d.is_dir() and d.name.startswith("trial_")],
key=lambda x: x.name,
reverse=True
)
if trial_folders:
for trial in trial_folders[:3]: # Check last 3 trials
for ext in ['.dat', '.bdf']:
for bdf_path in trial.glob(f"*{ext}"):
return bdf_path
# Check model directory
for search_dir in [self.parent_dir, self.study_dir / "1_model"]:
if search_dir.exists():
for ext in ['.dat', '.bdf']:
for bdf_path in search_dir.glob(f"*{ext}"):
return bdf_path
return None
def _analyze_bdf(self, bdf_path: Path) -> Optional[Dict[str, Any]]:
    """Analyze a BDF/DAT deck: mass annotation, grid/element counts, solver.

    Args:
        bdf_path: Path to the deck (expected under the studies root).

    Returns:
        Dict with path and whichever of mass, grid_count, elements,
        total_elements, solver could be determined; None if the file
        cannot be read at all.
    """
    try:
        analysis: Dict[str, Any] = {
            "path": str(bdf_path.relative_to(STUDIES_ROOT)).replace("\\", "/"),
        }
        with open(bdf_path, 'r', errors='ignore') as f:
            content = f.read()
        # Mass is typically present only as an annotation in exported decks.
        mass_match = re.search(r'(?:MASS|mass)\s*[=:]\s*([\d.eE+-]+)', content)
        if mass_match:
            # Guard the conversion: the loose character class can match
            # tokens like "1.2.3" that are not valid floats, and a bad match
            # must not discard the rest of the analysis via the outer except.
            try:
                analysis["mass"] = float(mass_match.group(1))
            except ValueError:
                pass
        # Count grid points (one GRID card per node).
        grid_count = len(re.findall(r'^GRID[\s,]', content, re.MULTILINE))
        if grid_count > 0:
            analysis["grid_count"] = grid_count
        # Count elements by card type.
        element_counts: Dict[str, int] = {}
        for elem_type in ['CTETRA', 'CHEXA', 'CPENTA', 'CTRIA3', 'CQUAD4', 'CBAR', 'CBEAM']:
            count = len(re.findall(rf'^{elem_type}[\s,]', content, re.MULTILINE))
            if count > 0:
                element_counts[elem_type.lower()] = count
        if element_counts:
            analysis["elements"] = element_counts
            analysis["total_elements"] = sum(element_counts.values())
        # Solver sequence from the executive control section (first match wins).
        for sol in ("101", "103", "111"):
            if f'SOL {sol}' in content or f'SOL{sol}' in content:
                analysis["solver"] = f"SOL{sol}"
                break
        return analysis
    except Exception as e:
        logger.warning(f"BDF analysis failed: {e}")
        return None
def _analyze_dat_file(self, dat_path: Path) -> Optional[Dict[str, Any]]:
"""Analyze .dat file for mesh/model info."""
try:
analysis = {}
with open(dat_path, 'r', errors='ignore') as f:
# Read first 10000 chars for efficiency
content = f.read(10000)
# Count grid points
grid_count = len(re.findall(r'^GRID[\s,]', content, re.MULTILINE))
if grid_count > 0:
analysis["node_count"] = grid_count
return analysis if analysis else None
except Exception as e:
return None
def _detect_solver_type(self) -> Optional[str]:
"""Detect solver type from files and naming."""
# First check BDF file
bdf_file = self._find_bdf_file()
if bdf_file:
analysis = self._analyze_bdf(bdf_file)
if analysis and analysis.get("solver"):
return analysis["solver"]
# Infer from naming conventions
name_lower = self.file_path.name.lower()
parent_lower = str(self.study_dir).lower()
if 'modal' in name_lower or 'freq' in name_lower or 'modal' in parent_lower:
return 'SOL103'
elif 'static' in name_lower or 'stress' in name_lower:
return 'SOL101'
elif 'thermal' in name_lower or 'heat' in name_lower:
return 'SOL153'
elif 'dynamic' in name_lower:
return 'SOL111'
elif 'mirror' in parent_lower or 'wfe' in parent_lower:
return 'SOL101'
return 'SOL101' # Default
def _load_expressions_from_config(self) -> List[Dict[str, Any]]:
"""Load expressions from optimization_config.json if it exists."""
expressions = []
config_paths = [
self.study_dir / "optimization_config.json",
self.study_dir / "1_config" / "optimization_config.json",
self.parent_dir / "optimization_config.json",
]
for config_path in config_paths:
if config_path.exists():
try:
with open(config_path, 'r') as f:
config = json.load(f)
design_vars = config.get("design_variables", [])
for dv in design_vars:
expr_name = dv.get("name", dv.get("expression", "unknown"))
expr_min = dv.get("min", 0)
expr_max = dv.get("max", 100)
expressions.append({
"name": expr_name,
"value": (expr_min + expr_max) / 2,
"min": expr_min,
"max": expr_max,
"unit": dv.get("unit", "mm"),
"type": "design_variable",
"source": "config",
})
return expressions
except Exception as e:
logger.warning(f"Failed to load config: {e}")
return expressions
def _discover_from_study_history(self) -> List[Dict[str, Any]]:
"""Try to discover expressions from study database or previous trials."""
expressions = []
# Check study.db for parameter history
db_path = self.study_dir / "3_results" / "study.db"
if db_path.exists():
try:
import sqlite3
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# Try Optuna schema first
cursor.execute("""
SELECT DISTINCT param_name, param_value
FROM trial_params
ORDER BY trial_id DESC
LIMIT 20
""")
rows = cursor.fetchall()
param_values: Dict[str, List[float]] = {}
for name, value in rows:
if name not in param_values:
param_values[name] = []
try:
param_values[name].append(float(value))
except (ValueError, TypeError):
pass
for name, values in param_values.items():
if values:
expressions.append({
"name": name,
"value": sum(values) / len(values),
"min": min(values),
"max": max(values),
"unit": "mm",
"type": "design_variable",
"source": "database",
})
conn.close()
except Exception as e:
logger.debug(f"Database query failed: {e}")
return expressions
def _discover_common_expressions(self) -> List[Dict[str, Any]]:
"""Discover common expressions based on study type."""
parent_lower = str(self.study_dir).lower()
if 'mirror' in parent_lower:
return [
{"name": "flatback_thickness", "value": 30.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "rib_height", "value": 40.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "rib_width", "value": 8.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "fillet_radius", "value": 5.0, "unit": "mm", "type": "dimension", "source": "template"},
]
elif 'bracket' in parent_lower:
return [
{"name": "thickness", "value": 5.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "width", "value": 50.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "height", "value": 30.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "fillet_radius", "value": 3.0, "unit": "mm", "type": "dimension", "source": "template"},
]
elif 'beam' in parent_lower:
return [
{"name": "height", "value": 100.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "width", "value": 50.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "web_thickness", "value": 5.0, "unit": "mm", "type": "dimension", "source": "template"},
]
# Generic
return [
{"name": "thickness", "value": 10.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "length", "value": 100.0, "unit": "mm", "type": "dimension", "source": "template"},
{"name": "width", "value": 50.0, "unit": "mm", "type": "dimension", "source": "template"},
]
def _suggest_extractors(
self,
solver_type: Optional[str],
result_files: List[Dict[str, Any]],
op2_analysis: Optional[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Suggest extractors based on solver type and available data."""
extractors = [
{"id": "E4", "name": "Mass (BDF)", "description": "Extract mass from BDF file", "always": True, "available": True},
{"id": "E5", "name": "Mass (Expression)", "description": "Extract mass from NX expression", "always": True, "available": True},
]
# Determine availability based on OP2 analysis
has_displacements = op2_analysis.get("has_displacements", False) if op2_analysis else False
has_eigenvectors = op2_analysis.get("has_eigenvectors", False) if op2_analysis else False
has_stress = op2_analysis.get("has_solid_stress", False) or op2_analysis.get("has_plate_stress", False) if op2_analysis else False
has_results = len(result_files) > 0
if solver_type == 'SOL101' or has_displacements:
extractors.extend([
{
"id": "E1",
"name": "Displacement",
"description": "Max displacement from static analysis",
"always": False,
"available": has_displacements or has_results
},
{
"id": "E3",
"name": "Stress",
"description": "Von Mises stress from static analysis",
"always": False,
"available": has_stress or has_results
},
])
if solver_type == 'SOL103' or has_eigenvectors:
extractors.append({
"id": "E2",
"name": "Frequency",
"description": "Natural frequencies from modal analysis",
"always": False,
"available": has_eigenvectors or has_results
})
# Mirror-specific extractors
parent_lower = str(self.study_dir).lower()
if 'mirror' in parent_lower or 'wfe' in parent_lower:
extractors.extend([
{"id": "E8", "name": "Zernike Coefficients", "description": "Zernike polynomial coefficients from OP2", "always": False, "available": has_displacements},
{"id": "E9", "name": "Zernike CSV", "description": "Zernike from CSV export", "always": False, "available": True},
{"id": "E10", "name": "Zernike RMS WFE", "description": "RMS wavefront error calculation", "always": False, "available": True},
])
return extractors