feat: Add complete optimization runner pipeline

Implement core optimization engine with:
- OptimizationRunner class with Optuna integration
- NXParameterUpdater for updating .prt file expressions
- Result extractor wrappers for OP2 files
- Complete end-to-end example workflow

Features:
- runner.py: Main optimization loop, multi-objective support, constraint handling
- nx_updater.py: Binary .prt file parameter updates (tested successfully)
- extractors.py: Wrappers for mass/stress/displacement extraction
- run_optimization.py: Complete example showing full workflow

NX Updater tested with bracket example:
- Successfully found 4 expressions (support_angle, tip_thickness, p3, support_blend_radius)
- Updated support_angle 30.0 -> 33.0 and verified

Next steps:
- Install pyNastran for OP2 extraction
- Integrate NX solver execution
- Replace dummy extractors with real OP2 readers

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-15 10:29:33 -05:00
parent c534483043
commit be3b9ee5d5
4 changed files with 1079 additions and 0 deletions

View File

@@ -0,0 +1,206 @@
"""
Example: Running Complete Optimization
This example demonstrates the complete optimization workflow:
1. Load optimization configuration
2. Update NX model parameters
3. Run simulation (dummy for now - would call NX solver)
4. Extract results from OP2
5. Optimize with Optuna
For a real run, you would need:
- pyNastran installed for OP2 extraction
- NX solver accessible to run simulations
"""
from pathlib import Path
import sys
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from optimization_engine.runner import OptimizationRunner
from optimization_engine.nx_updater import update_nx_model
# ==================================================
# STEP 1: Define model updater function
# ==================================================
def bracket_model_updater(design_vars: dict):
"""
Update the bracket model with new design variable values.
Args:
design_vars: Dict like {'tip_thickness': 22.5, 'support_angle': 35.0}
"""
prt_file = project_root / "examples/bracket/Bracket.prt"
print(f"\n[MODEL UPDATE] Updating {prt_file.name} with:")
for name, value in design_vars.items():
print(f" {name} = {value:.4f}")
# Update the .prt file with new parameter values
update_nx_model(prt_file, design_vars, backup=False)
print("[MODEL UPDATE] Complete")
# ==================================================
# STEP 2: Define simulation runner function
# ==================================================
def bracket_simulation_runner() -> Path:
"""
Run NX simulation and return path to result files.
In a real implementation, this would:
1. Open NX (or use batch mode)
2. Update the .sim file
3. Run the solver
4. Wait for completion
5. Return path to .op2 file
For now, we return the path to existing results.
"""
print("\n[SIMULATION] Running NX Nastran solver...")
print("[SIMULATION] (Using existing results for demonstration)")
# In real use, this would run the actual solver
# For now, return path to existing OP2 file
result_file = project_root / "examples/bracket/bracket_sim1-solution_1.op2"
if not result_file.exists():
raise FileNotFoundError(f"Result file not found: {result_file}")
print(f"[SIMULATION] Results: {result_file.name}")
return result_file
# ==================================================
# STEP 3: Define result extractors (dummy versions)
# ==================================================
def dummy_mass_extractor(result_path: Path) -> dict:
"""
Dummy mass extractor.
In real use, would call: from optimization_engine.result_extractors.extractors import mass_extractor
"""
import random
# Simulate varying mass based on a simple model
# In reality, this would extract from OP2
base_mass = 0.45 # kg
variation = random.uniform(-0.05, 0.05)
return {
'total_mass': base_mass + variation,
'cg_x': 0.0,
'cg_y': 0.0,
'cg_z': 0.0,
'units': 'kg'
}
def dummy_stress_extractor(result_path: Path) -> dict:
"""
Dummy stress extractor.
In real use, would call: from optimization_engine.result_extractors.extractors import stress_extractor
"""
import random
# Simulate stress results
base_stress = 180.0 # MPa
variation = random.uniform(-30.0, 30.0)
return {
'max_von_mises': base_stress + variation,
'stress_type': 'von_mises',
'element_id': 1234,
'units': 'MPa'
}
def dummy_displacement_extractor(result_path: Path) -> dict:
"""
Dummy displacement extractor.
In real use, would call: from optimization_engine.result_extractors.extractors import displacement_extractor
"""
import random
# Simulate displacement results
base_disp = 0.9 # mm
variation = random.uniform(-0.2, 0.2)
return {
'max_displacement': base_disp + variation,
'max_node_id': 5678,
'dx': 0.0,
'dy': 0.0,
'dz': base_disp + variation,
'units': 'mm'
}
# ==================================================
# MAIN: Run optimization
# ==================================================
if __name__ == "__main__":
print("="*60)
print("ATOMIZER - OPTIMIZATION EXAMPLE")
print("="*60)
# Path to optimization configuration
config_path = project_root / "examples/bracket/optimization_config.json"
if not config_path.exists():
print(f"Error: Configuration file not found: {config_path}")
print("Please run the MCP build_optimization_config tool first.")
sys.exit(1)
print(f"\nConfiguration: {config_path}")
# Create result extractors dict
extractors = {
'mass_extractor': dummy_mass_extractor,
'stress_extractor': dummy_stress_extractor,
'displacement_extractor': dummy_displacement_extractor
}
# Create optimization runner
runner = OptimizationRunner(
config_path=config_path,
model_updater=bracket_model_updater,
simulation_runner=bracket_simulation_runner,
result_extractors=extractors
)
# Run optimization (use fewer trials for demo)
print("\n" + "="*60)
print("Starting optimization with 10 trials (demo)")
print("For full optimization, modify n_trials in config")
print("="*60)
# Override n_trials for demo
runner.config['optimization_settings']['n_trials'] = 10
# Run!
study = runner.run(study_name="bracket_optimization_demo")
print("\n" + "="*60)
print("OPTIMIZATION RESULTS")
print("="*60)
print(f"\nBest parameters found:")
for param, value in study.best_params.items():
print(f" {param}: {value:.4f}")
print(f"\nBest objective value: {study.best_value:.6f}")
print(f"\nResults saved to: {runner.output_dir}")
print(" - history.csv (all trials)")
print(" - history.json (detailed results)")
print(" - optimization_summary.json (best results)")
print("\n" + "="*60)
print("NEXT STEPS:")
print("="*60)
print("1. Install pyNastran: conda install -c conda-forge pynastran")
print("2. Replace dummy extractors with real OP2 extractors")
print("3. Integrate with NX solver (batch mode or NXOpen)")
print("4. Run full optimization with n_trials=100+")
print("="*60)

