""" Intelligent Setup System for Atomizer This module provides COMPLETE autonomy for optimization setup: 1. Solves ALL solutions in .sim file 2. Discovers all available results (eigenvalues, displacements, stresses, etc.) 3. Catalogs expressions and parameters 4. Matches workflow objectives to available results 5. Auto-selects correct solution for optimization 6. Generates optimized runner code This is the level of intelligence Atomizer should have. """ from pathlib import Path from typing import Dict, Any, List, Optional, Tuple import json from datetime import datetime class IntelligentSetup: """ Intelligent benchmarking and setup system. Proactively discovers EVERYTHING about a simulation: - All solutions (Static, Modal, Buckling, etc.) - All result types (displacements, stresses, eigenvalues, etc.) - All expressions and parameters - Matches user objectives to available data """ def __init__(self): self.project_root = Path(__file__).parent.parent def run_complete_benchmarking( self, prt_file: Path, sim_file: Path, workflow: Dict[str, Any] ) -> Dict[str, Any]: """ Run COMPLETE benchmarking: 1. Extract ALL expressions from .prt 2. Solve ALL solutions in .sim 3. Analyze ALL result files 4. Match objectives to available results 5. Determine optimal solution for each objective Returns: Complete catalog of available data and recommendations """ print() print("="*80) print(" INTELLIGENT SETUP - COMPLETE ANALYSIS") print("="*80) print() results = { 'success': False, 'expressions': {}, 'solutions': {}, 'available_results': {}, 'objective_mapping': {}, 'recommended_solution': None, 'errors': [] } try: # Phase 1: Extract ALL expressions print("[Phase 1/4] Extracting ALL expressions from model...") expressions = self._extract_all_expressions(prt_file) results['expressions'] = expressions print(f" [OK] Found {len(expressions)} expressions") for name, info in list(expressions.items())[:5]: val = info.get('value', 'N/A') units = info.get('units', '') print(f" - {name}: {val} {units}") if len(expressions) > 5: print(f" ... and {len(expressions) - 5} more") print() # Phase 2: Solve ALL solutions print("[Phase 2/4] Solving ALL solutions in .sim file...") solutions_info = self._solve_all_solutions(sim_file) results['solutions'] = solutions_info print(f" [OK] Solved {solutions_info['num_solved']} solutions") for sol_name in solutions_info['solution_names']: print(f" - {sol_name}") print() # Phase 3: Analyze ALL result files print("[Phase 3/4] Analyzing ALL result files...") available_results = self._analyze_all_results(sim_file.parent, solutions_info) results['available_results'] = available_results print(f" [OK] Found {len(available_results)} result files") for result_type, details in available_results.items(): print(f" - {result_type}: {details['count']} entries in {details['file']}") print() # Phase 4: Match objectives to results print("[Phase 4/4] Matching objectives to available results...") mapping = self._match_objectives_to_results(workflow, available_results, solutions_info) results['objective_mapping'] = mapping results['recommended_solution'] = mapping.get('primary_solution') print(f" [OK] Objective mapping complete") for obj_name, obj_info in mapping['objectives'].items(): print(f" - {obj_name}") print(f" Solution: {obj_info.get('solution', 'NONE')}") print(f" Result type: {obj_info.get('result_type', 'Unknown')}") print(f" Extractor: {obj_info.get('extractor', 'Unknown')}") if 'error' in obj_info: print(f" [WARNING] {obj_info['error']}") print() if mapping.get('primary_solution'): print(f" [RECOMMENDATION] Use solution: {mapping['primary_solution']}") print() results['success'] = True except Exception as e: results['errors'].append(str(e)) print(f" [ERROR] {e}") print() print("="*80) print(" ANALYSIS COMPLETE") print("="*80) print() return results def _extract_all_expressions(self, prt_file: Path) -> Dict[str, Any]: """Extract ALL expressions from .prt file.""" from optimization_engine.nx_updater import NXParameterUpdater updater = NXParameterUpdater(prt_file) return updater.get_all_expressions() def _solve_all_solutions(self, sim_file: Path) -> Dict[str, Any]: """ Solve ALL solutions in .sim file using NXOpen journal approach. CRITICAL: This method updates the .fem file from the .prt before solving! This is required when geometry changes (modal analysis, etc.) Returns dict with: - num_solved: int - num_failed: int - num_skipped: int - solution_names: List[str] """ # Create journal to solve all solutions journal_code = f''' import sys import NXOpen import NXOpen.CAE def main(args): if len(args) < 1: print("ERROR: No .sim file path provided") return False sim_file_path = args[0] theSession = NXOpen.Session.GetSession() # Open the .sim file print(f"[JOURNAL] Opening simulation: {{sim_file_path}}") basePart1, partLoadStatus1 = theSession.Parts.OpenActiveDisplay( sim_file_path, NXOpen.DisplayPartOption.AllowAdditional ) partLoadStatus1.Dispose() workSimPart = theSession.Parts.BaseWork print(f"[JOURNAL] Simulation opened successfully") # CRITICAL: Update FEM from master model (.prt) # This is required when geometry has changed (modal analysis, etc.) print("[JOURNAL] Updating FEM from master model...") simSimulation = workSimPart.Simulation # Get all FEModels and update them femModels = simSimulation.FemParts for i in range(femModels.Length): femPart = femModels.Item(i) print(f"[JOURNAL] Updating FEM: {{femPart.Name}}") # Update the FEM from associated CAD part femPart.UpdateFemodel() # Save after FEM update print("[JOURNAL] Saving after FEM update...") partSaveStatus = workSimPart.Save( NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue ) partSaveStatus.Dispose() # Get all solutions theCAESimSolveManager = NXOpen.CAE.SimSolveManager.GetSimSolveManager(theSession) # Solve all solutions print("[JOURNAL] Solving ALL solutions...") num_solved, num_failed, num_skipped = theCAESimSolveManager.SolveAllSolutions( NXOpen.CAE.SimSolution.SolveOption.Solve, NXOpen.CAE.SimSolution.SetupCheckOption.CompleteCheckAndOutputErrors, NXOpen.CAE.SimSolution.SolveMode.Foreground, False ) # Get solution names simSimulation = workSimPart.FindObject("Simulation") solutions = [] for obj in simSimulation.GetAllDescendents(): if "Solution[" in str(obj): solutions.append(str(obj)) # Save to write output files print("[JOURNAL] Saving simulation to write output files...") partSaveStatus = workSimPart.Save( NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue ) partSaveStatus.Dispose() # Output results print(f"ATOMIZER_SOLUTIONS_SOLVED: {{num_solved}}") print(f"ATOMIZER_SOLUTIONS_FAILED: {{num_failed}}") print(f"ATOMIZER_SOLUTIONS_SKIPPED: {{num_skipped}}") for sol in solutions: print(f"ATOMIZER_SOLUTION: {{sol}}") return True if __name__ == '__main__': success = main(sys.argv[1:]) sys.exit(0 if success else 1) ''' # Write and execute journal journal_path = sim_file.parent / "_solve_all_solutions.py" with open(journal_path, 'w') as f: f.write(journal_code) # Run journal via NX from optimization_engine.nx_solver import NXSolver solver = NXSolver() import subprocess from config import NX_RUN_JOURNAL result = subprocess.run( [str(NX_RUN_JOURNAL), str(journal_path), str(sim_file)], capture_output=True, text=True, timeout=600 ) # Parse output num_solved = 0 num_failed = 0 num_skipped = 0 solution_names = [] for line in result.stdout.split('\n'): if 'ATOMIZER_SOLUTIONS_SOLVED:' in line: num_solved = int(line.split(':')[1].strip()) elif 'ATOMIZER_SOLUTIONS_FAILED:' in line: num_failed = int(line.split(':')[1].strip()) elif 'ATOMIZER_SOLUTIONS_SKIPPED:' in line: num_skipped = int(line.split(':')[1].strip()) elif 'ATOMIZER_SOLUTION:' in line: sol_name = line.split(':', 1)[1].strip() solution_names.append(sol_name) # Clean up journal_path.unlink() return { 'num_solved': num_solved, 'num_failed': num_failed, 'num_skipped': num_skipped, 'solution_names': solution_names } def _analyze_all_results( self, model_dir: Path, solutions_info: Dict[str, Any] ) -> Dict[str, Any]: """ Analyze ALL .op2 files to discover available results. Returns dict mapping result types to details: { 'eigenvalues': {'file': 'xxx.op2', 'count': 10, 'solution': 'Modal'}, 'displacements': {'file': 'yyy.op2', 'count': 613, 'solution': 'Static'}, 'stress_quad4': {'file': 'yyy.op2', 'count': 561, 'solution': 'Static'}, ... } """ from pyNastran.op2.op2 import OP2 available = {} # Find all .op2 files op2_files = list(model_dir.glob("*.op2")) for op2_file in op2_files: try: model = OP2() model.read_op2(str(op2_file)) # Check for eigenvalues if hasattr(model, 'eigenvalues') and len(model.eigenvalues) > 0: subcase = list(model.eigenvalues.keys())[0] eig_obj = model.eigenvalues[subcase] available['eigenvalues'] = { 'file': op2_file.name, 'count': len(eig_obj.eigenvalues), 'solution': self._guess_solution_from_filename(op2_file.name), 'op2_path': op2_file } # Check for displacements if hasattr(model, 'displacements') and len(model.displacements) > 0: subcase = list(model.displacements.keys())[0] disp_obj = model.displacements[subcase] available['displacements'] = { 'file': op2_file.name, 'count': disp_obj.data.shape[1], # Number of nodes 'solution': self._guess_solution_from_filename(op2_file.name), 'op2_path': op2_file } # Check for stresses if hasattr(model, 'cquad4_stress') and len(model.cquad4_stress) > 0: subcase = list(model.cquad4_stress.keys())[0] stress_obj = model.cquad4_stress[subcase] available['stress_quad4'] = { 'file': op2_file.name, 'count': stress_obj.data.shape[1], # Number of elements 'solution': self._guess_solution_from_filename(op2_file.name), 'op2_path': op2_file } # Check for forces if hasattr(model, 'cquad4_force') and len(model.cquad4_force) > 0: available['force_quad4'] = { 'file': op2_file.name, 'count': len(model.cquad4_force), 'solution': self._guess_solution_from_filename(op2_file.name), 'op2_path': op2_file } except Exception as e: print(f" [WARNING] Could not analyze {op2_file.name}: {e}") return available def _guess_solution_from_filename(self, filename: str) -> str: """Guess solution type from filename.""" filename_lower = filename.lower() if 'normal_modes' in filename_lower or 'modal' in filename_lower: return 'Solution_Normal_Modes' elif 'buckling' in filename_lower: return 'Solution_Buckling' elif 'static' in filename_lower or 'solution_1' in filename_lower: return 'Solution_1' else: return 'Unknown' def _match_objectives_to_results( self, workflow: Dict[str, Any], available_results: Dict[str, Any], solutions_info: Dict[str, Any] ) -> Dict[str, Any]: """ Intelligently match workflow objectives to available results. Returns: { 'objectives': { 'obj_name': { 'solution': 'Solution_Normal_Modes', 'result_type': 'eigenvalues', 'extractor': 'extract_first_frequency', 'op2_file': Path(...) } }, 'primary_solution': 'Solution_Normal_Modes' # Most important solution } """ mapping = { 'objectives': {}, 'primary_solution': None } for obj in workflow.get('objectives', []): obj_name = obj.get('name', 'unnamed') extraction = obj.get('extraction', {}) action = extraction.get('action', '').lower() # Match based on objective type if 'frequency' in action or 'eigenvalue' in action or 'modal' in action: if 'eigenvalues' in available_results: result_info = available_results['eigenvalues'] mapping['objectives'][obj_name] = { 'solution': result_info['solution'], 'result_type': 'eigenvalues', 'extractor': 'extract_first_frequency', 'op2_file': result_info['op2_path'], 'match_confidence': 'HIGH' } if not mapping['primary_solution']: mapping['primary_solution'] = result_info['solution'] else: mapping['objectives'][obj_name] = { 'solution': 'NONE', 'result_type': 'eigenvalues', 'extractor': 'extract_first_frequency', 'op2_file': None, 'match_confidence': 'ERROR', 'error': 'No eigenvalue results found - check if modal solution exists' } elif 'displacement' in action or 'deflection' in action: if 'displacements' in available_results: result_info = available_results['displacements'] mapping['objectives'][obj_name] = { 'solution': result_info['solution'], 'result_type': 'displacements', 'extractor': 'extract_max_displacement', 'op2_file': result_info['op2_path'], 'match_confidence': 'HIGH' } if not mapping['primary_solution']: mapping['primary_solution'] = result_info['solution'] elif 'stress' in action or 'von_mises' in action: if 'stress_quad4' in available_results: result_info = available_results['stress_quad4'] mapping['objectives'][obj_name] = { 'solution': result_info['solution'], 'result_type': 'stress', 'extractor': 'extract_max_stress', 'op2_file': result_info['op2_path'], 'match_confidence': 'HIGH' } if not mapping['primary_solution']: mapping['primary_solution'] = result_info['solution'] return mapping def generate_intelligent_runner( self, study_dir: Path, workflow: Dict[str, Any], benchmark_results: Dict[str, Any] ) -> Path: """ Generate optimized runner based on intelligent analysis. Uses benchmark results to: 1. Select correct solution to solve 2. Generate correct extractors 3. Optimize for speed (only solve what's needed) """ runner_path = study_dir / "run_optimization.py" # Get recommended solution recommended_solution = benchmark_results.get('recommended_solution', 'Solution_1') objective_mapping = benchmark_results.get('objective_mapping', {}) # Generate extractor functions based on actual available results extractor_code = self._generate_intelligent_extractors(objective_mapping) runner_code = f'''""" Auto-generated INTELLIGENT optimization runner Created: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} Intelligently configured based on complete benchmarking: - Solution: {recommended_solution} - Extractors: Auto-matched to available results """ import sys from pathlib import Path # Add project root to path project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) import json import optuna from optimization_engine.nx_updater import NXParameterUpdater from optimization_engine.nx_solver import NXSolver {extractor_code} def main(): print("="*80) print(" {workflow.get('study_name', 'OPTIMIZATION').upper()}") print(" Intelligent Setup - Auto-configured") print("="*80) print() # Load workflow config_file = Path(__file__).parent / "1_setup/workflow_config.json" with open(config_file) as f: workflow = json.load(f) print("Configuration:") print(f" Target solution: {recommended_solution}") print(f" Objectives: {len(workflow.get('objectives', []))}") print(f" Variables: {len(workflow.get('design_variables', []))}") print() # Setup paths model_dir = Path(__file__).parent / "1_setup/model" prt_file = list(model_dir.glob("*.prt"))[0] sim_file = list(model_dir.glob("*.sim"))[0] output_dir = Path(__file__).parent / "2_substudies/results" output_dir.mkdir(parents=True, exist_ok=True) # Initialize updater = NXParameterUpdater(prt_file) solver = NXSolver() # Create Optuna study study_name = "{workflow.get('study_name', 'optimization')}" storage = f"sqlite:///{{output_dir / 'study.db'}}" study = optuna.create_study( study_name=study_name, storage=storage, load_if_exists=True, direction="minimize" ) def objective(trial): # Sample design variables params = {{}} for var in workflow['design_variables']: name = var['parameter'] bounds = var['bounds'] params[name] = trial.suggest_float(name, bounds[0], bounds[1]) print(f"\\nTrial {{trial.number}}:") for name, value in params.items(): print(f" {{name}} = {{value:.2f}}") # Update model updater.update_expressions(params) # Run SPECIFIC solution (optimized - only what's needed) result = solver.run_simulation( sim_file, solution_name="{recommended_solution}" ) if not result['success']: raise RuntimeError(f"Simulation failed: {{result.get('errors', 'Unknown')}}") op2_file = result['op2_file'] # Extract results results = extract_results(op2_file, workflow) # Print results for name, value in results.items(): print(f" {{name}} = {{value:.4f}}") # Calculate objective obj_config = workflow['objectives'][0] result_name = list(results.keys())[0] if obj_config['goal'] == 'minimize': objective_value = results[result_name] else: objective_value = -results[result_name] print(f" Objective = {{objective_value:.4f}}") return objective_value # Run optimization n_trials = 10 print(f"\\nRunning {{n_trials}} trials...") print("="*80) print() study.optimize(objective, n_trials=n_trials) # Results print() print("="*80) print(" OPTIMIZATION COMPLETE") print("="*80) print() print(f"Best trial: #{{study.best_trial.number}}") for name, value in study.best_params.items(): print(f" {{name}} = {{value:.2f}}") print(f"\\nBest objective = {{study.best_value:.4f}}") print() if __name__ == "__main__": main() ''' with open(runner_path, 'w') as f: f.write(runner_code) return runner_path def _generate_intelligent_extractors(self, objective_mapping: Dict[str, Any]) -> str: """Generate extractor functions based on intelligent mapping.""" extractors = set() for obj_name, obj_info in objective_mapping.get('objectives', {}).items(): if 'extractor' in obj_info: extractors.add(obj_info['extractor']) code = ''' def extract_results(op2_file, workflow): """Intelligently extract results based on benchmarking.""" from pyNastran.op2.op2 import OP2 import numpy as np model = OP2() model.read_op2(str(op2_file)) results = {} ''' if 'extract_first_frequency' in extractors: code += ''' # Extract first frequency (auto-matched to eigenvalues) if hasattr(model, 'eigenvalues') and len(model.eigenvalues) > 0: subcase = list(model.eigenvalues.keys())[0] eig_obj = model.eigenvalues[subcase] eigenvalue = eig_obj.eigenvalues[0] angular_freq = np.sqrt(eigenvalue) frequency_hz = angular_freq / (2 * np.pi) results['first_frequency'] = float(frequency_hz) ''' if 'extract_max_displacement' in extractors: code += ''' # Extract max displacement (auto-matched to displacements) if hasattr(model, 'displacements') and len(model.displacements) > 0: subcase = list(model.displacements.keys())[0] disp_obj = model.displacements[subcase] translations = disp_obj.data[0, :, :3] magnitudes = np.linalg.norm(translations, axis=1) results['max_displacement'] = float(np.max(magnitudes)) ''' if 'extract_max_stress' in extractors: code += ''' # Extract max stress (auto-matched to stress results) if hasattr(model, 'cquad4_stress') and len(model.cquad4_stress) > 0: subcase = list(model.cquad4_stress.keys())[0] stress_obj = model.cquad4_stress[subcase] von_mises = stress_obj.data[0, :, 7] results['max_stress'] = float(np.max(von_mises)) ''' code += ''' return results ''' return code if __name__ == "__main__": # Example usage setup = IntelligentSetup() # Run complete analysis results = setup.run_complete_benchmarking( prt_file=Path("path/to/model.prt"), sim_file=Path("path/to/model.sim"), workflow={'objectives': [{'name': 'freq', 'extraction': {'action': 'extract_frequency'}}]} ) print("Analysis complete:") print(json.dumps(results, indent=2, default=str))