commit eabcc4c3ca (Anto01)
refactor: Major reorganization of optimization_engine module structure
BREAKING CHANGE: Module paths have been reorganized for better maintainability.
Backwards compatibility aliases with deprecation warnings are provided.

New Structure:
- core/           - Optimization runners (runner, intelligent_optimizer, etc.)
- processors/     - Data processing
  - surrogates/   - Neural network surrogates
- nx/             - NX/Nastran integration (solver, updater, session_manager)
- study/          - Study management (creator, wizard, state, reset)
- reporting/      - Reports and analysis (visualizer, report_generator)
- config/         - Configuration management (manager, builder)
- utils/          - Utilities (logger, auto_doc, etc.)
- future/         - Research/experimental code

Migration:
- ~200 import changes across 125 files
- All __init__.py files use lazy loading to avoid circular imports
- Backwards compatibility layer supports old import paths with warnings (see the sketch below)
- All existing functionality preserved

To migrate existing code:
  OLD: from optimization_engine.nx_solver import NXSolver
  NEW: from optimization_engine.nx.solver import NXSolver

  OLD: from optimization_engine.runner import OptimizationRunner
  NEW: from optimization_engine.core.runner import OptimizationRunner
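
For reference, the lazy-loading pattern used by the new __init__.py files
looks roughly like the following (a minimal sketch of the PEP 562 approach;
the actual files may differ in detail, and the compatibility layer
additionally emits a DeprecationWarning for old paths):

  # optimization_engine/nx/__init__.py (illustrative)
  import importlib

  _LAZY = {"NXSolver": ".solver"}  # exported name -> submodule

  def __getattr__(name):
      # Import the submodule only on first attribute access,
      # which sidesteps circular imports at package load time.
      if name in _LAZY:
          return getattr(importlib.import_module(_LAZY[name], __name__), name)
      raise AttributeError(f"module {__name__!r} has no attribute {name!r}")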

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 12:30:59 -05:00

optimization_engine/core/runner.py (820 lines, 32 KiB, Python)

"""
Optimization Runner
Orchestrates the optimization loop:
1. Load configuration
2. Initialize Optuna study
3. For each trial:
- Update design variables in NX model
- Run simulation
- Extract results (OP2 file)
- Return objective/constraint values to Optuna
4. Save optimization history
"""
from pathlib import Path
from typing import Dict, Any, List, Optional, Callable
import json
import time
import hashlib
import optuna
from optuna.samplers import TPESampler, CmaEsSampler, GPSampler
import pandas as pd
from datetime import datetime
from optimization_engine.plugins import HookManager
from optimization_engine.processors.surrogates.training_data_exporter import create_exporter_from_config


class OptimizationRunner:
    """
    Main optimization runner that coordinates:
    - Optuna optimization loop
    - NX model parameter updates
    - Simulation execution
    - Result extraction
    """

    def __init__(
        self,
        config_path: Path,
        model_updater: Callable,
        simulation_runner: Callable,
        result_extractors: Dict[str, Callable]
    ):
        """
        Initialize optimization runner.

        Args:
            config_path: Path to optimization_config.json
            model_updater: Function(design_vars: Dict) -> None
                Updates NX model with new parameter values
            simulation_runner: Function() -> Path
                Runs simulation and returns path to result files
            result_extractors: Dict mapping extractor name to extraction function
                e.g., {'mass_extractor': extract_mass_func}
        """
        self.config_path = Path(config_path)
        self.config = self._load_config()
        self.model_updater = model_updater
        self.simulation_runner = simulation_runner
        self.result_extractors = result_extractors

        # Initialize storage
        self.history = []
        self.study = None
        self.best_params = None
        self.best_value = None

        # Paths
        self.output_dir = self.config_path.parent / 'optimization_results'
        self.output_dir.mkdir(exist_ok=True)

        # Initialize plugin/hook system
        self.hook_manager = HookManager()
        # runner.py lives in core/ after the reorg, so the shared plugins
        # package sits one level up (optimization_engine/plugins)
        plugins_dir = Path(__file__).parent.parent / 'plugins'
        if plugins_dir.exists():
            self.hook_manager.load_plugins_from_directory(plugins_dir)
        summary = self.hook_manager.get_summary()
        if summary['total_hooks'] > 0:
            print(f"Loaded {summary['enabled_hooks']}/{summary['total_hooks']} plugins")

        # Initialize training data exporter (if enabled in config)
        self.training_data_exporter = create_exporter_from_config(self.config)
        if self.training_data_exporter:
            print(f"Training data export enabled: {self.training_data_exporter.export_dir}")

    def _load_config(self) -> Dict[str, Any]:
        """Load and validate optimization configuration."""
        with open(self.config_path, 'r') as f:
            config = json.load(f)

        # Validate required fields
        required = ['design_variables', 'objectives', 'optimization_settings']
        for field in required:
            if field not in config:
                raise ValueError(f"Missing required field in config: {field}")

        return config
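
    # For reference, a config matching what this class reads might look like
    # the sketch below (values illustrative; the real schema may carry extra
    # keys used elsewhere, e.g. "sim_file" and "post_processing"):
    #
    #   {
    #     "design_variables": {
    #       "flange_thickness": {"type": "continuous", "min": 2.0, "max": 8.0, "units": "mm"}
    #     },
    #     "objectives": [
    #       {"name": "mass", "metric": "total_mass", "extractor": "mass_extractor",
    #        "weight": 1.0, "direction": "minimize", "units": "kg"}
    #     ],
    #     "constraints": [
    #       {"name": "stress", "metric": "max_von_mises", "extractor": "stress_extractor",
    #        "type": "upper_bound", "limit": 250.0, "units": "MPa"}
    #     ],
    #     "optimization_settings": {"n_trials": 100, "sampler": "TPE"}
    #   }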

    def _get_sampler(self, sampler_name: str):
        """Get Optuna sampler instance with enhanced settings."""
        opt_settings = self.config.get('optimization_settings', {})

        if sampler_name == 'TPE':
            # Enhanced TPE sampler for better exploration/exploitation balance
            return TPESampler(
                n_startup_trials=opt_settings.get('n_startup_trials', 20),
                n_ei_candidates=opt_settings.get('tpe_n_ei_candidates', 24),
                multivariate=opt_settings.get('tpe_multivariate', True),
                seed=42  # For reproducibility
            )
        elif sampler_name == 'CMAES':
            return CmaEsSampler(seed=42)
        elif sampler_name == 'GP':
            return GPSampler(seed=42)
        else:
            raise ValueError(f"Unknown sampler: {sampler_name}. Choose from ['TPE', 'CMAES', 'GP']")

    def _get_precision(self, var_name: str, units: str) -> int:
        """
        Get appropriate decimal precision based on units.

        Args:
            var_name: Variable name
            units: Physical units (mm, degrees, MPa, etc.)

        Returns:
            Number of decimal places
        """
        precision_map = {
            'mm': 4,
            'millimeter': 4,
            'degrees': 4,
            'deg': 4,
            'mpa': 4,
            'gpa': 6,
            'kg': 3,
            'n': 2,
            'dimensionless': 6
        }
        units_lower = units.lower() if units else 'dimensionless'
        return precision_map.get(units_lower, 4)  # Default to 4 decimals

    def _get_config_hash(self) -> str:
        """
        Generate hash of critical configuration parameters.
        Used to detect if configuration has changed between study runs.

        Returns:
            MD5 hash of design variables, objectives, and constraints
        """
        # Extract critical config parts that affect optimization
        critical_config = {
            'design_variables': self.config.get('design_variables', []),
            'objectives': self.config.get('objectives', []),
            'constraints': self.config.get('constraints', [])
        }
        config_str = json.dumps(critical_config, sort_keys=True)
        return hashlib.md5(config_str.encode()).hexdigest()

    def _get_study_metadata_path(self, study_name: str) -> Path:
        """Get path to study metadata file."""
        return self.output_dir / f'study_{study_name}_metadata.json'

    def _get_study_db_path(self, study_name: str) -> Path:
        """Get path to Optuna study database."""
        return self.output_dir / f'study_{study_name}.db'

    def _save_study_metadata(self, study_name: str, is_new: bool = False):
        """
        Save study metadata for tracking and resumption.

        Args:
            study_name: Name of the study
            is_new: Whether this is a new study (vs resumed)
        """
        metadata_path = self._get_study_metadata_path(study_name)

        # Load existing metadata if resuming
        if metadata_path.exists() and not is_new:
            with open(metadata_path, 'r') as f:
                metadata = json.load(f)
        else:
            metadata = {
                'study_name': study_name,
                'created_at': datetime.now().isoformat(),
                'config_hash': self._get_config_hash(),
                'total_trials': 0,
                'resume_count': 0
            }

        # Update metadata
        if self.study:
            metadata['total_trials'] = len(self.study.trials)
        metadata['last_updated'] = datetime.now().isoformat()
        if not is_new and 'created_at' in metadata:
            metadata['resume_count'] = metadata.get('resume_count', 0) + 1

        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)
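
    # A metadata file produced by this method might look like the following
    # (illustrative values; the study name reuses the run() docstring example):
    #   {
    #     "study_name": "bracket_opt_v1",
    #     "created_at": "2025-12-29T12:30:59",
    #     "config_hash": "0f3a...",
    #     "total_trials": 50,
    #     "resume_count": 1,
    #     "last_updated": "2025-12-30T09:12:01"
    #   }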

    def _load_existing_study(self, study_name: str) -> Optional[optuna.Study]:
        """
        Load an existing Optuna study from database.

        Args:
            study_name: Name of the study to load

        Returns:
            Loaded study or None if not found
        """
        db_path = self._get_study_db_path(study_name)
        metadata_path = self._get_study_metadata_path(study_name)

        if not db_path.exists():
            return None

        # Check if metadata exists and validate config
        if metadata_path.exists():
            with open(metadata_path, 'r') as f:
                metadata = json.load(f)
            current_hash = self._get_config_hash()
            stored_hash = metadata.get('config_hash', '')
            if current_hash != stored_hash:
                print("\n" + "!"*60)
                print("WARNING: Configuration has changed since study was created!")
                print("!"*60)
                print("This may indicate:")
                print(" - Different design variables")
                print(" - Different objectives or constraints")
                print(" - Topology/geometry changes")
                print("\nRecommendation: Create a NEW study instead of resuming.")
                print("!"*60)
                response = input("\nContinue anyway? (yes/no): ")
                if response.lower() not in ['yes', 'y']:
                    print("Aborting. Please create a new study.")
                    return None

        # Load study from SQLite database
        storage = optuna.storages.RDBStorage(
            url=f"sqlite:///{db_path}",
            engine_kwargs={"connect_args": {"timeout": 10.0}}
        )

        try:
            study = optuna.load_study(
                study_name=study_name,
                storage=storage
            )
            print("\n" + "="*60)
            print(f"LOADED EXISTING STUDY: {study_name}")
            print("="*60)
            print(f"Trials completed: {len(study.trials)}")
            # best_value/best_params raise ValueError when no trial has
            # completed (e.g. every trial pruned), so guard on completed trials
            completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
            if completed:
                print(f"Best value so far: {study.best_value:.6f}")
                print("Best parameters:")
                for param, value in study.best_params.items():
                    print(f" {param}: {value:.4f}")
            print("="*60)
            # Load existing history
            history_json_path = self.output_dir / 'history.json'
            if history_json_path.exists():
                with open(history_json_path, 'r') as f:
                    self.history = json.load(f)
                print(f"Loaded {len(self.history)} previous trials from history")

            return study
        except Exception as e:
            print(f"Error loading study: {e}")
            return None

    def list_studies(self) -> List[Dict[str, Any]]:
        """
        List all available studies in the output directory.

        Returns:
            List of study metadata dictionaries
        """
        studies = []
        for metadata_file in self.output_dir.glob('study_*_metadata.json'):
            try:
                with open(metadata_file, 'r') as f:
                    metadata = json.load(f)
                studies.append(metadata)
            except Exception as e:
                print(f"Error reading {metadata_file}: {e}")
        return sorted(studies, key=lambda x: x.get('created_at', ''), reverse=True)

    def _objective_function(self, trial: optuna.Trial) -> float:
        """
        Optuna objective function. This is called for each optimization trial.

        Args:
            trial: Optuna trial object

        Returns:
            Scalar objective value: the weighted sum over all configured
            objectives (multiple objectives are scalarized, so a single
            float is always returned)
        """
        # 1. Sample design variables with appropriate precision
        design_vars = {}

        # Handle both dict and list formats for design_variables
        if isinstance(self.config['design_variables'], dict):
            # New format: {var_name: {type, min, max, ...}}
            for var_name, var_info in self.config['design_variables'].items():
                if var_info['type'] == 'continuous':
                    value = trial.suggest_float(
                        var_name,
                        var_info['min'],
                        var_info['max']
                    )
                    # Round to appropriate precision
                    precision = self._get_precision(var_name, var_info.get('units', ''))
                    design_vars[var_name] = round(value, precision)
                elif var_info['type'] in ['discrete', 'integer']:
                    design_vars[var_name] = trial.suggest_int(
                        var_name,
                        int(var_info['min']),
                        int(var_info['max'])
                    )
        else:
            # Old format: [{name, type, bounds, ...}]
            for dv in self.config['design_variables']:
                if dv['type'] == 'continuous':
                    value = trial.suggest_float(
                        dv['name'],
                        dv['bounds'][0],
                        dv['bounds'][1]
                    )
                    # Round to appropriate precision
                    precision = self._get_precision(dv['name'], dv.get('units', ''))
                    design_vars[dv['name']] = round(value, precision)
                elif dv['type'] == 'discrete':
                    design_vars[dv['name']] = trial.suggest_int(
                        dv['name'],
                        int(dv['bounds'][0]),
                        int(dv['bounds'][1])
                    )

        # Execute pre_solve hooks
        pre_solve_context = {
            'trial_number': trial.number,
            'design_variables': design_vars,
            'sim_file': self.config.get('sim_file', ''),
            'working_dir': str(Path.cwd()),
            'config': self.config,
            'output_dir': str(self.output_dir)  # Add output_dir to context
        }
        self.hook_manager.execute_hooks('pre_solve', pre_solve_context, fail_fast=False)

        # 2. Update NX model with new parameters
        try:
            self.model_updater(design_vars)
        except Exception as e:
            print(f"Error updating model: {e}")
            raise optuna.TrialPruned()

        # Execute post_mesh hooks (after model update)
        post_mesh_context = {
            'trial_number': trial.number,
            'design_variables': design_vars,
            'sim_file': self.config.get('sim_file', ''),
            'working_dir': str(Path.cwd())
        }
        self.hook_manager.execute_hooks('post_mesh', post_mesh_context, fail_fast=False)

        # 3. Run simulation
        try:
            result_path = self.simulation_runner()
        except Exception as e:
            print(f"Error running simulation: {e}")
            raise optuna.TrialPruned()

        # Execute post_solve hooks
        post_solve_context = {
            'trial_number': trial.number,
            'design_variables': design_vars,
            'result_path': str(result_path) if result_path else '',
            'working_dir': str(Path.cwd()),
            'output_dir': str(self.output_dir)  # Add output_dir to context
        }
        self.hook_manager.execute_hooks('post_solve', post_solve_context, fail_fast=False)

        # 4. Extract results with appropriate precision
        extracted_results = {}
        for obj in self.config['objectives']:
            extractor_name = obj['extractor']
            if extractor_name not in self.result_extractors:
                raise ValueError(f"Missing result extractor: {extractor_name}")
            extractor_func = self.result_extractors[extractor_name]
            try:
                result = extractor_func(result_path)
                metric_name = obj['metric']
                value = result[metric_name]
                # Round to appropriate precision based on units
                precision = self._get_precision(obj['name'], obj.get('units', ''))
                extracted_results[obj['name']] = round(value, precision)
            except Exception as e:
                print(f"Error extracting {obj['name']}: {e}")
                raise optuna.TrialPruned()

        # Extract constraints with appropriate precision
        for const in self.config.get('constraints', []):
            extractor_name = const['extractor']
            if extractor_name not in self.result_extractors:
                raise ValueError(f"Missing result extractor: {extractor_name}")
            extractor_func = self.result_extractors[extractor_name]
            try:
                result = extractor_func(result_path)
                metric_name = const['metric']
                value = result[metric_name]
                # Round to appropriate precision based on units
                precision = self._get_precision(const['name'], const.get('units', ''))
                extracted_results[const['name']] = round(value, precision)
            except Exception as e:
                print(f"Error extracting {const['name']}: {e}")
                raise optuna.TrialPruned()

        # Execute post_extraction hooks
        post_extraction_context = {
            'trial_number': trial.number,
            'design_variables': design_vars,
            'extracted_results': extracted_results,
            'result_path': str(result_path) if result_path else '',
            'working_dir': str(Path.cwd()),
            'output_dir': str(self.output_dir)  # Add output_dir to context
        }
        self.hook_manager.execute_hooks('post_extraction', post_extraction_context, fail_fast=False)

        # Export training data (if enabled)
        if self.training_data_exporter:
            # Determine .dat and .op2 file paths from result_path
            # NX naming: sim_name-solution_N.dat and sim_name-solution_N.op2
            if result_path:
                sim_dir = Path(result_path).parent if Path(result_path).is_file() else Path(result_path)
                sim_name = self.config.get('sim_file', '').replace('.sim', '')
                # Try to find the .dat and .op2 files
                # Typically: sim_name-solution_1.dat and sim_name-solution_1.op2
                dat_files = list(sim_dir.glob(f"{Path(sim_name).stem}*.dat"))
                op2_files = list(sim_dir.glob(f"{Path(sim_name).stem}*.op2"))
                if dat_files and op2_files:
                    simulation_files = {
                        'dat_file': dat_files[0],  # Use first match
                        'op2_file': op2_files[0]
                    }
                    self.training_data_exporter.export_trial(
                        trial_number=trial.number,
                        design_variables=design_vars,
                        results=extracted_results,
                        simulation_files=simulation_files
                    )

        # 5. Evaluate constraints
        for const in self.config.get('constraints', []):
            value = extracted_results[const['name']]
            limit = const['limit']
            if const['type'] == 'upper_bound' and value > limit:
                # Constraint violated - prune trial or penalize
                print(f"Constraint violated: {const['name']} = {value:.4f} > {limit:.4f}")
                raise optuna.TrialPruned()
            elif const['type'] == 'lower_bound' and value < limit:
                print(f"Constraint violated: {const['name']} = {value:.4f} < {limit:.4f}")
                raise optuna.TrialPruned()

        # 6. Calculate weighted objective
        # For multi-objective: weighted sum approach
        total_objective = 0.0
        for obj in self.config['objectives']:
            value = extracted_results[obj['name']]
            weight = obj.get('weight', 1.0)
            direction = obj.get('direction', 'minimize')
            # Weighted sum: add minimized objectives, subtract maximized ones
            if direction == 'minimize':
                total_objective += weight * value
            else:  # maximize
                total_objective -= weight * value
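        # In effect, the scalarized objective is
        #     total = sum_i s_i * w_i * f_i(x),  s_i = +1 (minimize) or -1 (maximize)
        # i.e. a fixed-weight scalarization rather than true Pareto optimization;
        # the weights encode the objective trade-off up front.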

        # Execute custom_objective hooks (can modify total_objective)
        custom_objective_context = {
            'trial_number': trial.number,
            'design_variables': design_vars,
            'extracted_results': extracted_results,
            'total_objective': total_objective,
            'working_dir': str(Path.cwd())
        }
        custom_results = self.hook_manager.execute_hooks('custom_objective', custom_objective_context, fail_fast=False)

        # Allow hooks to override objective value
        for result in custom_results:
            if result and 'total_objective' in result:
                total_objective = result['total_objective']
                print(f"Custom objective hook modified total_objective to {total_objective:.6f}")
                break  # Use first hook that provides override

        # 7. Store results in history
        history_entry = {
            'trial_number': trial.number,
            'timestamp': datetime.now().isoformat(),
            'design_variables': design_vars,
            'objectives': {obj['name']: extracted_results[obj['name']] for obj in self.config['objectives']},
            'constraints': {const['name']: extracted_results[const['name']] for const in self.config.get('constraints', [])},
            'total_objective': total_objective
        }
        self.history.append(history_entry)

        # Save history after each trial
        self._save_history()

        print(f"\nTrial {trial.number} completed:")
        print(f" Design vars: {design_vars}")
        print(f" Objectives: {history_entry['objectives']}")
        print(f" Total objective: {total_objective:.6f}")

        return total_objective

    def run(
        self,
        study_name: Optional[str] = None,
        n_trials: Optional[int] = None,
        resume: bool = False
    ) -> optuna.Study:
        """
        Run the optimization.

        Args:
            study_name: Optional name for the study. If None, generates timestamp-based name.
            n_trials: Number of trials to run. If None, uses config value.
                When resuming, this is ADDITIONAL trials to run.
            resume: If True, attempts to resume existing study. If False, creates new study.

        Returns:
            Completed Optuna study

        Examples:
            # New study with 50 trials
            runner.run(study_name="bracket_opt_v1", n_trials=50)

            # Resume existing study for 25 more trials
            runner.run(study_name="bracket_opt_v1", n_trials=25, resume=True)

            # New study after topology change
            runner.run(study_name="bracket_opt_v2", n_trials=50)
        """
        if study_name is None:
            study_name = f"optimization_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        # Get optimization settings
        settings = self.config['optimization_settings']
        if n_trials is None:
            n_trials = settings.get('n_trials', 100)
        sampler_name = settings.get('sampler', 'TPE')

        # Try to load existing study if resume=True
        if resume:
            existing_study = self._load_existing_study(study_name)
            if existing_study is not None:
                self.study = existing_study
                trials_completed = len(self.study.trials)
                print("\n" + "="*60)
                print(f"RESUMING OPTIMIZATION: {study_name}")
                print("="*60)
                print(f"Trials already completed: {trials_completed}")
                print(f"Additional trials to run: {n_trials}")
                print(f"Total trials after completion: {trials_completed + n_trials}")
                print("="*60)
                # Save metadata indicating this is a resume
                self._save_study_metadata(study_name, is_new=False)
            else:
                print(f"\nNo existing study '{study_name}' found. Creating new study instead.")
                resume = False

        # Create new study if not resuming or if resume failed
        if not resume or self.study is None:
            # Create storage for persistence
            db_path = self._get_study_db_path(study_name)
            storage = optuna.storages.RDBStorage(
                url=f"sqlite:///{db_path}",
                engine_kwargs={"connect_args": {"timeout": 10.0}}
            )
            sampler = self._get_sampler(sampler_name)
            self.study = optuna.create_study(
                study_name=study_name,
                direction='minimize',  # Total weighted objective is always minimized
                sampler=sampler,
                storage=storage,
                load_if_exists=False  # Force new study
            )
            print("="*60)
            print(f"STARTING NEW OPTIMIZATION: {study_name}")
            print("="*60)
            print(f"Design Variables: {len(self.config['design_variables'])}")
            print(f"Objectives: {len(self.config['objectives'])}")
            print(f"Constraints: {len(self.config.get('constraints', []))}")
            print(f"Trials: {n_trials}")
            print(f"Sampler: {sampler_name}")
            print("="*60)
            # Save metadata for new study
            self._save_study_metadata(study_name, is_new=True)

        # Run optimization
        start_time = time.time()
        self.study.optimize(self._objective_function, n_trials=n_trials)
        elapsed_time = time.time() - start_time

        # Get best results (best_params/best_value raise ValueError if no
        # trial completed, e.g. when every trial was pruned)
        try:
            self.best_params = self.study.best_params
            self.best_value = self.study.best_value
        except ValueError:
            self.best_params, self.best_value = None, None

        print("\n" + "="*60)
        print("OPTIMIZATION COMPLETE")
        print("="*60)
        print(f"Time for this run: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} minutes)")
        print(f"Total trials completed: {len(self.study.trials)}")
        if self.best_params is not None:
            print(f"Best objective value: {self.best_value:.6f}")
            print("Best parameters:")
            for param, value in self.best_params.items():
                print(f" {param}: {value:.4f}")
        print("="*60)

        # Save metadata and final results
        self._save_study_metadata(study_name)
        self._save_final_results()

        # Finalize training data export (if enabled)
        if self.training_data_exporter:
            self.training_data_exporter.finalize()
            print(f"Training data export finalized: {self.training_data_exporter.trial_count} trials exported")

        # Post-processing: Visualization and Model Cleanup
        self._run_post_processing()

        return self.study

    def _save_history(self):
        """Save optimization history to CSV and JSON."""
        # Save as JSON
        history_json_path = self.output_dir / 'history.json'
        with open(history_json_path, 'w') as f:
            json.dump(self.history, f, indent=2)

        # Save as CSV (flattened)
        if self.history:
            # Flatten nested dicts for CSV
            rows = []
            for entry in self.history:
                row = {
                    'trial_number': entry['trial_number'],
                    'timestamp': entry['timestamp'],
                    'total_objective': entry['total_objective']
                }
                # Add design variables
                for var_name, var_value in entry['design_variables'].items():
                    row[f'dv_{var_name}'] = var_value
                # Add objectives
                for obj_name, obj_value in entry['objectives'].items():
                    row[f'obj_{obj_name}'] = obj_value
                # Add constraints
                for const_name, const_value in entry['constraints'].items():
                    row[f'const_{const_name}'] = const_value
                rows.append(row)
            df = pd.DataFrame(rows)
            csv_path = self.output_dir / 'history.csv'
            df.to_csv(csv_path, index=False)

    def _save_final_results(self):
        """Save final optimization results summary."""
        if self.study is None:
            return
        summary = {
            'study_name': self.study.study_name,
            'best_value': self.best_value,
            'best_params': self.best_params,
            'n_trials': len(self.study.trials),
            'configuration': self.config,
            'timestamp': datetime.now().isoformat()
        }
        summary_path = self.output_dir / 'optimization_summary.json'
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)
        print(f"\nResults saved to: {self.output_dir}")
        print(" - history.json")
        print(" - history.csv")
        print(" - optimization_summary.json")

    def _run_post_processing(self):
        """
        Run post-processing tasks: visualization and model cleanup.

        Based on config settings in 'post_processing' section:
        - generate_plots: Generate matplotlib visualizations
        - cleanup_models: Delete CAD/FEM files for non-top trials
        """
        post_config = self.config.get('post_processing', {})
        if not post_config:
            return  # No post-processing configured

        print("\n" + "="*60)
        print("POST-PROCESSING")
        print("="*60)

        # 1. Generate Visualization Plots
        if post_config.get('generate_plots', False):
            print("\nGenerating visualization plots...")
            try:
                from optimization_engine.reporting.visualizer import OptimizationVisualizer
                formats = post_config.get('plot_formats', ['png', 'pdf'])
                visualizer = OptimizationVisualizer(self.output_dir)
                visualizer.generate_all_plots(save_formats=formats)
                summary = visualizer.generate_plot_summary()
                print(f" Plots generated: {len(formats)} format(s)")
                print(f" Improvement: {summary['improvement_percent']:.1f}%")
                print(f" Location: {visualizer.plots_dir}")
            except Exception as e:
                print(f" WARNING: Plot generation failed: {e}")
                print(" Continuing with optimization results...")

        # 2. Model Cleanup
        if post_config.get('cleanup_models', False):
            print("\nCleaning up trial models...")
            try:
                from optimization_engine.nx.model_cleanup import ModelCleanup
                keep_n = post_config.get('keep_top_n_models', 10)
                dry_run = post_config.get('cleanup_dry_run', False)
                cleaner = ModelCleanup(self.output_dir)
                stats = cleaner.cleanup_models(keep_top_n=keep_n, dry_run=dry_run)
                if dry_run:
                    print(f" [DRY RUN] Would delete {stats['files_deleted']} files")
                    print(f" [DRY RUN] Would free {stats['space_freed_mb']:.1f} MB")
                else:
                    print(f" Deleted {stats['files_deleted']} files from {stats['cleaned_trials']} trials")
                    print(f" Space freed: {stats['space_freed_mb']:.1f} MB")
                    print(f" Kept top {stats['kept_trials']} trial models")
            except Exception as e:
                print(f" WARNING: Model cleanup failed: {e}")
                print(" All trial files retained...")

        print("="*60 + "\n")


# Example usage
if __name__ == "__main__":
    # This would be replaced with actual NX integration functions
    def dummy_model_updater(design_vars: Dict[str, float]):
        """Dummy function - would update NX model."""
        print(f"Updating model with: {design_vars}")

    def dummy_simulation_runner() -> Path:
        """Dummy function - would run NX simulation."""
        print("Running simulation...")
        time.sleep(0.5)  # Simulate work
        return Path("examples/bracket/bracket_sim1-solution_1.op2")

    def dummy_mass_extractor(result_path: Path) -> Dict[str, float]:
        """Dummy function - would extract from OP2."""
        import random
        return {'total_mass': 0.4 + random.random() * 0.1}

    def dummy_stress_extractor(result_path: Path) -> Dict[str, float]:
        """Dummy function - would extract from OP2."""
        import random
        return {'max_von_mises': 150.0 + random.random() * 50.0}

    def dummy_displacement_extractor(result_path: Path) -> Dict[str, float]:
        """Dummy function - would extract from OP2."""
        import random
        return {'max_displacement': 0.8 + random.random() * 0.3}

    # Create runner
    runner = OptimizationRunner(
        config_path=Path("examples/bracket/optimization_config.json"),
        model_updater=dummy_model_updater,
        simulation_runner=dummy_simulation_runner,
        result_extractors={
            'mass_extractor': dummy_mass_extractor,
            'stress_extractor': dummy_stress_extractor,
            'displacement_extractor': dummy_displacement_extractor
        }
    )

    # Run optimization
    study = runner.run(study_name="test_bracket_optimization")
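
    # To continue the same study later with additional trials (per the
    # run() docstring), one could call, for example:
    #   study = runner.run(study_name="test_bracket_optimization",
    #                      n_trials=25, resume=True)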