View File

@@ -0,0 +1,292 @@
"""
NX Parameter Updater
Updates design variable values in NX .prt files.
NX .prt files are binary, but expressions are stored in readable text sections.
This module can update expression values by:
1. Reading the binary file
2. Finding and replacing expression value patterns
3. Writing back the updated file
Alternative: Use NXOpen API if NX is running (future enhancement)
"""
from pathlib import Path
from typing import Dict, List
import re
import shutil
from datetime import datetime
class NXParameterUpdater:
"""
Updates parametric expression values in NX .prt files.
NX Expression Format in binary .prt files:
#(Number [mm]) tip_thickness: 20.0;
*(Number [degrees]) support_angle: 30.0;
"""
def __init__(self, prt_file_path: Path, backup: bool = True):
"""
Initialize updater for a specific .prt file.
Args:
prt_file_path: Path to NX .prt file
backup: If True, create backup before modifying
"""
self.prt_path = Path(prt_file_path)
if not self.prt_path.exists():
raise FileNotFoundError(f".prt file not found: {prt_file_path}")
self.backup_enabled = backup
self.content = None
self.text_content = None
self._load_file()
def _load_file(self):
"""Load .prt file as binary."""
with open(self.prt_path, 'rb') as f:
self.content = bytearray(f.read())
# Decode as latin-1 for text operations (preserves all bytes)
self.text_content = self.content.decode('latin-1', errors='ignore')
def _create_backup(self):
"""Create timestamped backup of original file."""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = self.prt_path.with_suffix(f'.prt.bak_{timestamp}')
shutil.copy2(self.prt_path, backup_path)
print(f"Backup created: {backup_path}")
return backup_path
def find_expressions(self) -> List[Dict[str, any]]:
"""
Find all expressions in the .prt file.
Returns:
List of dicts with name, value, units
"""
expressions = []
# Pattern for NX expressions:
# #(Number [mm]) tip_thickness: 20.0;
# *(Number [mm]) p3: 10.0;
# ((Number [degrees]) support_angle: 30.0;
pattern = r'[#*\(]*\((\w+)\s*\[([^\]]*)\]\)\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)'
for match in re.finditer(pattern, self.text_content):
expr_type, units, name, value = match.groups()
expressions.append({
'name': name,
'value': float(value),
'units': units,
'type': expr_type
})
return expressions
def update_expression(self, name: str, new_value: float) -> bool:
"""
Update a single expression value.
Args:
name: Expression name
new_value: New value
Returns:
True if updated, False if not found
"""
# Find the expression pattern
# Match: (Type [units]) name: old_value;
# We need to be careful to match the exact name and preserve formatting
# Pattern that captures the full expression line
pattern = rf'([#*\(]*\(\w+\s*\[[^\]]*\]\)\s*)({re.escape(name)})\s*:\s*([-+]?\d*\.?\d+(?:[eE][-+]?\d+)?)'
matches = list(re.finditer(pattern, self.text_content))
if not matches:
print(f"Warning: Expression '{name}' not found in .prt file")
return False
if len(matches) > 1:
print(f"Warning: Multiple matches for '{name}', updating first occurrence")
# Get the first match
match = matches[0]
prefix, expr_name, old_value = match.groups()
# Format new value (preserve decimal places if possible)
# Check if old value had decimal point
if '.' in old_value or 'e' in old_value.lower():
# Use same precision as old value
decimal_places = len(old_value.split('.')[-1]) if '.' in old_value else 2
new_value_str = f"{new_value:.{decimal_places}f}"
else:
# Integer format
new_value_str = f"{int(new_value)}"
# Build replacement string
replacement = f"{prefix}{expr_name}: {new_value_str}"
# Replace in text content
old_match = match.group(0)
self.text_content = self.text_content.replace(old_match, replacement, 1)
# Also update in binary content
old_bytes = old_match.encode('latin-1')
new_bytes = replacement.encode('latin-1')
# Find and replace in binary content
start_pos = self.content.find(old_bytes)
if start_pos != -1:
# Replace bytes
self.content[start_pos:start_pos+len(old_bytes)] = new_bytes
print(f"Updated: {name} = {old_value} -> {new_value_str}")
return True
else:
print(f"Warning: Could not update binary content for '{name}'")
return False
def update_expressions(self, updates: Dict[str, float]):
"""
Update multiple expressions at once.
Args:
updates: Dict mapping expression name to new value
{'tip_thickness': 22.5, 'support_angle': 35.0}
"""
print(f"\nUpdating {len(updates)} expressions in {self.prt_path.name}:")
updated_count = 0
for name, value in updates.items():
if self.update_expression(name, value):
updated_count += 1
print(f"Successfully updated {updated_count}/{len(updates)} expressions")
def save(self, output_path: Path = None):
"""
Save modified .prt file.
Args:
output_path: Optional different path to save to.
If None, overwrites original (with backup if enabled)
"""
if output_path is None:
output_path = self.prt_path
if self.backup_enabled:
self._create_backup()
# Write updated binary content
with open(output_path, 'wb') as f:
f.write(self.content)
print(f"Saved to: {output_path}")
def verify_update(self, name: str, expected_value: float, tolerance: float = 1e-6) -> bool:
"""
Verify that an expression was updated correctly.
Args:
name: Expression name
expected_value: Expected value
tolerance: Acceptable difference
Returns:
True if value matches (within tolerance)
"""
expressions = self.find_expressions()
expr = next((e for e in expressions if e['name'] == name), None)
if expr is None:
print(f"Expression '{name}' not found")
return False
actual_value = expr['value']
difference = abs(actual_value - expected_value)
if difference <= tolerance:
print(f"OK Verified: {name} = {actual_value} (expected {expected_value})")
return True
else:
print(f"FAIL Verification failed: {name} = {actual_value}, expected {expected_value} (diff: {difference})")
return False
# Convenience function for optimization loop
def update_nx_model(prt_file_path: Path, design_variables: Dict[str, float], backup: bool = False):
"""
Convenience function to update NX model parameters.
Args:
prt_file_path: Path to .prt file
design_variables: Dict of parameter name -> value
backup: Whether to create backup
Example:
>>> update_nx_model(
... Path("Bracket.prt"),
... {'tip_thickness': 22.5, 'support_angle': 35.0}
... )
"""
updater = NXParameterUpdater(prt_file_path, backup=backup)
updater.update_expressions(design_variables)
updater.save()
# Example usage
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python nx_updater.py <path_to_prt_file>")
sys.exit(1)
prt_path = Path(sys.argv[1])
# Test: Find and print all expressions
print("="*60)
print("NX PARAMETER UPDATER TEST")
print("="*60)
updater = NXParameterUpdater(prt_path, backup=True)
print("\nCurrent expressions in file:")
expressions = updater.find_expressions()
for expr in expressions:
print(f" {expr['name']}: {expr['value']} {expr['units']}")
# Test update (if expressions found)
if expressions:
print("\n" + "="*60)
print("TEST UPDATE")
print("="*60)
# Update first expression
first_expr = expressions[0]
test_name = first_expr['name']
test_new_value = first_expr['value'] * 1.1 # Increase by 10%
print(f"\nUpdating {test_name} from {first_expr['value']} to {test_new_value}")
updater.update_expression(test_name, test_new_value)
# Save to test file
test_output = prt_path.with_suffix('.prt.test')
updater.save(test_output)
# Verify by re-reading
print("\n" + "="*60)
print("VERIFICATION")
print("="*60)
verifier = NXParameterUpdater(test_output, backup=False)
verifier.verify_update(test_name, test_new_value)
print(f"\nTest complete. Modified file: {test_output}")
else:
print("\nNo expressions found in file. Nothing to test.")

