Files
Atomizer/optimization_engine/intelligent_setup.py
Anto01 e3bdb08a22 feat: Major update with validators, skills, dashboard, and docs reorganization
- Add validation framework (config, model, results, study validators)
- Add Claude Code skills (create-study, run-optimization, generate-report,
  troubleshoot, analyze-model)
- Add Atomizer Dashboard (React frontend + FastAPI backend)
- Reorganize docs into structured directories (00-09)
- Add neural surrogate modules and training infrastructure
- Add multi-objective optimization support

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-25 19:23:58 -05:00

695 lines
24 KiB
Python

"""
Intelligent Setup System for Atomizer
This module provides COMPLETE autonomy for optimization setup:
1. Solves ALL solutions in .sim file
2. Discovers all available results (eigenvalues, displacements, stresses, etc.)
3. Catalogs expressions and parameters
4. Matches workflow objectives to available results
5. Auto-selects correct solution for optimization
6. Generates optimized runner code
This is the level of intelligence Atomizer should have.
"""
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import json
from datetime import datetime
class IntelligentSetup:
    """
    Intelligent benchmarking and setup system.
    Proactively discovers EVERYTHING about a simulation:
    - All solutions (Static, Modal, Buckling, etc.)
    - All result types (displacements, stresses, eigenvalues, etc.)
    - All expressions and parameters
    - Matches user objectives to available data
    """

    def __init__(self):
        """Locate the project root (two directory levels above this module)."""
        module_file = Path(__file__)
        self.project_root = module_file.parent.parent
