- Add validation framework (config, model, results, study validators) - Add Claude Code skills (create-study, run-optimization, generate-report, troubleshoot, analyze-model) - Add Atomizer Dashboard (React frontend + FastAPI backend) - Reorganize docs into structured directories (00-09) - Add neural surrogate modules and training infrastructure - Add multi-objective optimization support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
695 lines
24 KiB
Python
695 lines
24 KiB
Python
"""
|
|
Intelligent Setup System for Atomizer
|
|
|
|
This module provides COMPLETE autonomy for optimization setup:
|
|
1. Solves ALL solutions in .sim file
|
|
2. Discovers all available results (eigenvalues, displacements, stresses, etc.)
|
|
3. Catalogs expressions and parameters
|
|
4. Matches workflow objectives to available results
|
|
5. Auto-selects correct solution for optimization
|
|
6. Generates optimized runner code
|
|
|
|
This is the level of intelligence Atomizer should have.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
import json
|
|
from datetime import datetime
|
|
|
|
|
|
class IntelligentSetup:
    """
    Intelligent benchmarking and setup system.

    Proactively discovers EVERYTHING about a simulation:
    - All solutions (Static, Modal, Buckling, etc.)
    - All result types (displacements, stresses, eigenvalues, etc.)
    - All expressions and parameters
    - Matches user objectives to available data

    Typical flow: ``run_complete_benchmarking`` produces a results catalog,
    which ``generate_intelligent_runner`` turns into a ``run_optimization.py``
    script inside the study directory.
    """

    # NX journal that opens the .sim, updates every FEM from its master part,
    # solves all solutions and reports back through ``ATOMIZER_*`` marker lines
    # on stdout (parsed by ``_parse_solve_output``). It is written to disk
    # verbatim; the .sim path is passed as the journal's first argument.
    _SOLVE_ALL_JOURNAL = '''
import sys
import NXOpen
import NXOpen.CAE

def main(args):
    if len(args) < 1:
        print("ERROR: No .sim file path provided")
        return False

    sim_file_path = args[0]

    theSession = NXOpen.Session.GetSession()

    # Open the .sim file
    print(f"[JOURNAL] Opening simulation: {sim_file_path}")
    basePart1, partLoadStatus1 = theSession.Parts.OpenActiveDisplay(
        sim_file_path,
        NXOpen.DisplayPartOption.AllowAdditional
    )
    partLoadStatus1.Dispose()

    workSimPart = theSession.Parts.BaseWork
    print(f"[JOURNAL] Simulation opened successfully")

    # CRITICAL: Update FEM from master model (.prt)
    # This is required when geometry has changed (modal analysis, etc.)
    print("[JOURNAL] Updating FEM from master model...")
    simSimulation = workSimPart.Simulation

    # Get all FEModels and update them
    femModels = simSimulation.FemParts
    for i in range(femModels.Length):
        femPart = femModels.Item(i)
        print(f"[JOURNAL] Updating FEM: {femPart.Name}")

        # Update the FEM from associated CAD part
        femPart.UpdateFemodel()

    # Save after FEM update
    print("[JOURNAL] Saving after FEM update...")
    partSaveStatus = workSimPart.Save(
        NXOpen.BasePart.SaveComponents.TrueValue,
        NXOpen.BasePart.CloseAfterSave.FalseValue
    )
    partSaveStatus.Dispose()

    # Get all solutions
    theCAESimSolveManager = NXOpen.CAE.SimSolveManager.GetSimSolveManager(theSession)

    # Solve all solutions
    print("[JOURNAL] Solving ALL solutions...")
    num_solved, num_failed, num_skipped = theCAESimSolveManager.SolveAllSolutions(
        NXOpen.CAE.SimSolution.SolveOption.Solve,
        NXOpen.CAE.SimSolution.SetupCheckOption.CompleteCheckAndOutputErrors,
        NXOpen.CAE.SimSolution.SolveMode.Foreground,
        False
    )

    # Get solution names
    simSimulation = workSimPart.FindObject("Simulation")
    solutions = []
    for obj in simSimulation.GetAllDescendents():
        if "Solution[" in str(obj):
            solutions.append(str(obj))

    # Save to write output files
    print("[JOURNAL] Saving simulation to write output files...")
    partSaveStatus = workSimPart.Save(
        NXOpen.BasePart.SaveComponents.TrueValue,
        NXOpen.BasePart.CloseAfterSave.FalseValue
    )
    partSaveStatus.Dispose()

    # Output results
    print(f"ATOMIZER_SOLUTIONS_SOLVED: {num_solved}")
    print(f"ATOMIZER_SOLUTIONS_FAILED: {num_failed}")
    print(f"ATOMIZER_SOLUTIONS_SKIPPED: {num_skipped}")
    for sol in solutions:
        print(f"ATOMIZER_SOLUTION: {sol}")

    return True

if __name__ == '__main__':
    success = main(sys.argv[1:])
    sys.exit(0 if success else 1)
'''

    # (available_results key, attribute name on the pyNastran OP2 model)
    _RESULT_TABLES: Tuple[Tuple[str, str], ...] = (
        ('eigenvalues', 'eigenvalues'),
        ('displacements', 'displacements'),
        ('stress_quad4', 'cquad4_stress'),
        ('force_quad4', 'cquad4_force'),
    )

    # Objective matching rules, tried in order (first keyword hit wins):
    # (action keywords, available_results key, reported result_type,
    #  extractor name, message used when the result is not available)
    _OBJECTIVE_RULES: Tuple[Tuple[Tuple[str, ...], str, str, str, str], ...] = (
        (
            ('frequency', 'eigenvalue', 'modal'),
            'eigenvalues',
            'eigenvalues',
            'extract_first_frequency',
            'No eigenvalue results found - check if modal solution exists',
        ),
        (
            ('displacement', 'deflection'),
            'displacements',
            'displacements',
            'extract_max_displacement',
            'No displacement results found - check if a static solution exists',
        ),
        (
            ('stress', 'von_mises'),
            'stress_quad4',
            'stress',
            'extract_max_stress',
            'No CQUAD4 stress results found - check the stress output requests',
        ),
    )

    def __init__(self):
        # Two directory levels above this file.
        self.project_root = Path(__file__).parent.parent

    def run_complete_benchmarking(
        self,
        prt_file: Path,
        sim_file: Path,
        workflow: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Run COMPLETE benchmarking:
        1. Extract ALL expressions from .prt
        2. Solve ALL solutions in .sim
        3. Analyze ALL result files
        4. Match objectives to available results
        5. Determine optimal solution for each objective

        Progress is printed to stdout. Any exception raised by a phase is
        caught, recorded in ``results['errors']`` and leaves
        ``results['success']`` as False; phases completed before the failure
        keep their entries.

        Args:
            prt_file: Part file whose expressions are cataloged.
            sim_file: Simulation file to solve; its parent directory is
                scanned for ``*.op2`` result files.
            workflow: Workflow description; only its ``'objectives'`` list is
                read here.

        Returns:
            Complete catalog of available data and recommendations, with keys
            ``success``, ``expressions``, ``solutions``, ``available_results``,
            ``objective_mapping``, ``recommended_solution`` and ``errors``.
        """
        print()
        print("="*80)
        print(" INTELLIGENT SETUP - COMPLETE ANALYSIS")
        print("="*80)
        print()

        results = {
            'success': False,
            'expressions': {},
            'solutions': {},
            'available_results': {},
            'objective_mapping': {},
            'recommended_solution': None,
            'errors': []
        }

        try:
            # Phase 1: Extract ALL expressions
            print("[Phase 1/4] Extracting ALL expressions from model...")
            expressions = self._extract_all_expressions(prt_file)
            results['expressions'] = expressions
            print(f" [OK] Found {len(expressions)} expressions")
            # Only preview the first few expressions to keep the log short.
            for name, info in list(expressions.items())[:5]:
                val = info.get('value', 'N/A')
                units = info.get('units', '')
                print(f" - {name}: {val} {units}")
            if len(expressions) > 5:
                print(f" ... and {len(expressions) - 5} more")
            print()

            # Phase 2: Solve ALL solutions
            print("[Phase 2/4] Solving ALL solutions in .sim file...")
            solutions_info = self._solve_all_solutions(sim_file)
            results['solutions'] = solutions_info
            print(f" [OK] Solved {solutions_info['num_solved']} solutions")
            for sol_name in solutions_info['solution_names']:
                print(f" - {sol_name}")
            print()

            # Phase 3: Analyze ALL result files
            print("[Phase 3/4] Analyzing ALL result files...")
            available_results = self._analyze_all_results(sim_file.parent, solutions_info)
            results['available_results'] = available_results

            # The catalog is keyed by result type, so this counts result
            # types (several may come from the same .op2 file).
            print(f" [OK] Found {len(available_results)} result types")
            for result_type, details in available_results.items():
                print(f" - {result_type}: {details['count']} entries in {details['file']}")
            print()

            # Phase 4: Match objectives to results
            print("[Phase 4/4] Matching objectives to available results...")
            mapping = self._match_objectives_to_results(workflow, available_results, solutions_info)
            results['objective_mapping'] = mapping
            results['recommended_solution'] = mapping.get('primary_solution')

            print(" [OK] Objective mapping complete")
            for obj_name, obj_info in mapping['objectives'].items():
                print(f" - {obj_name}")
                print(f" Solution: {obj_info.get('solution', 'NONE')}")
                print(f" Result type: {obj_info.get('result_type', 'Unknown')}")
                print(f" Extractor: {obj_info.get('extractor', 'Unknown')}")
                if 'error' in obj_info:
                    print(f" [WARNING] {obj_info['error']}")
            print()

            if mapping.get('primary_solution'):
                print(f" [RECOMMENDATION] Use solution: {mapping['primary_solution']}")
                print()

            results['success'] = True

        except Exception as e:
            # Top-level boundary: report the failure in the returned catalog
            # instead of propagating, so callers can inspect partial results.
            results['errors'].append(str(e))
            print(f" [ERROR] {e}")
            print()

        print("="*80)
        print(" ANALYSIS COMPLETE")
        print("="*80)
        print()

        return results

    def _extract_all_expressions(self, prt_file: Path) -> Dict[str, Any]:
        """Extract ALL expressions from .prt file via ``NXParameterUpdater``."""
        # Imported lazily so this module can be imported without the engine.
        from optimization_engine.nx_updater import NXParameterUpdater

        return NXParameterUpdater(prt_file).get_all_expressions()

    @staticmethod
    def _parse_solve_output(stdout: str) -> Dict[str, Any]:
        """Parse the ``ATOMIZER_*`` marker lines printed by the solve journal.

        Lines without a marker are ignored; counters that never appear stay 0.

        Returns:
            Dict with ``num_solved``, ``num_failed``, ``num_skipped`` (ints)
            and ``solution_names`` (list of str, in output order).
        """
        counters = {'num_solved': 0, 'num_failed': 0, 'num_skipped': 0}
        counter_markers = (
            ('ATOMIZER_SOLUTIONS_SOLVED:', 'num_solved'),
            ('ATOMIZER_SOLUTIONS_FAILED:', 'num_failed'),
            ('ATOMIZER_SOLUTIONS_SKIPPED:', 'num_skipped'),
        )
        solution_names: List[str] = []

        for line in stdout.split('\n'):
            for marker, key in counter_markers:
                if marker in line:
                    counters[key] = int(line.split(':')[1].strip())
                    break
            else:
                if 'ATOMIZER_SOLUTION:' in line:
                    # Split once only: solution names may contain ':'.
                    solution_names.append(line.split(':', 1)[1].strip())

        return {
            'num_solved': counters['num_solved'],
            'num_failed': counters['num_failed'],
            'num_skipped': counters['num_skipped'],
            'solution_names': solution_names
        }

    def _solve_all_solutions(self, sim_file: Path) -> Dict[str, Any]:
        """
        Solve ALL solutions in .sim file using NXOpen journal approach.

        CRITICAL: This method updates the .fem file from the .prt before solving!
        This is required when geometry changes (modal analysis, etc.)

        A temporary journal ``_solve_all_solutions.py`` is written next to the
        .sim file, executed through ``config.NX_RUN_JOURNAL`` (10 minute
        timeout) and removed afterwards, even when the run raises.

        Returns dict with:
        - num_solved: int
        - num_failed: int
        - num_skipped: int
        - solution_names: List[str]

        Raises:
            subprocess.TimeoutExpired: if NX does not finish within 600 s.
        """
        import subprocess
        # Resolve the launcher before touching the disk so that an import
        # failure cannot leave a stray journal file behind.
        from config import NX_RUN_JOURNAL

        journal_path = sim_file.parent / "_solve_all_solutions.py"
        journal_path.write_text(self._SOLVE_ALL_JOURNAL, encoding="utf-8")

        try:
            result = subprocess.run(
                [str(NX_RUN_JOURNAL), str(journal_path), str(sim_file)],
                capture_output=True,
                text=True,
                timeout=600
            )
        finally:
            # Clean up the temporary journal on success, timeout or error.
            if journal_path.exists():
                journal_path.unlink()

        # NOTE(review): the process return code is deliberately not checked,
        # matching the previous behavior; a failed journal shows up as all
        # counters being zero.
        return self._parse_solve_output(result.stdout)

    @staticmethod
    def _count_entries(result_key: str, tables: Dict[Any, Any]) -> int:
        """Return the 'count' reported for one result table of an OP2 model.

        ``tables`` is the non-empty subcase -> result-object mapping.
        """
        if result_key == 'force_quad4':
            # NOTE(review): this is the number of subcases, unlike the other
            # result types which report entries of the first subcase. Kept
            # as-is to preserve existing output - confirm the intent.
            return len(tables)

        first = tables[next(iter(tables))]
        if result_key == 'eigenvalues':
            return len(first.eigenvalues)
        # Second axis of the data array (nodes for displacements, elements
        # for stresses according to the original comments).
        return first.data.shape[1]

    def _analyze_all_results(
        self,
        model_dir: Path,
        solutions_info: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Analyze ALL .op2 files to discover available results.

        Files are processed in sorted order so that, when several files hold
        the same result type, the winner (the last one) is deterministic.
        A file that cannot be read only produces a warning.

        Args:
            model_dir: Directory scanned (non-recursively) for ``*.op2``.
            solutions_info: Currently unused; kept for interface stability.

        Returns dict mapping result types to details:
        {
            'eigenvalues': {'file': 'xxx.op2', 'count': 10, 'solution': 'Modal'},
            'displacements': {'file': 'yyy.op2', 'count': 613, 'solution': 'Static'},
            'stress_quad4': {'file': 'yyy.op2', 'count': 561, 'solution': 'Static'},
            ...
        }
        Each entry additionally carries 'op2_path' (the Path of the file).
        """
        from pyNastran.op2.op2 import OP2

        available: Dict[str, Any] = {}

        for op2_file in sorted(model_dir.glob("*.op2")):
            try:
                model = OP2()
                model.read_op2(str(op2_file))

                for result_key, attr_name in self._RESULT_TABLES:
                    tables = getattr(model, attr_name, None)
                    if tables is None or len(tables) == 0:
                        continue
                    available[result_key] = {
                        'file': op2_file.name,
                        'count': self._count_entries(result_key, tables),
                        'solution': self._guess_solution_from_filename(op2_file.name),
                        'op2_path': op2_file
                    }

            except Exception as e:
                # Best effort: one unreadable file must not hide the others.
                print(f" [WARNING] Could not analyze {op2_file.name}: {e}")

        return available

    def _guess_solution_from_filename(self, filename: str) -> str:
        """Guess solution type from filename (case-insensitive substrings).

        Returns 'Solution_Normal_Modes', 'Solution_Buckling', 'Solution_1'
        or 'Unknown'; modal keywords take precedence over buckling, which
        takes precedence over static.
        """
        lowered = filename.lower()
        keyword_table = (
            (('normal_modes', 'modal'), 'Solution_Normal_Modes'),
            (('buckling',), 'Solution_Buckling'),
            (('static', 'solution_1'), 'Solution_1'),
        )
        for keywords, solution in keyword_table:
            if any(keyword in lowered for keyword in keywords):
                return solution
        return 'Unknown'

    def _match_objectives_to_results(
        self,
        workflow: Dict[str, Any],
        available_results: Dict[str, Any],
        solutions_info: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Intelligently match workflow objectives to available results.

        Each objective's ``extraction.action`` is matched (case-insensitive
        substring) against ``_OBJECTIVE_RULES``. Every objective gets an
        entry: a 'HIGH' confidence match, or an 'ERROR' entry carrying an
        'error' message when the needed result is missing or the action is
        not recognised. The first successfully matched objective decides
        'primary_solution'.

        Args:
            workflow: Workflow description with an 'objectives' list.
            available_results: Catalog from ``_analyze_all_results``.
            solutions_info: Currently unused; kept for interface stability.

        Returns:
        {
            'objectives': {
                'obj_name': {
                    'solution': 'Solution_Normal_Modes',
                    'result_type': 'eigenvalues',
                    'extractor': 'extract_first_frequency',
                    'op2_file': Path(...)
                }
            },
            'primary_solution': 'Solution_Normal_Modes' # Most important solution
        }
        """
        mapping: Dict[str, Any] = {
            'objectives': {},
            'primary_solution': None
        }

        for obj in workflow.get('objectives') or []:
            obj_name = obj.get('name', 'unnamed')
            # Tolerate "extraction": null / "action": null in the config.
            extraction = obj.get('extraction') or {}
            action = str(extraction.get('action') or '').lower()

            rule = next(
                (r for r in self._OBJECTIVE_RULES if any(k in action for k in r[0])),
                None
            )

            if rule is None:
                # No 'result_type'/'extractor' keys on purpose: nothing can
                # be generated for this objective, it is only reported.
                mapping['objectives'][obj_name] = {
                    'solution': 'NONE',
                    'op2_file': None,
                    'match_confidence': 'ERROR',
                    'error': f"No known extractor matches action '{action}'"
                }
                continue

            _, result_key, result_type, extractor, missing_message = rule
            result_info = available_results.get(result_key)

            if result_info is None:
                mapping['objectives'][obj_name] = {
                    'solution': 'NONE',
                    'result_type': result_type,
                    'extractor': extractor,
                    'op2_file': None,
                    'match_confidence': 'ERROR',
                    'error': missing_message
                }
                continue

            mapping['objectives'][obj_name] = {
                'solution': result_info['solution'],
                'result_type': result_type,
                'extractor': extractor,
                'op2_file': result_info['op2_path'],
                'match_confidence': 'HIGH'
            }
            if not mapping['primary_solution']:
                mapping['primary_solution'] = result_info['solution']

        return mapping

    def generate_intelligent_runner(
        self,
        study_dir: Path,
        workflow: Dict[str, Any],
        benchmark_results: Dict[str, Any]
    ) -> Path:
        """
        Generate optimized runner based on intelligent analysis.

        Uses benchmark results to:
        1. Select correct solution to solve
        2. Generate correct extractors
        3. Optimize for speed (only solve what's needed)

        Args:
            study_dir: Existing study directory; ``run_optimization.py`` is
                written (overwritten) directly inside it.
            workflow: Workflow description; 'study_name', 'objectives' and
                'design_variables' are read at generation time.
            benchmark_results: Output of ``run_complete_benchmarking``. A
                missing or empty 'recommended_solution' falls back to
                'Solution_1'.

        Returns:
            Path of the generated runner script.
        """
        runner_path = study_dir / "run_optimization.py"

        # ``or`` rather than a .get() default: benchmarking stores an explicit
        # None when no objective matched, which must not become "None".
        recommended_solution = benchmark_results.get('recommended_solution') or 'Solution_1'
        objective_mapping = benchmark_results.get('objective_mapping') or {}

        # Generate extractor functions based on actual available results
        extractor_code = self._generate_intelligent_extractors(objective_mapping)

        study_name = str(workflow.get('study_name') or 'optimization')
        created = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Values below are embedded with !r so that quotes or braces in the
        # study/solution name cannot break the generated source.
        banner_title = f" {study_name.upper()}"
        target_line = f" Target solution: {recommended_solution}"
        n_objectives = len(workflow.get('objectives') or [])
        n_variables = len(workflow.get('design_variables') or [])

        runner_code = f'''"""
Auto-generated INTELLIGENT optimization runner
Created: {created}

Intelligently configured based on complete benchmarking:
- Solution: {recommended_solution}
- Extractors: Auto-matched to available results
"""

import sys
from pathlib import Path

# Add project root to path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

import json
import optuna
from optimization_engine.nx_updater import NXParameterUpdater
from optimization_engine.nx_solver import NXSolver


{extractor_code}


def main():
    print("="*80)
    print({banner_title!r})
    print(" Intelligent Setup - Auto-configured")
    print("="*80)
    print()

    # Load workflow
    config_file = Path(__file__).parent / "1_setup/workflow_config.json"
    with open(config_file) as f:
        workflow = json.load(f)

    print("Configuration:")
    print({target_line!r})
    print(" Objectives: {n_objectives}")
    print(" Variables: {n_variables}")
    print()

    # Setup paths
    model_dir = Path(__file__).parent / "1_setup/model"
    prt_file = list(model_dir.glob("*.prt"))[0]
    sim_file = list(model_dir.glob("*.sim"))[0]
    output_dir = Path(__file__).parent / "2_substudies/results"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Initialize
    updater = NXParameterUpdater(prt_file)
    solver = NXSolver()

    # Create Optuna study
    study_name = {study_name!r}
    storage = f"sqlite:///{{output_dir / 'study.db'}}"
    study = optuna.create_study(
        study_name=study_name,
        storage=storage,
        load_if_exists=True,
        direction="minimize"
    )

    def objective(trial):
        # Sample design variables
        params = {{}}
        for var in workflow['design_variables']:
            name = var['parameter']
            bounds = var['bounds']
            params[name] = trial.suggest_float(name, bounds[0], bounds[1])

        print(f"\\nTrial {{trial.number}}:")
        for name, value in params.items():
            print(f" {{name}} = {{value:.2f}}")

        # Update model
        updater.update_expressions(params)

        # Run SPECIFIC solution (optimized - only what's needed)
        result = solver.run_simulation(
            sim_file,
            solution_name={recommended_solution!r}
        )
        if not result['success']:
            raise RuntimeError(f"Simulation failed: {{result.get('errors', 'Unknown')}}")

        op2_file = result['op2_file']

        # Extract results
        results = extract_results(op2_file, workflow)
        if not results:
            raise RuntimeError("No results could be extracted from the OP2 file")

        # Print results
        for name, value in results.items():
            print(f" {{name}} = {{value:.4f}}")

        # Calculate objective
        obj_config = workflow['objectives'][0]
        result_name = list(results.keys())[0]

        if obj_config['goal'] == 'minimize':
            objective_value = results[result_name]
        else:
            objective_value = -results[result_name]

        print(f" Objective = {{objective_value:.4f}}")

        return objective_value

    # Run optimization
    n_trials = 10
    print(f"\\nRunning {{n_trials}} trials...")
    print("="*80)
    print()

    study.optimize(objective, n_trials=n_trials)

    # Results
    print()
    print("="*80)
    print(" OPTIMIZATION COMPLETE")
    print("="*80)
    print()
    print(f"Best trial: #{{study.best_trial.number}}")
    for name, value in study.best_params.items():
        print(f" {{name}} = {{value:.2f}}")
    print(f"\\nBest objective = {{study.best_value:.4f}}")
    print()


if __name__ == "__main__":
    main()
'''

        runner_path.write_text(runner_code, encoding="utf-8")

        return runner_path

    def _generate_intelligent_extractors(self, objective_mapping: Dict[str, Any]) -> str:
        """Generate extractor functions based on intelligent mapping.

        Returns the source of an ``extract_results(op2_file, workflow)``
        function containing one guarded extraction section per distinct
        'extractor' named in ``objective_mapping['objectives']``. Sections
        are always emitted in the order frequency, displacement, stress.
        """
        objectives = (objective_mapping or {}).get('objectives', {})
        extractors = {
            obj_info['extractor']
            for obj_info in objectives.values()
            if 'extractor' in obj_info
        }

        sections = ['''
def extract_results(op2_file, workflow):
    """Intelligently extract results based on benchmarking."""
    from pyNastran.op2.op2 import OP2
    import numpy as np

    model = OP2()
    model.read_op2(str(op2_file))

    results = {}
''']

        if 'extract_first_frequency' in extractors:
            sections.append('''
    # Extract first frequency (auto-matched to eigenvalues)
    if hasattr(model, 'eigenvalues') and len(model.eigenvalues) > 0:
        subcase = list(model.eigenvalues.keys())[0]
        eig_obj = model.eigenvalues[subcase]
        eigenvalue = eig_obj.eigenvalues[0]
        angular_freq = np.sqrt(eigenvalue)
        frequency_hz = angular_freq / (2 * np.pi)
        results['first_frequency'] = float(frequency_hz)
''')

        if 'extract_max_displacement' in extractors:
            sections.append('''
    # Extract max displacement (auto-matched to displacements)
    if hasattr(model, 'displacements') and len(model.displacements) > 0:
        subcase = list(model.displacements.keys())[0]
        disp_obj = model.displacements[subcase]
        translations = disp_obj.data[0, :, :3]
        magnitudes = np.linalg.norm(translations, axis=1)
        results['max_displacement'] = float(np.max(magnitudes))
''')

        if 'extract_max_stress' in extractors:
            sections.append('''
    # Extract max stress (auto-matched to stress results)
    if hasattr(model, 'cquad4_stress') and len(model.cquad4_stress) > 0:
        subcase = list(model.cquad4_stress.keys())[0]
        stress_obj = model.cquad4_stress[subcase]
        von_mises = stress_obj.data[0, :, 7]
        results['max_stress'] = float(np.max(von_mises))
''')

        sections.append('''
    return results
''')

        return ''.join(sections)
|
|
|
|
|
|
if __name__ == "__main__":
    # Example usage: benchmark a model with a single frequency objective.
    # The paths are placeholders and must be replaced with real model files.
    example_workflow = {
        'objectives': [
            {'name': 'freq', 'extraction': {'action': 'extract_frequency'}},
        ]
    }

    # Run complete analysis
    analysis = IntelligentSetup().run_complete_benchmarking(
        prt_file=Path("path/to/model.prt"),
        sim_file=Path("path/to/model.sim"),
        workflow=example_workflow,
    )

    # default=str makes Path objects in the catalog serializable.
    print("Analysis complete:")
    print(json.dumps(analysis, indent=2, default=str))
|