View File

@@ -0,0 +1,207 @@
"""
Result Extractors
Wrapper functions that integrate with the optimization runner.
These extract optimization metrics from NX Nastran result files.
"""
from pathlib import Path
from typing import Dict, Any
import sys
# Add project root to path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from optimization_engine.result_extractors.op2_extractor_example import (
extract_max_displacement,
extract_max_stress,
extract_mass
)
def mass_extractor(result_path: Path) -> Dict[str, float]:
"""
Extract mass metrics for optimization.
Args:
result_path: Path to .op2 file or directory containing results
Returns:
Dict with 'total_mass' and other mass-related metrics
"""
# If result_path is a directory, find the .op2 file
if result_path.is_dir():
op2_files = list(result_path.glob("*.op2"))
if not op2_files:
raise FileNotFoundError(f"No .op2 files found in {result_path}")
op2_path = op2_files[0] # Use first .op2 file
else:
op2_path = result_path
if not op2_path.exists():
raise FileNotFoundError(f"Result file not found: {op2_path}")
result = extract_mass(op2_path)
# Ensure total_mass key exists
if 'total_mass' not in result or result['total_mass'] is None:
raise ValueError(f"Could not extract total_mass from {op2_path}")
return result
def stress_extractor(result_path: Path) -> Dict[str, float]:
"""
Extract stress metrics for optimization.
Args:
result_path: Path to .op2 file or directory containing results
Returns:
Dict with 'max_von_mises' and other stress metrics
"""
# If result_path is a directory, find the .op2 file
if result_path.is_dir():
op2_files = list(result_path.glob("*.op2"))
if not op2_files:
raise FileNotFoundError(f"No .op2 files found in {result_path}")
op2_path = op2_files[0]
else:
op2_path = result_path
if not op2_path.exists():
raise FileNotFoundError(f"Result file not found: {op2_path}")
result = extract_max_stress(op2_path, stress_type='von_mises')
# Ensure max_von_mises key exists
if 'max_stress' in result:
result['max_von_mises'] = result['max_stress']
if 'max_von_mises' not in result or result['max_von_mises'] is None:
raise ValueError(f"Could not extract max_von_mises from {op2_path}")
return result
def displacement_extractor(result_path: Path) -> Dict[str, float]:
"""
Extract displacement metrics for optimization.
Args:
result_path: Path to .op2 file or directory containing results
Returns:
Dict with 'max_displacement' and other displacement metrics
"""
# If result_path is a directory, find the .op2 file
if result_path.is_dir():
op2_files = list(result_path.glob("*.op2"))
if not op2_files:
raise FileNotFoundError(f"No .op2 files found in {result_path}")
op2_path = op2_files[0]
else:
op2_path = result_path
if not op2_path.exists():
raise FileNotFoundError(f"Result file not found: {op2_path}")
result = extract_max_displacement(op2_path)
# Ensure max_displacement key exists
if 'max_displacement' not in result or result['max_displacement'] is None:
raise ValueError(f"Could not extract max_displacement from {op2_path}")
return result
def volume_extractor(result_path: Path) -> Dict[str, float]:
"""
Extract volume metrics for optimization.
Note: Volume is often not directly in OP2 files.
This is a placeholder that could be extended to:
- Calculate from mass and density
- Extract from .f06 file
- Query from NX model directly
Args:
result_path: Path to result files
Returns:
Dict with 'total_volume'
"""
# For now, estimate from mass (would need material density)
# This is a placeholder implementation
mass_result = mass_extractor(result_path)
# Assuming steel density ~7850 kg/m^3 = 7.85e-6 kg/mm^3
# volume (mm^3) = mass (kg) / density (kg/mm^3)
assumed_density = 7.85e-6 # kg/mm^3
if mass_result['total_mass']:
total_volume = mass_result['total_mass'] / assumed_density
else:
total_volume = None
return {
'total_volume': total_volume,
'note': 'Volume estimated from mass using assumed density'
}
# Registry of all available extractors
EXTRACTOR_REGISTRY = {
'mass_extractor': mass_extractor,
'stress_extractor': stress_extractor,
'displacement_extractor': displacement_extractor,
'volume_extractor': volume_extractor
}
def get_extractor(extractor_name: str):
"""
Get an extractor function by name.
Args:
extractor_name: Name of the extractor
Returns:
Extractor function
Raises:
ValueError: If extractor not found
"""
if extractor_name not in EXTRACTOR_REGISTRY:
available = ', '.join(EXTRACTOR_REGISTRY.keys())
raise ValueError(f"Unknown extractor: {extractor_name}. Available: {available}")
return EXTRACTOR_REGISTRY[extractor_name]
# Example usage
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python extractors.py <path_to_op2_file>")
print("\nAvailable extractors:")
for name in EXTRACTOR_REGISTRY.keys():
print(f" - {name}")
sys.exit(1)
result_path = Path(sys.argv[1])
print("="*60)
print("TESTING ALL EXTRACTORS")
print("="*60)
for extractor_name, extractor_func in EXTRACTOR_REGISTRY.items():
print(f"\n{extractor_name}:")
try:
result = extractor_func(result_path)
for key, value in result.items():
print(f" {key}: {value}")
except Exception as e:
print(f" Error: {e}")