def run_complete_benchmarking(
    self,
    prt_file: Path,
    sim_file: Path,
    workflow: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Run COMPLETE benchmarking:
    1. Extract ALL expressions from .prt
    2. Solve ALL solutions in .sim
    3. Analyze ALL result files
    4. Match objectives to available results
    5. Determine optimal solution for each objective

    Args:
        prt_file: NX part file (.prt) the expressions are read from.
        sim_file: NX simulation file (.sim); its parent directory is also
            scanned for .op2 result files in phase 3.
        workflow: Study definition dict; only its 'objectives' list is used
            here (by phase 4).

    Returns:
        Complete catalog of available data and recommendations:
        keys 'success', 'expressions', 'solutions', 'available_results',
        'objective_mapping', 'recommended_solution', 'errors'.
        'success' is True only if all four phases completed.
    """
    print()
    print("="*80)
    print(" INTELLIGENT SETUP - COMPLETE ANALYSIS")
    print("="*80)
    print()
    # Result skeleton; filled in phase by phase. Any failure leaves
    # 'success' False and the error text in 'errors'.
    results = {
        'success': False,
        'expressions': {},
        'solutions': {},
        'available_results': {},
        'objective_mapping': {},
        'recommended_solution': None,
        'errors': []
    }
    try:
        # Phase 1: Extract ALL expressions
        print("[Phase 1/4] Extracting ALL expressions from model...")
        expressions = self._extract_all_expressions(prt_file)
        results['expressions'] = expressions
        print(f" [OK] Found {len(expressions)} expressions")
        # Preview only the first 5 expressions to keep the log readable.
        for name, info in list(expressions.items())[:5]:
            val = info.get('value', 'N/A')
            units = info.get('units', '')
            print(f" - {name}: {val} {units}")
        if len(expressions) > 5:
            print(f" ... and {len(expressions) - 5} more")
        print()
        # Phase 2: Solve ALL solutions (also refreshes the FEM from the .prt)
        print("[Phase 2/4] Solving ALL solutions in .sim file...")
        solutions_info = self._solve_all_solutions(sim_file)
        results['solutions'] = solutions_info
        print(f" [OK] Solved {solutions_info['num_solved']} solutions")
        for sol_name in solutions_info['solution_names']:
            print(f" - {sol_name}")
        print()
        # Phase 3: Analyze ALL result files (.op2) next to the .sim file
        print("[Phase 3/4] Analyzing ALL result files...")
        available_results = self._analyze_all_results(sim_file.parent, solutions_info)
        results['available_results'] = available_results
        print(f" [OK] Found {len(available_results)} result files")
        for result_type, details in available_results.items():
            print(f" - {result_type}: {details['count']} entries in {details['file']}")
        print()
        # Phase 4: Match objectives to results
        print("[Phase 4/4] Matching objectives to available results...")
        mapping = self._match_objectives_to_results(workflow, available_results, solutions_info)
        results['objective_mapping'] = mapping
        # The first matched objective's solution becomes the recommendation.
        results['recommended_solution'] = mapping.get('primary_solution')
        print(f" [OK] Objective mapping complete")
        for obj_name, obj_info in mapping['objectives'].items():
            print(f" - {obj_name}")
            print(f" Solution: {obj_info.get('solution', 'NONE')}")
            print(f" Result type: {obj_info.get('result_type', 'Unknown')}")
            print(f" Extractor: {obj_info.get('extractor', 'Unknown')}")
            if 'error' in obj_info:
                print(f" [WARNING] {obj_info['error']}")
        print()
        if mapping.get('primary_solution'):
            print(f" [RECOMMENDATION] Use solution: {mapping['primary_solution']}")
            print()
        results['success'] = True
    except Exception as e:
        # Broad catch is deliberate: this is the top-level orchestration
        # boundary; the failure is recorded and reported, not re-raised.
        results['errors'].append(str(e))
        print(f" [ERROR] {e}")
    print()
    print("="*80)
    print(" ANALYSIS COMPLETE")
    print("="*80)
    print()
    return results
def _extract_all_expressions(self, prt_file: Path) -> Dict[str, Any]:
    """Read every expression defined in the given .prt part file.

    Delegates to NXParameterUpdater and returns its name -> info mapping.
    """
    from optimization_engine.nx_updater import NXParameterUpdater
    return NXParameterUpdater(prt_file).get_all_expressions()
def _solve_all_solutions(self, sim_file: Path) -> Dict[str, Any]:
    """
    Solve ALL solutions in .sim file using NXOpen journal approach.

    CRITICAL: This method updates the .fem file from the .prt before solving!
    This is required when geometry changes (modal analysis, etc.)

    A temporary NXOpen journal is written next to the .sim file, executed via
    the NX run_journal executable, and removed again afterwards (even when the
    run raises, e.g. on timeout).

    Args:
        sim_file: Path to the .sim file whose solutions should be solved.

    Returns dict with:
    - num_solved: int
    - num_failed: int
    - num_skipped: int
    - solution_names: List[str]

    Raises:
        subprocess.TimeoutExpired: if the NX journal run exceeds 600 seconds.
    """
    # Create journal to solve all solutions. Double braces survive the outer
    # f-string so the journal itself uses normal f-string substitutions.
    journal_code = f'''
import sys
import NXOpen
import NXOpen.CAE

def main(args):
    if len(args) < 1:
        print("ERROR: No .sim file path provided")
        return False
    sim_file_path = args[0]
    theSession = NXOpen.Session.GetSession()
    # Open the .sim file
    print(f"[JOURNAL] Opening simulation: {{sim_file_path}}")
    basePart1, partLoadStatus1 = theSession.Parts.OpenActiveDisplay(
        sim_file_path,
        NXOpen.DisplayPartOption.AllowAdditional
    )
    partLoadStatus1.Dispose()
    workSimPart = theSession.Parts.BaseWork
    print(f"[JOURNAL] Simulation opened successfully")
    # CRITICAL: Update FEM from master model (.prt)
    # This is required when geometry has changed (modal analysis, etc.)
    print("[JOURNAL] Updating FEM from master model...")
    simSimulation = workSimPart.Simulation
    # Get all FEModels and update them
    femModels = simSimulation.FemParts
    for i in range(femModels.Length):
        femPart = femModels.Item(i)
        print(f"[JOURNAL] Updating FEM: {{femPart.Name}}")
        # Update the FEM from associated CAD part
        femPart.UpdateFemodel()
    # Save after FEM update
    print("[JOURNAL] Saving after FEM update...")
    partSaveStatus = workSimPart.Save(
        NXOpen.BasePart.SaveComponents.TrueValue,
        NXOpen.BasePart.CloseAfterSave.FalseValue
    )
    partSaveStatus.Dispose()
    # Get all solutions
    theCAESimSolveManager = NXOpen.CAE.SimSolveManager.GetSimSolveManager(theSession)
    # Solve all solutions
    print("[JOURNAL] Solving ALL solutions...")
    num_solved, num_failed, num_skipped = theCAESimSolveManager.SolveAllSolutions(
        NXOpen.CAE.SimSolution.SolveOption.Solve,
        NXOpen.CAE.SimSolution.SetupCheckOption.CompleteCheckAndOutputErrors,
        NXOpen.CAE.SimSolution.SolveMode.Foreground,
        False
    )
    # Get solution names
    simSimulation = workSimPart.FindObject("Simulation")
    solutions = []
    for obj in simSimulation.GetAllDescendents():
        if "Solution[" in str(obj):
            solutions.append(str(obj))
    # Save to write output files
    print("[JOURNAL] Saving simulation to write output files...")
    partSaveStatus = workSimPart.Save(
        NXOpen.BasePart.SaveComponents.TrueValue,
        NXOpen.BasePart.CloseAfterSave.FalseValue
    )
    partSaveStatus.Dispose()
    # Output results
    print(f"ATOMIZER_SOLUTIONS_SOLVED: {{num_solved}}")
    print(f"ATOMIZER_SOLUTIONS_FAILED: {{num_failed}}")
    print(f"ATOMIZER_SOLUTIONS_SKIPPED: {{num_skipped}}")
    for sol in solutions:
        print(f"ATOMIZER_SOLUTION: {{sol}}")
    return True

if __name__ == '__main__':
    success = main(sys.argv[1:])
    sys.exit(0 if success else 1)
'''
    # Write and execute journal
    journal_path = sim_file.parent / "_solve_all_solutions.py"
    with open(journal_path, 'w') as f:
        f.write(journal_code)
    import subprocess
    from config import NX_RUN_JOURNAL
    try:
        # Run journal via NX (foreground solve; 10-minute ceiling)
        result = subprocess.run(
            [str(NX_RUN_JOURNAL), str(journal_path), str(sim_file)],
            capture_output=True,
            text=True,
            timeout=600
        )
    finally:
        # Clean up the temporary journal even if NX fails or times out;
        # previously the file leaked on any exception from subprocess.run.
        journal_path.unlink()
    # Parse the ATOMIZER_* marker lines the journal printed to stdout.
    num_solved = 0
    num_failed = 0
    num_skipped = 0
    solution_names = []
    for line in result.stdout.split('\n'):
        if 'ATOMIZER_SOLUTIONS_SOLVED:' in line:
            num_solved = int(line.split(':')[1].strip())
        elif 'ATOMIZER_SOLUTIONS_FAILED:' in line:
            num_failed = int(line.split(':')[1].strip())
        elif 'ATOMIZER_SOLUTIONS_SKIPPED:' in line:
            num_skipped = int(line.split(':')[1].strip())
        elif 'ATOMIZER_SOLUTION:' in line:
            # split on the first ':' only — solution names may contain colons
            sol_name = line.split(':', 1)[1].strip()
            solution_names.append(sol_name)
    return {
        'num_solved': num_solved,
        'num_failed': num_failed,
        'num_skipped': num_skipped,
        'solution_names': solution_names
    }
def _analyze_all_results(
    self,
    model_dir: Path,
    solutions_info: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Analyze ALL .op2 files to discover available results.

    Args:
        model_dir: Directory scanned (non-recursively) for *.op2 files.
        solutions_info: Output of _solve_all_solutions. NOTE(review): not
            actually consulted here — solution attribution comes from
            _guess_solution_from_filename instead.

    Returns dict mapping result types to details:
    {
    'eigenvalues': {'file': 'xxx.op2', 'count': 10, 'solution': 'Modal'},
    'displacements': {'file': 'yyy.op2', 'count': 613, 'solution': 'Static'},
    'stress_quad4': {'file': 'yyy.op2', 'count': 561, 'solution': 'Static'},
    ...
    }

    NOTE(review): if several .op2 files expose the same result type, the
    file processed last wins (entries are overwritten, not merged).
    Unreadable files are skipped with a console warning.
    """
    from pyNastran.op2.op2 import OP2
    available = {}
    # Find all .op2 files
    op2_files = list(model_dir.glob("*.op2"))
    for op2_file in op2_files:
        try:
            model = OP2()
            model.read_op2(str(op2_file))
            # Check for eigenvalues
            # (only the first subcase is inspected in each category below)
            if hasattr(model, 'eigenvalues') and len(model.eigenvalues) > 0:
                subcase = list(model.eigenvalues.keys())[0]
                eig_obj = model.eigenvalues[subcase]
                available['eigenvalues'] = {
                    'file': op2_file.name,
                    'count': len(eig_obj.eigenvalues),
                    'solution': self._guess_solution_from_filename(op2_file.name),
                    'op2_path': op2_file
                }
            # Check for displacements
            if hasattr(model, 'displacements') and len(model.displacements) > 0:
                subcase = list(model.displacements.keys())[0]
                disp_obj = model.displacements[subcase]
                available['displacements'] = {
                    'file': op2_file.name,
                    'count': disp_obj.data.shape[1], # Number of nodes
                    'solution': self._guess_solution_from_filename(op2_file.name),
                    'op2_path': op2_file
                }
            # Check for stresses
            if hasattr(model, 'cquad4_stress') and len(model.cquad4_stress) > 0:
                subcase = list(model.cquad4_stress.keys())[0]
                stress_obj = model.cquad4_stress[subcase]
                available['stress_quad4'] = {
                    'file': op2_file.name,
                    'count': stress_obj.data.shape[1], # Number of elements
                    'solution': self._guess_solution_from_filename(op2_file.name),
                    'op2_path': op2_file
                }
            # Check for forces
            # NOTE(review): unlike the categories above, 'count' here is the
            # number of subcases, not an entry count per subcase — confirm
            # whether that asymmetry is intended.
            if hasattr(model, 'cquad4_force') and len(model.cquad4_force) > 0:
                available['force_quad4'] = {
                    'file': op2_file.name,
                    'count': len(model.cquad4_force),
                    'solution': self._guess_solution_from_filename(op2_file.name),
                    'op2_path': op2_file
                }
        except Exception as e:
            # Best-effort scan: a corrupt/unreadable .op2 must not abort
            # discovery of the remaining files.
            print(f" [WARNING] Could not analyze {op2_file.name}: {e}")
    return available
def _guess_solution_from_filename(self, filename: str) -> str:
"""Guess solution type from filename."""
filename_lower = filename.lower()
if 'normal_modes' in filename_lower or 'modal' in filename_lower:
return 'Solution_Normal_Modes'
elif 'buckling' in filename_lower:
return 'Solution_Buckling'
elif 'static' in filename_lower or 'solution_1' in filename_lower:
return 'Solution_1'
else:
return 'Unknown'
def _match_objectives_to_results(
self,
workflow: Dict[str, Any],
available_results: Dict[str, Any],
solutions_info: Dict[str, Any]
) -> Dict[str, Any]:
"""
Intelligently match workflow objectives to available results.
Returns:
{
'objectives': {
'obj_name': {
'solution': 'Solution_Normal_Modes',
'result_type': 'eigenvalues',
'extractor': 'extract_first_frequency',
'op2_file': Path(...)
}
},
'primary_solution': 'Solution_Normal_Modes' # Most important solution
}
"""
mapping = {
'objectives': {},
'primary_solution': None
}
for obj in workflow.get('objectives', []):
obj_name = obj.get('name', 'unnamed')
extraction = obj.get('extraction', {})
action = extraction.get('action', '').lower()
# Match based on objective type
if 'frequency' in action or 'eigenvalue' in action or 'modal' in action:
if 'eigenvalues' in available_results:
result_info = available_results['eigenvalues']
mapping['objectives'][obj_name] = {
'solution': result_info['solution'],
'result_type': 'eigenvalues',
'extractor': 'extract_first_frequency',
'op2_file': result_info['op2_path'],
'match_confidence': 'HIGH'
}
if not mapping['primary_solution']:
mapping['primary_solution'] = result_info['solution']
else:
mapping['objectives'][obj_name] = {
'solution': 'NONE',
'result_type': 'eigenvalues',
'extractor': 'extract_first_frequency',
'op2_file': None,
'match_confidence': 'ERROR',
'error': 'No eigenvalue results found - check if modal solution exists'
}
elif 'displacement' in action or 'deflection' in action:
if 'displacements' in available_results:
result_info = available_results['displacements']
mapping['objectives'][obj_name] = {
'solution': result_info['solution'],
'result_type': 'displacements',
'extractor': 'extract_max_displacement',
'op2_file': result_info['op2_path'],
'match_confidence': 'HIGH'
}
if not mapping['primary_solution']:
mapping['primary_solution'] = result_info['solution']
elif 'stress' in action or 'von_mises' in action:
if 'stress_quad4' in available_results:
result_info = available_results['stress_quad4']
mapping['objectives'][obj_name] = {
'solution': result_info['solution'],
'result_type': 'stress',
'extractor': 'extract_max_stress',
'op2_file': result_info['op2_path'],
'match_confidence': 'HIGH'
}
if not mapping['primary_solution']:
mapping['primary_solution'] = result_info['solution']
return mapping
def generate_intelligent_runner(
    self,
    study_dir: Path,
    workflow: Dict[str, Any],
    benchmark_results: Dict[str, Any]
) -> Path:
    """
    Generate optimized runner based on intelligent analysis.

    Uses benchmark results to:
    1. Select correct solution to solve
    2. Generate correct extractors
    3. Optimize for speed (only solve what's needed)

    Args:
        study_dir: Study directory; the runner is written to
            <study_dir>/run_optimization.py.
        workflow: Study definition; 'study_name', 'objectives' and
            'design_variables' are baked into the generated script.
        benchmark_results: Output of run_complete_benchmarking.

    Returns:
        Path to the written run_optimization.py.

    Note: the template below is an f-string — single-braced expressions
    (e.g. {recommended_solution}) are substituted NOW at generation time,
    double-braced ones ({{...}}) remain live f-string fields in the
    generated runner. The objective/variable counts printed by the runner
    are therefore snapshots baked in at generation time.
    """
    runner_path = study_dir / "run_optimization.py"
    # Get recommended solution (falls back to 'Solution_1' if benchmarking
    # produced no recommendation)
    recommended_solution = benchmark_results.get('recommended_solution', 'Solution_1')
    objective_mapping = benchmark_results.get('objective_mapping', {})
    # Generate extractor functions based on actual available results
    extractor_code = self._generate_intelligent_extractors(objective_mapping)
    runner_code = f'''"""
Auto-generated INTELLIGENT optimization runner
Created: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
Intelligently configured based on complete benchmarking:
- Solution: {recommended_solution}
- Extractors: Auto-matched to available results
"""
import sys
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
import json
import optuna
from optimization_engine.nx_updater import NXParameterUpdater
from optimization_engine.nx_solver import NXSolver
{extractor_code}
def main():
    print("="*80)
    print(" {workflow.get('study_name', 'OPTIMIZATION').upper()}")
    print(" Intelligent Setup - Auto-configured")
    print("="*80)
    print()
    # Load workflow
    config_file = Path(__file__).parent / "1_setup/workflow_config.json"
    with open(config_file) as f:
        workflow = json.load(f)
    print("Configuration:")
    print(f" Target solution: {recommended_solution}")
    print(f" Objectives: {len(workflow.get('objectives', []))}")
    print(f" Variables: {len(workflow.get('design_variables', []))}")
    print()
    # Setup paths
    model_dir = Path(__file__).parent / "1_setup/model"
    prt_file = list(model_dir.glob("*.prt"))[0]
    sim_file = list(model_dir.glob("*.sim"))[0]
    output_dir = Path(__file__).parent / "2_substudies/results"
    output_dir.mkdir(parents=True, exist_ok=True)
    # Initialize
    updater = NXParameterUpdater(prt_file)
    solver = NXSolver()
    # Create Optuna study
    study_name = "{workflow.get('study_name', 'optimization')}"
    storage = f"sqlite:///{{output_dir / 'study.db'}}"
    study = optuna.create_study(
        study_name=study_name,
        storage=storage,
        load_if_exists=True,
        direction="minimize"
    )
    def objective(trial):
        # Sample design variables
        params = {{}}
        for var in workflow['design_variables']:
            name = var['parameter']
            bounds = var['bounds']
            params[name] = trial.suggest_float(name, bounds[0], bounds[1])
        print(f"\\nTrial {{trial.number}}:")
        for name, value in params.items():
            print(f" {{name}} = {{value:.2f}}")
        # Update model
        updater.update_expressions(params)
        # Run SPECIFIC solution (optimized - only what's needed)
        result = solver.run_simulation(
            sim_file,
            solution_name="{recommended_solution}"
        )
        if not result['success']:
            raise RuntimeError(f"Simulation failed: {{result.get('errors', 'Unknown')}}")
        op2_file = result['op2_file']
        # Extract results
        results = extract_results(op2_file, workflow)
        # Print results
        for name, value in results.items():
            print(f" {{name}} = {{value:.4f}}")
        # Calculate objective
        obj_config = workflow['objectives'][0]
        result_name = list(results.keys())[0]
        if obj_config['goal'] == 'minimize':
            objective_value = results[result_name]
        else:
            objective_value = -results[result_name]
        print(f" Objective = {{objective_value:.4f}}")
        return objective_value
    # Run optimization
    n_trials = 10
    print(f"\\nRunning {{n_trials}} trials...")
    print("="*80)
    print()
    study.optimize(objective, n_trials=n_trials)
    # Results
    print()
    print("="*80)
    print(" OPTIMIZATION COMPLETE")
    print("="*80)
    print()
    print(f"Best trial: #{{study.best_trial.number}}")
    for name, value in study.best_params.items():
        print(f" {{name}} = {{value:.2f}}")
    print(f"\\nBest objective = {{study.best_value:.4f}}")
    print()
if __name__ == "__main__":
    main()
'''
    # Write the generated runner to disk and hand the path back.
    with open(runner_path, 'w') as f:
        f.write(runner_code)
    return runner_path
def _generate_intelligent_extractors(self, objective_mapping: Dict[str, Any]) -> str:
"""Generate extractor functions based on intelligent mapping."""
extractors = set()
for obj_name, obj_info in objective_mapping.get('objectives', {}).items():
if 'extractor' in obj_info:
extractors.add(obj_info['extractor'])
code = '''
def extract_results(op2_file, workflow):
"""Intelligently extract results based on benchmarking."""
from pyNastran.op2.op2 import OP2
import numpy as np
model = OP2()
model.read_op2(str(op2_file))
results = {}
'''
if 'extract_first_frequency' in extractors:
code += '''
# Extract first frequency (auto-matched to eigenvalues)
if hasattr(model, 'eigenvalues') and len(model.eigenvalues) > 0:
subcase = list(model.eigenvalues.keys())[0]
eig_obj = model.eigenvalues[subcase]
eigenvalue = eig_obj.eigenvalues[0]
angular_freq = np.sqrt(eigenvalue)
frequency_hz = angular_freq / (2 * np.pi)
results['first_frequency'] = float(frequency_hz)
'''
if 'extract_max_displacement' in extractors:
code += '''
# Extract max displacement (auto-matched to displacements)
if hasattr(model, 'displacements') and len(model.displacements) > 0:
subcase = list(model.displacements.keys())[0]
disp_obj = model.displacements[subcase]
translations = disp_obj.data[0, :, :3]
magnitudes = np.linalg.norm(translations, axis=1)
results['max_displacement'] = float(np.max(magnitudes))
'''
if 'extract_max_stress' in extractors:
code += '''
# Extract max stress (auto-matched to stress results)
if hasattr(model, 'cquad4_stress') and len(model.cquad4_stress) > 0:
subcase = list(model.cquad4_stress.keys())[0]
stress_obj = model.cquad4_stress[subcase]
von_mises = stress_obj.data[0, :, 7]
results['max_stress'] = float(np.max(von_mises))
'''
code += '''
return results
'''
return code
if __name__ == "__main__":
# Example usage
setup = IntelligentSetup()
# Run complete analysis
results = setup.run_complete_benchmarking(
prt_file=Path("path/to/model.prt"),
sim_file=Path("path/to/model.sim"),
workflow={'objectives': [{'name': 'freq', 'extraction': {'action': 'extract_frequency'}}]}
)
print("Analysis complete:")
print(json.dumps(results, indent=2, default=str))