View File

@@ -0,0 +1,374 @@
"""
Optimization Runner
Orchestrates the optimization loop:
1. Load configuration
2. Initialize Optuna study
3. For each trial:
- Update design variables in NX model
- Run simulation
- Extract results (OP2 file)
- Return objective/constraint values to Optuna
4. Save optimization history
"""
from pathlib import Path
from typing import Dict, Any, List, Optional, Callable
import json
import time
import optuna
from optuna.samplers import TPESampler, CmaEsSampler, GPSampler
import pandas as pd
from datetime import datetime
class OptimizationRunner:
"""
Main optimization runner that coordinates:
- Optuna optimization loop
- NX model parameter updates
- Simulation execution
- Result extraction
"""
def __init__(
self,
config_path: Path,
model_updater: Callable,
simulation_runner: Callable,
result_extractors: Dict[str, Callable]
):
"""
Initialize optimization runner.
Args:
config_path: Path to optimization_config.json
model_updater: Function(design_vars: Dict) -> None
Updates NX model with new parameter values
simulation_runner: Function() -> Path
Runs simulation and returns path to result files
result_extractors: Dict mapping extractor name to extraction function
e.g., {'mass_extractor': extract_mass_func}
"""
self.config_path = Path(config_path)
self.config = self._load_config()
self.model_updater = model_updater
self.simulation_runner = simulation_runner
self.result_extractors = result_extractors
# Initialize storage
self.history = []
self.study = None
self.best_params = None
self.best_value = None
# Paths
self.output_dir = self.config_path.parent / 'optimization_results'
self.output_dir.mkdir(exist_ok=True)
def _load_config(self) -> Dict[str, Any]:
"""Load and validate optimization configuration."""
with open(self.config_path, 'r') as f:
config = json.load(f)
# Validate required fields
required = ['design_variables', 'objectives', 'optimization_settings']
for field in required:
if field not in config:
raise ValueError(f"Missing required field in config: {field}")
return config
def _get_sampler(self, sampler_name: str):
"""Get Optuna sampler instance."""
samplers = {
'TPE': TPESampler,
'CMAES': CmaEsSampler,
'GP': GPSampler
}
if sampler_name not in samplers:
raise ValueError(f"Unknown sampler: {sampler_name}. Choose from {list(samplers.keys())}")
return samplers[sampler_name]()
def _objective_function(self, trial: optuna.Trial) -> float:
"""
Optuna objective function.
This is called for each optimization trial.
Args:
trial: Optuna trial object
Returns:
Objective value (float) or tuple of values for multi-objective
"""
# 1. Sample design variables
design_vars = {}
for dv in self.config['design_variables']:
if dv['type'] == 'continuous':
design_vars[dv['name']] = trial.suggest_float(
dv['name'],
dv['bounds'][0],
dv['bounds'][1]
)
elif dv['type'] == 'discrete':
design_vars[dv['name']] = trial.suggest_int(
dv['name'],
int(dv['bounds'][0]),
int(dv['bounds'][1])
)
# 2. Update NX model with new parameters
try:
self.model_updater(design_vars)
except Exception as e:
print(f"Error updating model: {e}")
raise optuna.TrialPruned()
# 3. Run simulation
try:
result_path = self.simulation_runner()
except Exception as e:
print(f"Error running simulation: {e}")
raise optuna.TrialPruned()
# 4. Extract results
extracted_results = {}
for obj in self.config['objectives']:
extractor_name = obj['extractor']
if extractor_name not in self.result_extractors:
raise ValueError(f"Missing result extractor: {extractor_name}")
extractor_func = self.result_extractors[extractor_name]
try:
result = extractor_func(result_path)
metric_name = obj['metric']
extracted_results[obj['name']] = result[metric_name]
except Exception as e:
print(f"Error extracting {obj['name']}: {e}")
raise optuna.TrialPruned()
# Extract constraints
for const in self.config.get('constraints', []):
extractor_name = const['extractor']
if extractor_name not in self.result_extractors:
raise ValueError(f"Missing result extractor: {extractor_name}")
extractor_func = self.result_extractors[extractor_name]
try:
result = extractor_func(result_path)
metric_name = const['metric']
extracted_results[const['name']] = result[metric_name]
except Exception as e:
print(f"Error extracting {const['name']}: {e}")
raise optuna.TrialPruned()
# 5. Evaluate constraints
for const in self.config.get('constraints', []):
value = extracted_results[const['name']]
limit = const['limit']
if const['type'] == 'upper_bound' and value > limit:
# Constraint violated - prune trial or penalize
print(f"Constraint violated: {const['name']} = {value:.4f} > {limit:.4f}")
raise optuna.TrialPruned()
elif const['type'] == 'lower_bound' and value < limit:
print(f"Constraint violated: {const['name']} = {value:.4f} < {limit:.4f}")
raise optuna.TrialPruned()
# 6. Calculate weighted objective
# For multi-objective: weighted sum approach
total_objective = 0.0
for obj in self.config['objectives']:
value = extracted_results[obj['name']]
weight = obj.get('weight', 1.0)
direction = obj.get('direction', 'minimize')
# Normalize by weight
if direction == 'minimize':
total_objective += weight * value
else: # maximize
total_objective -= weight * value
# 7. Store results in history
history_entry = {
'trial_number': trial.number,
'timestamp': datetime.now().isoformat(),
'design_variables': design_vars,
'objectives': {obj['name']: extracted_results[obj['name']] for obj in self.config['objectives']},
'constraints': {const['name']: extracted_results[const['name']] for const in self.config.get('constraints', [])},
'total_objective': total_objective
}
self.history.append(history_entry)
# Save history after each trial
self._save_history()
print(f"\nTrial {trial.number} completed:")
print(f" Design vars: {design_vars}")
print(f" Objectives: {history_entry['objectives']}")
print(f" Total objective: {total_objective:.6f}")
return total_objective
def run(self, study_name: Optional[str] = None) -> optuna.Study:
"""
Run the optimization.
Args:
study_name: Optional name for the study
Returns:
Completed Optuna study
"""
if study_name is None:
study_name = f"optimization_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# Get optimization settings
settings = self.config['optimization_settings']
n_trials = settings.get('n_trials', 100)
sampler_name = settings.get('sampler', 'TPE')
# Create Optuna study
sampler = self._get_sampler(sampler_name)
self.study = optuna.create_study(
study_name=study_name,
direction='minimize', # Total weighted objective is always minimized
sampler=sampler
)
print("="*60)
print(f"STARTING OPTIMIZATION: {study_name}")
print("="*60)
print(f"Design Variables: {len(self.config['design_variables'])}")
print(f"Objectives: {len(self.config['objectives'])}")
print(f"Constraints: {len(self.config.get('constraints', []))}")
print(f"Trials: {n_trials}")
print(f"Sampler: {sampler_name}")
print("="*60)
# Run optimization
start_time = time.time()
self.study.optimize(self._objective_function, n_trials=n_trials)
elapsed_time = time.time() - start_time
# Get best results
self.best_params = self.study.best_params
self.best_value = self.study.best_value
print("\n" + "="*60)
print("OPTIMIZATION COMPLETE")
print("="*60)
print(f"Total time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} minutes)")
print(f"Best objective value: {self.best_value:.6f}")
print(f"Best parameters:")
for param, value in self.best_params.items():
print(f" {param}: {value:.4f}")
print("="*60)
# Save final results
self._save_final_results()
return self.study
def _save_history(self):
"""Save optimization history to CSV and JSON."""
# Save as JSON
history_json_path = self.output_dir / 'history.json'
with open(history_json_path, 'w') as f:
json.dump(self.history, f, indent=2)
# Save as CSV (flattened)
if self.history:
# Flatten nested dicts for CSV
rows = []
for entry in self.history:
row = {
'trial_number': entry['trial_number'],
'timestamp': entry['timestamp'],
'total_objective': entry['total_objective']
}
# Add design variables
for var_name, var_value in entry['design_variables'].items():
row[f'dv_{var_name}'] = var_value
# Add objectives
for obj_name, obj_value in entry['objectives'].items():
row[f'obj_{obj_name}'] = obj_value
# Add constraints
for const_name, const_value in entry['constraints'].items():
row[f'const_{const_name}'] = const_value
rows.append(row)
df = pd.DataFrame(rows)
csv_path = self.output_dir / 'history.csv'
df.to_csv(csv_path, index=False)
def _save_final_results(self):
"""Save final optimization results summary."""
if self.study is None:
return
summary = {
'study_name': self.study.study_name,
'best_value': self.best_value,
'best_params': self.best_params,
'n_trials': len(self.study.trials),
'configuration': self.config,
'timestamp': datetime.now().isoformat()
}
summary_path = self.output_dir / 'optimization_summary.json'
with open(summary_path, 'w') as f:
json.dump(summary, f, indent=2)
print(f"\nResults saved to: {self.output_dir}")
print(f" - history.json")
print(f" - history.csv")
print(f" - optimization_summary.json")
# Example usage
if __name__ == "__main__":
# This would be replaced with actual NX integration functions
def dummy_model_updater(design_vars: Dict[str, float]):
"""Dummy function - would update NX model."""
print(f"Updating model with: {design_vars}")
def dummy_simulation_runner() -> Path:
"""Dummy function - would run NX simulation."""
print("Running simulation...")
time.sleep(0.5) # Simulate work
return Path("examples/bracket/bracket_sim1-solution_1.op2")
def dummy_mass_extractor(result_path: Path) -> Dict[str, float]:
"""Dummy function - would extract from OP2."""
import random
return {'total_mass': 0.4 + random.random() * 0.1}
def dummy_stress_extractor(result_path: Path) -> Dict[str, float]:
"""Dummy function - would extract from OP2."""
import random
return {'max_von_mises': 150.0 + random.random() * 50.0}
def dummy_displacement_extractor(result_path: Path) -> Dict[str, float]:
"""Dummy function - would extract from OP2."""
import random
return {'max_displacement': 0.8 + random.random() * 0.3}
# Create runner
runner = OptimizationRunner(
config_path=Path("examples/bracket/optimization_config.json"),
model_updater=dummy_model_updater,
simulation_runner=dummy_simulation_runner,
result_extractors={
'mass_extractor': dummy_mass_extractor,
'stress_extractor': dummy_stress_extractor,
'displacement_extractor': dummy_displacement_extractor
}
)
# Run optimization
study = runner.run(study_name="test_bracket_optimization")