feat: Add substudy system with live history tracking and workflow fixes

Major Features:
- Hierarchical substudy system (like NX Solutions/Subcases)
  * Shared model files across all substudies
  * Independent configuration per substudy
  * Continuation support from previous substudies
  * Real-time incremental history updates
- Live history tracking with optimization_history_incremental.json
- Complete bracket_displacement_maximizing study with substudy examples

Core Fixes:
- Fixed expression update workflow to pass design_vars through simulation_runner
  * Restored working NX journal expression update mechanism
  * OP2 timestamp verification instead of file deletion
  * Resolved issue where all trials returned identical objective values
- Fixed LLMOptimizationRunner to pass design variables to simulation runner
- Enhanced NXSolver with timestamp-based file regeneration verification

New Components:
- optimization_engine/llm_optimization_runner.py - LLM-driven optimization runner
- optimization_engine/optimization_setup_wizard.py - Phase 3.3 setup wizard
- studies/bracket_displacement_maximizing/ - Complete substudy example
  * run_substudy.py - Substudy runner with continuation
  * run_optimization.py - Standalone optimization runner
  * config/substudy_template.json - Template for new substudies
  * substudies/coarse_exploration/ - 20-trial coarse search
  * substudies/fine_tuning/ - 50-trial refinement (continuation example)
  * SUBSTUDIES_README.md - Complete substudy documentation

Technical Improvements:
- Incremental history saving after each trial (optimization_history_incremental.json)
- Expression update workflow: .prt update → NX journal receives values → geometry update → FEM update → solve
- Trial indexing fix in substudy result saving
- Updated README with substudy system documentation

Testing:
- Successfully ran 20-trial coarse_exploration substudy
- Verified different objective values across trials (workflow fix validated)
- Confirmed live history updates in real-time
- Tested shared model file usage across substudies

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-16 21:29:54 -05:00
parent 90a9e020d8
commit 2f3afc3813
126 changed files with 15592 additions and 97 deletions

View File

@@ -0,0 +1,528 @@
"""
LLM-Enhanced Optimization Runner - Phase 3.2
Flexible LLM-enhanced optimization runner that integrates:
- Phase 2.7: LLM workflow analysis
- Phase 2.8: Inline code generation (optional)
- Phase 2.9: Post-processing hook generation (optional)
- Phase 3.0: pyNastran research agent (optional)
- Phase 3.1: Extractor orchestration (optional)
This runner enables users to describe optimization goals in natural language
and choose to leverage automated code generation, manual coding, or a hybrid approach.
Author: Atomizer Development Team
Version: 0.1.0 (Phase 3.2)
Last Updated: 2025-01-16
"""
from pathlib import Path
from typing import Dict, Any, List, Optional
import json
import logging
import optuna
from datetime import datetime
from optimization_engine.extractor_orchestrator import ExtractorOrchestrator
from optimization_engine.inline_code_generator import InlineCodeGenerator
from optimization_engine.hook_generator import HookGenerator
from optimization_engine.plugins.hook_manager import HookManager
logger = logging.getLogger(__name__)
class LLMOptimizationRunner:
"""
LLM-enhanced optimization runner with flexible automation options.
This runner empowers users to leverage LLM-assisted code generation for:
- OP2 result extractors (Phase 3.1) - optional
- Inline calculations (Phase 2.8) - optional
- Post-processing hooks (Phase 2.9) - optional
Users can describe goals in natural language and choose automated generation,
manual coding, or a hybrid approach based on their needs.
"""
def __init__(self,
llm_workflow: Dict[str, Any],
model_updater: callable,
simulation_runner: callable,
study_name: str = "llm_optimization",
output_dir: Optional[Path] = None):
"""
Initialize LLM-driven optimization runner.
Args:
llm_workflow: Output from Phase 2.7 LLM analysis with:
- engineering_features: List of FEA operations
- inline_calculations: List of simple math operations
- post_processing_hooks: List of custom calculations
- optimization: Dict with algorithm, design_variables, etc.
model_updater: Function(design_vars: Dict) -> None
simulation_runner: Function() -> Path (returns OP2 file path)
study_name: Name for Optuna study
output_dir: Directory for results
"""
self.llm_workflow = llm_workflow
self.model_updater = model_updater
self.simulation_runner = simulation_runner
self.study_name = study_name
if output_dir is None:
output_dir = Path.cwd() / "optimization_results" / study_name
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
# Initialize automation components
self._initialize_automation()
# Optuna study
self.study = None
self.history = []
logger.info(f"LLMOptimizationRunner initialized for study: {study_name}")
def _initialize_automation(self):
"""Initialize all automation components from LLM workflow."""
logger.info("Initializing automation components...")
# Phase 3.1: Extractor Orchestrator
logger.info(" - Phase 3.1: Extractor Orchestrator")
self.orchestrator = ExtractorOrchestrator(
extractors_dir=self.output_dir / "generated_extractors"
)
# Generate extractors from LLM workflow
self.extractors = self.orchestrator.process_llm_workflow(self.llm_workflow)
logger.info(f" Generated {len(self.extractors)} extractor(s)")
# Phase 2.8: Inline Code Generator
logger.info(" - Phase 2.8: Inline Code Generator")
self.inline_generator = InlineCodeGenerator()
self.inline_code = []
for calc in self.llm_workflow.get('inline_calculations', []):
generated = self.inline_generator.generate_from_llm_output(calc)
self.inline_code.append(generated.code)
logger.info(f" Generated {len(self.inline_code)} inline calculation(s)")
# Phase 2.9: Hook Generator
logger.info(" - Phase 2.9: Hook Generator")
self.hook_generator = HookGenerator()
# Generate lifecycle hooks from post_processing_hooks
hook_dir = self.output_dir / "generated_hooks"
hook_dir.mkdir(exist_ok=True)
for hook_spec in self.llm_workflow.get('post_processing_hooks', []):
hook_content = self.hook_generator.generate_lifecycle_hook(
hook_spec,
hook_point='post_calculation'
)
# Save hook
hook_name = hook_spec.get('action', 'custom_hook')
hook_file = hook_dir / f"{hook_name}.py"
with open(hook_file, 'w') as f:
f.write(hook_content)
logger.info(f" Generated hook: {hook_name}")
# Phase 1: Hook Manager
logger.info(" - Phase 1: Hook Manager")
self.hook_manager = HookManager()
# Load generated hooks
if hook_dir.exists():
self.hook_manager.load_plugins_from_directory(hook_dir)
# Load system hooks
system_hooks_dir = Path(__file__).parent / 'plugins'
if system_hooks_dir.exists():
self.hook_manager.load_plugins_from_directory(system_hooks_dir)
summary = self.hook_manager.get_summary()
logger.info(f" Loaded {summary['enabled_hooks']} hook(s)")
logger.info("Automation components initialized successfully!")
def _create_optuna_study(self) -> optuna.Study:
"""Create Optuna study from LLM workflow optimization settings."""
opt_config = self.llm_workflow.get('optimization', {})
# Determine direction (minimize or maximize)
direction = opt_config.get('direction', 'minimize')
# Create study
study = optuna.create_study(
study_name=self.study_name,
direction=direction,
storage=f"sqlite:///{self.output_dir / f'{self.study_name}.db'}",
load_if_exists=True
)
logger.info(f"Created Optuna study: {self.study_name} (direction: {direction})")
return study
def _objective(self, trial: optuna.Trial) -> float:
"""
Optuna objective function - LLM-enhanced with flexible automation!
This function leverages LLM workflow analysis with user-configurable automation:
1. Suggests design variables from LLM analysis
2. Updates model
3. Runs simulation
4. Extracts results (using generated or manual extractors)
5. Executes inline calculations (generated or manual)
6. Executes post-calculation hooks (generated or manual)
7. Returns objective value
Args:
trial: Optuna trial
Returns:
Objective value
"""
trial_number = trial.number
logger.info(f"\n{'='*80}")
logger.info(f"Trial {trial_number} starting...")
logger.info(f"{'='*80}")
# ====================================================================
# STEP 1: Suggest Design Variables
# ====================================================================
design_vars_config = self.llm_workflow.get('optimization', {}).get('design_variables', [])
design_vars = {}
for var_config in design_vars_config:
var_name = var_config['parameter']
var_min = var_config.get('min', 0.0)
var_max = var_config.get('max', 1.0)
# Suggest value using Optuna
design_vars[var_name] = trial.suggest_float(var_name, var_min, var_max)
logger.info(f"Design variables: {design_vars}")
# Execute pre-solve hooks
self.hook_manager.execute_hooks('pre_solve', {
'trial_number': trial_number,
'design_variables': design_vars
})
# ====================================================================
# STEP 2: Update Model
# ====================================================================
logger.info("Updating model...")
self.model_updater(design_vars)
# ====================================================================
# STEP 3: Run Simulation
# ====================================================================
logger.info("Running simulation...")
# Pass design_vars to simulation_runner so NX journal can update expressions
op2_file = self.simulation_runner(design_vars)
logger.info(f"Simulation complete: {op2_file}")
# Execute post-solve hooks
self.hook_manager.execute_hooks('post_solve', {
'trial_number': trial_number,
'op2_file': op2_file
})
# ====================================================================
# STEP 4: Extract Results (Phase 3.1 - Auto-Generated Extractors)
# ====================================================================
logger.info("Extracting results...")
results = {}
for extractor in self.extractors:
try:
extraction_result = self.orchestrator.execute_extractor(
extractor.name,
Path(op2_file),
subcase=1
)
results.update(extraction_result)
logger.info(f" {extractor.name}: {list(extraction_result.keys())}")
except Exception as e:
logger.error(f"Extraction failed for {extractor.name}: {e}")
# Continue with other extractors
# Execute post-extraction hooks
self.hook_manager.execute_hooks('post_extraction', {
'trial_number': trial_number,
'results': results
})
# ====================================================================
# STEP 5: Inline Calculations (Phase 2.8 - Auto-Generated Code)
# ====================================================================
logger.info("Executing inline calculations...")
calculations = {}
calc_namespace = {**results, **calculations} # Make results available
for calc_code in self.inline_code:
try:
exec(calc_code, calc_namespace)
# Extract newly created variables
for key, value in calc_namespace.items():
if key not in results and not key.startswith('_'):
calculations[key] = value
logger.info(f" Executed: {calc_code[:50]}...")
except Exception as e:
logger.error(f"Inline calculation failed: {e}")
logger.info(f"Calculations: {calculations}")
# ====================================================================
# STEP 6: Post-Calculation Hooks (Phase 2.9 - Auto-Generated Hooks)
# ====================================================================
logger.info("Executing post-calculation hooks...")
hook_results = self.hook_manager.execute_hooks('post_calculation', {
'trial_number': trial_number,
'design_variables': design_vars,
'results': results,
'calculations': calculations
})
# Merge hook results
final_context = {**results, **calculations}
for hook_result in hook_results:
if hook_result:
final_context.update(hook_result)
logger.info(f"Hook results: {hook_results}")
# ====================================================================
# STEP 7: Extract Objective Value
# ====================================================================
# Try to get objective from hooks first
objective = None
# Check hook results for 'objective' or 'weighted_objective'
for hook_result in hook_results:
if hook_result:
if 'objective' in hook_result:
objective = hook_result['objective']
break
elif 'weighted_objective' in hook_result:
objective = hook_result['weighted_objective']
break
# Fallback: use first extracted result
if objective is None:
# Try common objective names
for key in ['max_displacement', 'max_stress', 'max_von_mises']:
if key in final_context:
objective = final_context[key]
logger.warning(f"No explicit objective found, using: {key}")
break
if objective is None:
raise ValueError("Could not determine objective value from results/calculations/hooks")
logger.info(f"Objective value: {objective}")
# Save trial history
trial_data = {
'trial_number': trial_number,
'design_variables': design_vars,
'results': results,
'calculations': calculations,
'objective': objective
}
self.history.append(trial_data)
# Incremental save - write history after each trial
# This allows monitoring progress in real-time
self._save_incremental_history()
return float(objective)
def run_optimization(self, n_trials: int = 50) -> Dict[str, Any]:
"""
Run LLM-enhanced optimization with flexible automation.
Args:
n_trials: Number of optimization trials
Returns:
Dict with:
- best_params: Best design variable values
- best_value: Best objective value
- history: Complete trial history
"""
logger.info(f"\n{'='*80}")
logger.info(f"Starting LLM-Driven Optimization")
logger.info(f"{'='*80}")
logger.info(f"Study: {self.study_name}")
logger.info(f"Trials: {n_trials}")
logger.info(f"Output: {self.output_dir}")
logger.info(f"{'='*80}\n")
# Create study
self.study = self._create_optuna_study()
# Run optimization
self.study.optimize(self._objective, n_trials=n_trials)
# Get results
best_trial = self.study.best_trial
results = {
'best_params': best_trial.params,
'best_value': best_trial.value,
'best_trial_number': best_trial.number,
'history': self.history
}
# Save results
self._save_results(results)
logger.info(f"\n{'='*80}")
logger.info("Optimization Complete!")
logger.info(f"{'='*80}")
logger.info(f"Best value: {results['best_value']}")
logger.info(f"Best params: {results['best_params']}")
logger.info(f"Results saved to: {self.output_dir}")
logger.info(f"{'='*80}\n")
return results
def _save_incremental_history(self):
"""
Save trial history incrementally after each trial.
This allows real-time monitoring of optimization progress.
"""
history_file = self.output_dir / "optimization_history_incremental.json"
# Convert history to JSON-serializable format
serializable_history = []
for trial in self.history:
trial_copy = trial.copy()
# Convert any numpy types to native Python types
for key in ['results', 'calculations', 'design_variables']:
if key in trial_copy:
trial_copy[key] = {k: float(v) if isinstance(v, (int, float)) else v
for k, v in trial_copy[key].items()}
if 'objective' in trial_copy:
trial_copy['objective'] = float(trial_copy['objective'])
serializable_history.append(trial_copy)
# Write to file
with open(history_file, 'w') as f:
json.dump(serializable_history, f, indent=2, default=str)
def _save_results(self, results: Dict[str, Any]):
"""Save optimization results to file."""
results_file = self.output_dir / "optimization_results.json"
# Make history JSON serializable
serializable_results = {
'best_params': results['best_params'],
'best_value': results['best_value'],
'best_trial_number': results['best_trial_number'],
'timestamp': datetime.now().isoformat(),
'study_name': self.study_name,
'n_trials': len(results['history'])
}
with open(results_file, 'w') as f:
json.dump(serializable_results, f, indent=2)
logger.info(f"Results saved to: {results_file}")
def main():
"""Test LLM-driven optimization runner."""
print("=" * 80)
print("Phase 3.2: LLM-Driven Optimization Runner Test")
print("=" * 80)
print()
# Example LLM workflow (from Phase 2.7)
llm_workflow = {
"engineering_features": [
{
"action": "extract_displacement",
"domain": "result_extraction",
"description": "Extract displacement from OP2",
"params": {"result_type": "displacement"}
}
],
"inline_calculations": [
{
"action": "normalize",
"params": {
"input": "max_displacement",
"reference": "max_allowed_disp",
"value": 5.0
},
"code_hint": "norm_disp = max_displacement / 5.0"
}
],
"post_processing_hooks": [
{
"action": "weighted_objective",
"params": {
"inputs": ["norm_disp"],
"weights": [1.0],
"objective": "minimize"
}
}
],
"optimization": {
"algorithm": "TPE",
"direction": "minimize",
"design_variables": [
{
"parameter": "wall_thickness",
"min": 3.0,
"max": 8.0,
"type": "continuous"
}
]
}
}
print("LLM Workflow Configuration:")
print(f" Engineering features: {len(llm_workflow['engineering_features'])}")
print(f" Inline calculations: {len(llm_workflow['inline_calculations'])}")
print(f" Post-processing hooks: {len(llm_workflow['post_processing_hooks'])}")
print(f" Design variables: {len(llm_workflow['optimization']['design_variables'])}")
print()
# Dummy functions for testing
def dummy_model_updater(design_vars):
print(f" [Dummy] Updating model with: {design_vars}")
def dummy_simulation_runner():
print(" [Dummy] Running simulation...")
# Return path to test OP2
return Path("tests/bracket_sim1-solution_1.op2")
# Initialize runner
print("Initializing LLM-driven optimization runner...")
runner = LLMOptimizationRunner(
llm_workflow=llm_workflow,
model_updater=dummy_model_updater,
simulation_runner=dummy_simulation_runner,
study_name="test_llm_optimization"
)
print()
print("=" * 80)
print("Runner initialized successfully!")
print("Ready to run optimization with auto-generated code!")
print("=" * 80)
if __name__ == '__main__':
main()

View File

@@ -194,22 +194,16 @@ class NXSolver:
print(f" Working dir: {working_dir}")
print(f" Mode: {'Journal' if self.use_journal else 'Direct'}")
# Delete old result files (.op2, .log, .f06) to force fresh solve
# (.dat file is needed by NX, don't delete it!)
# (Otherwise NX may reuse cached results)
files_to_delete = [op2_file, log_file, f06_file]
# Record timestamps of old files BEFORE solving
# We'll verify files are regenerated by checking timestamps AFTER solve
# This is more reliable than deleting (which can fail due to file locking on Windows)
old_op2_time = op2_file.stat().st_mtime if op2_file.exists() else None
old_f06_time = f06_file.stat().st_mtime if f06_file.exists() else None
old_log_time = log_file.stat().st_mtime if log_file.exists() else None
deleted_count = 0
for old_file in files_to_delete:
if old_file.exists():
try:
old_file.unlink()
deleted_count += 1
except Exception as e:
print(f" Warning: Could not delete {old_file.name}: {e}")
if deleted_count > 0:
print(f" Deleted {deleted_count} old result file(s) to force fresh solve")
if old_op2_time:
print(f" Found existing OP2 (modified: {time.ctime(old_op2_time)})")
print(f" Will verify NX regenerates it with newer timestamp")
# Build command based on mode
if self.use_journal and sim_file.suffix == '.sim':
@@ -308,19 +302,41 @@ sys.argv = ['', {argv_str}] # Set argv for the main function
for line in result.stderr.strip().split('\n')[:5]:
print(f" {line}")
# Wait for output files to appear (journal mode runs solve in background)
# Wait for output files to appear AND be regenerated (journal mode runs solve in background)
if self.use_journal:
max_wait = 30 # seconds - background solves can take time
wait_start = time.time()
print("[NX SOLVER] Waiting for solve to complete...")
while not (f06_file.exists() and op2_file.exists()) and (time.time() - wait_start) < max_wait:
# Wait for files to exist AND have newer timestamps than before
while (time.time() - wait_start) < max_wait:
files_exist = f06_file.exists() and op2_file.exists()
if files_exist:
# Verify files were regenerated (newer timestamps)
new_op2_time = op2_file.stat().st_mtime
new_f06_time = f06_file.stat().st_mtime
# If no old files, or new files are newer, we're done!
if (old_op2_time is None or new_op2_time > old_op2_time) and \
(old_f06_time is None or new_f06_time > old_f06_time):
elapsed = time.time() - wait_start
print(f"[NX SOLVER] Fresh output files detected after {elapsed:.1f}s")
if old_op2_time:
print(f" OP2 regenerated: {time.ctime(old_op2_time)} -> {time.ctime(new_op2_time)}")
break
time.sleep(0.5)
if (time.time() - wait_start) % 2 < 0.5: # Print every 2 seconds
elapsed = time.time() - wait_start
print(f" Waiting... ({elapsed:.0f}s)")
print(f" Waiting for fresh results... ({elapsed:.0f}s)")
if f06_file.exists() and op2_file.exists():
print(f"[NX SOLVER] Output files detected after {time.time() - wait_start:.1f}s")
# Final check - warn if files weren't regenerated
if op2_file.exists():
current_op2_time = op2_file.stat().st_mtime
if old_op2_time and current_op2_time <= old_op2_time:
print(f" WARNING: OP2 file was NOT regenerated! (Still has old timestamp)")
print(f" Old: {time.ctime(old_op2_time)}, Current: {time.ctime(current_op2_time)}")
# Check for completion
success = self._check_solution_success(f06_file, log_file)

View File

@@ -88,6 +88,24 @@ class NXParameterUpdater:
return expressions
def get_all_expressions(self) -> Dict[str, Dict[str, any]]:
"""
Get all expressions as a dictionary.
Returns:
Dict mapping expression name to info dict with 'value', 'units', 'type'
"""
expressions_list = self.find_expressions()
return {
expr['name']: {
'value': expr['value'],
'units': expr['units'],
'type': expr['type'],
'formula': None # Binary .prt files don't have formulas accessible
}
for expr in expressions_list
}
def update_expression(self, name: str, new_value: float) -> bool:
"""
Update a single expression value.

View File

@@ -0,0 +1,575 @@
"""
Optimization Setup Wizard - Phase 3.3
Interactive wizard that validates the complete optimization pipeline BEFORE running trials:
1. Introspect NX model for available expressions
2. Run baseline simulation to generate OP2
3. Introspect OP2 file to detect element types and available results
4. LLM-guided configuration based on actual model contents
5. Dry-run pipeline validation with baseline OP2
6. Report success/failure before starting optimization
This prevents wasted time running optimizations that will fail!
Author: Atomizer Development Team
Version: 0.1.0 (Phase 3.3)
Last Updated: 2025-01-16
"""
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import logging
from dataclasses import dataclass
from optimization_engine.nx_updater import NXParameterUpdater
from optimization_engine.nx_solver import NXSolver
from optimization_engine.extractor_orchestrator import ExtractorOrchestrator
from optimization_engine.inline_code_generator import InlineCodeGenerator
from optimization_engine.plugins.hook_manager import HookManager
logger = logging.getLogger(__name__)
@dataclass
class ModelIntrospection:
"""Results from NX model introspection."""
expressions: Dict[str, Any] # {name: {'value': float, 'formula': str}}
prt_file: Path
sim_file: Path
@dataclass
class OP2Introspection:
"""Results from OP2 file introspection."""
element_types: List[str] # e.g., ['CHEXA', 'CPENTA', 'CTETRA']
result_types: List[str] # e.g., ['displacement', 'stress']
subcases: List[int] # e.g., [1]
node_count: int
element_count: int
op2_file: Path
@dataclass
class ValidationResult:
"""Result from pipeline validation."""
success: bool
component: str # 'extractor', 'calculation', 'hook', 'objective'
message: str
data: Optional[Dict[str, Any]] = None
class OptimizationSetupWizard:
"""
Interactive wizard for validating optimization setup before running trials.
This wizard prevents common mistakes by:
- Checking model expressions exist
- Validating OP2 file contains expected results
- Testing extractors on real data
- Confirming calculations work
- Verifying complete pipeline before optimization
"""
def __init__(self, prt_file: Path, sim_file: Path, output_dir: Optional[Path] = None):
"""
Initialize optimization setup wizard.
Args:
prt_file: Path to NX part file (.prt)
sim_file: Path to NX simulation file (.sim)
output_dir: Directory for validation outputs
"""
self.prt_file = Path(prt_file)
self.sim_file = Path(sim_file)
if output_dir is None:
output_dir = Path.cwd() / "optimization_validation"
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.model_info: Optional[ModelIntrospection] = None
self.op2_info: Optional[OP2Introspection] = None
self.baseline_op2: Optional[Path] = None
logger.info(f"OptimizationSetupWizard initialized")
logger.info(f" Part: {self.prt_file}")
logger.info(f" Sim: {self.sim_file}")
logger.info(f" Output: {self.output_dir}")
# =========================================================================
# STEP 1: Model Introspection
# =========================================================================
def introspect_model(self) -> ModelIntrospection:
"""
Introspect NX model to find available expressions.
Returns:
ModelIntrospection with all expressions found
"""
logger.info("=" * 80)
logger.info("STEP 1: Introspecting NX Model")
logger.info("=" * 80)
# Use NXParameterUpdater to read expressions
updater = NXParameterUpdater(prt_file_path=self.prt_file)
expressions = updater.get_all_expressions()
logger.info(f"Found {len(expressions)} expressions in model:")
for name, info in expressions.items():
logger.info(f" - {name}: {info.get('value')} ({info.get('formula', 'N/A')})")
self.model_info = ModelIntrospection(
expressions=expressions,
prt_file=self.prt_file,
sim_file=self.sim_file
)
return self.model_info
# =========================================================================
# STEP 2: Baseline Simulation
# =========================================================================
def run_baseline_simulation(self) -> Path:
"""
Run baseline simulation with current expression values.
This generates an OP2 file that we can introspect to see what
element types and results are actually present.
Returns:
Path to generated OP2 file
"""
logger.info("=" * 80)
logger.info("STEP 2: Running Baseline Simulation")
logger.info("=" * 80)
logger.info("This generates OP2 file for introspection...")
solver = NXSolver(nastran_version='2412', use_journal=True)
result = solver.run_simulation(self.sim_file)
self.baseline_op2 = result['op2_file']
logger.info(f"Baseline simulation complete!")
logger.info(f" OP2 file: {self.baseline_op2}")
return self.baseline_op2
# =========================================================================
# STEP 3: OP2 Introspection
# =========================================================================
def introspect_op2(self, op2_file: Optional[Path] = None) -> OP2Introspection:
"""
Introspect OP2 file to detect element types and available results.
Args:
op2_file: Path to OP2 file (uses baseline if not provided)
Returns:
OP2Introspection with detected contents
"""
logger.info("=" * 80)
logger.info("STEP 3: Introspecting OP2 File")
logger.info("=" * 80)
if op2_file is None:
op2_file = self.baseline_op2
if op2_file is None:
raise ValueError("No OP2 file available. Run baseline simulation first.")
# Use pyNastran to read OP2 and detect contents
from pyNastran.op2.op2 import OP2
model = OP2()
model.read_op2(str(op2_file))
# Detect element types with stress results
# In pyNastran, stress results are stored in model.op2_results.stress
element_types = []
# Dynamically discover ALL element types with stress data from pyNastran
# Instead of hardcoding, we introspect what pyNastran actually has!
if hasattr(model, 'op2_results') and hasattr(model.op2_results, 'stress'):
stress_obj = model.op2_results.stress
# Find all attributes ending with '_stress' that have data
for attr_name in dir(stress_obj):
if attr_name.endswith('_stress') and not attr_name.startswith('_'):
# Check if this element type has data
element_data = getattr(stress_obj, attr_name, None)
if element_data: # Has data
# Convert attribute name to element type
# e.g., 'chexa_stress' -> 'CHEXA'
element_type = attr_name.replace('_stress', '').upper()
# Handle special cases (composite elements)
if '_composite' not in attr_name:
element_types.append(element_type)
# Also check for forces (stored differently in pyNastran)
# Bar/beam forces are at model level, not in stress object
if hasattr(model, 'cbar_force') and model.cbar_force:
element_types.append('CBAR')
if hasattr(model, 'cbeam_force') and model.cbeam_force:
element_types.append('CBEAM')
if hasattr(model, 'crod_force') and model.crod_force:
element_types.append('CROD')
# Detect result types
result_types = []
if hasattr(model, 'displacements') and model.displacements:
result_types.append('displacement')
if element_types: # Has stress
result_types.append('stress')
if hasattr(model, 'cbar_force') and model.cbar_force:
result_types.append('force')
# Get subcases
subcases = []
if hasattr(model, 'displacements') and model.displacements:
subcases = list(model.displacements.keys())
# Get counts
node_count = len(model.nodes) if hasattr(model, 'nodes') else 0
element_count = len(model.elements) if hasattr(model, 'elements') else 0
logger.info(f"OP2 Introspection Results:")
logger.info(f" Element types with stress: {element_types}")
logger.info(f" Result types available: {result_types}")
logger.info(f" Subcases: {subcases}")
logger.info(f" Nodes: {node_count}")
logger.info(f" Elements: {element_count}")
self.op2_info = OP2Introspection(
element_types=element_types,
result_types=result_types,
subcases=subcases,
node_count=node_count,
element_count=element_count,
op2_file=op2_file
)
return self.op2_info
# =========================================================================
# STEP 4: LLM-Guided Configuration
# =========================================================================
def suggest_configuration(self, user_goal: str) -> Dict[str, Any]:
"""
Use LLM to suggest configuration based on user goal and available data.
This would analyze:
- User's natural language description
- Available expressions in model
- Available element types in OP2
- Available result types in OP2
And propose a concrete configuration.
Args:
user_goal: User's description of optimization goal
Returns:
Suggested configuration dict
"""
logger.info("=" * 80)
logger.info("STEP 4: LLM-Guided Configuration")
logger.info("=" * 80)
logger.info(f"User goal: {user_goal}")
# TODO: Implement LLM analysis
# For now, return a manual suggestion based on OP2 contents
if self.op2_info is None:
raise ValueError("OP2 not introspected. Run introspect_op2() first.")
# Suggest extractors based on available result types
engineering_features = []
if 'displacement' in self.op2_info.result_types:
engineering_features.append({
'action': 'extract_displacement',
'domain': 'result_extraction',
'description': 'Extract displacement results from OP2 file',
'params': {'result_type': 'displacement'}
})
if 'stress' in self.op2_info.result_types and self.op2_info.element_types:
# Use first available element type
element_type = self.op2_info.element_types[0].lower()
engineering_features.append({
'action': 'extract_solid_stress',
'domain': 'result_extraction',
'description': f'Extract stress from {element_type.upper()} elements',
'params': {
'result_type': 'stress',
'element_type': element_type
}
})
logger.info(f"Suggested configuration:")
logger.info(f" Engineering features: {len(engineering_features)}")
for feat in engineering_features:
logger.info(f" - {feat['action']}: {feat['description']}")
return {
'engineering_features': engineering_features,
'inline_calculations': [],
'post_processing_hooks': []
}
# =========================================================================
# STEP 5: Pipeline Validation (Dry Run)
# =========================================================================
def validate_pipeline(self, llm_workflow: Dict[str, Any]) -> List[ValidationResult]:
"""
Validate complete pipeline with baseline OP2 file.
This executes the entire extraction/calculation/hook pipeline
using the baseline OP2 to ensure everything works BEFORE
starting the optimization.
Args:
llm_workflow: Complete LLM workflow configuration
Returns:
List of ValidationResult objects
"""
logger.info("=" * 80)
logger.info("STEP 5: Pipeline Validation (Dry Run)")
logger.info("=" * 80)
if self.baseline_op2 is None:
raise ValueError("No baseline OP2 file. Run baseline simulation first.")
results = []
# Validate extractors
logger.info("\nValidating extractors...")
orchestrator = ExtractorOrchestrator(
extractors_dir=self.output_dir / "generated_extractors"
)
extractors = orchestrator.process_llm_workflow(llm_workflow)
extraction_results = {}
for extractor in extractors:
try:
# Pass extractor params (like element_type) to execution
result = orchestrator.execute_extractor(
extractor.name,
self.baseline_op2,
subcase=1,
**extractor.params # Pass params from workflow (element_type, etc.)
)
extraction_results.update(result)
results.append(ValidationResult(
success=True,
component='extractor',
message=f"[OK] {extractor.name}: {list(result.keys())}",
data=result
))
logger.info(f" [OK] {extractor.name}: {list(result.keys())}")
except Exception as e:
results.append(ValidationResult(
success=False,
component='extractor',
message=f"[FAIL] {extractor.name}: {str(e)}",
data=None
))
logger.error(f" [FAIL] {extractor.name}: {str(e)}")
# Validate inline calculations
logger.info("\nValidating inline calculations...")
inline_generator = InlineCodeGenerator()
calculations = {}
calc_namespace = {**extraction_results, **calculations}
for calc_spec in llm_workflow.get('inline_calculations', []):
try:
generated = inline_generator.generate_from_llm_output(calc_spec)
exec(generated.code, calc_namespace)
# Extract newly created variables
for key, value in calc_namespace.items():
if key not in extraction_results and not key.startswith('_'):
calculations[key] = value
results.append(ValidationResult(
success=True,
component='calculation',
message=f"[OK] {calc_spec.get('action', 'calculation')}: Created {list(calculations.keys())}",
data=calculations
))
logger.info(f" [OK] {calc_spec.get('action', 'calculation')}")
except Exception as e:
results.append(ValidationResult(
success=False,
component='calculation',
message=f"[FAIL] {calc_spec.get('action', 'calculation')}: {str(e)}",
data=None
))
logger.error(f" [FAIL] {calc_spec.get('action', 'calculation')}: {str(e)}")
# Validate hooks
logger.info("\nValidating hooks...")
hook_manager = HookManager()
# Load system hooks
system_hooks_dir = Path(__file__).parent / 'plugins'
if system_hooks_dir.exists():
hook_manager.load_plugins_from_directory(system_hooks_dir)
hook_results = hook_manager.execute_hooks('post_calculation', {
'trial_number': 0,
'design_variables': {},
'results': extraction_results,
'calculations': calculations
})
if hook_results:
results.append(ValidationResult(
success=True,
component='hook',
message=f"[OK] Hooks executed: {len(hook_results)} results",
data={'hook_results': hook_results}
))
logger.info(f" [OK] Executed {len(hook_results)} hook(s)")
# Check for objective
logger.info("\nValidating objective...")
objective = None
for hook_result in hook_results:
if hook_result and 'objective' in hook_result:
objective = hook_result['objective']
break
if objective is None:
# Try to find objective in calculations or results
for key in ['max_displacement', 'max_stress', 'max_von_mises']:
if key in {**extraction_results, **calculations}:
objective = {**extraction_results, **calculations}[key]
logger.warning(f" [WARNING] No explicit objective, using: {key}")
break
if objective is not None:
results.append(ValidationResult(
success=True,
component='objective',
message=f"[OK] Objective value: {objective}",
data={'objective': objective}
))
logger.info(f" [OK] Objective value: {objective}")
else:
results.append(ValidationResult(
success=False,
component='objective',
message="[FAIL] Could not determine objective value",
data=None
))
logger.error(" [FAIL] Could not determine objective value")
return results
# =========================================================================
# Complete Validation Workflow
# =========================================================================
def run_complete_validation(self, user_goal: str, llm_workflow: Optional[Dict[str, Any]] = None) -> Tuple[bool, List[ValidationResult]]:
"""
Run complete validation workflow from start to finish.
Steps:
1. Introspect model for expressions
2. Run baseline simulation
3. Introspect OP2 for contents
4. Suggest/validate configuration
5. Dry-run pipeline validation
Args:
user_goal: User's description of optimization goal
llm_workflow: Optional pre-configured workflow (otherwise suggested)
Returns:
Tuple of (success: bool, results: List[ValidationResult])
"""
logger.info("=" * 80)
logger.info("OPTIMIZATION SETUP WIZARD - COMPLETE VALIDATION")
logger.info("=" * 80)
# Step 1: Introspect model
self.introspect_model()
# Step 2: Run baseline
self.run_baseline_simulation()
# Step 3: Introspect OP2
self.introspect_op2()
# Step 4: Get configuration
if llm_workflow is None:
llm_workflow = self.suggest_configuration(user_goal)
# Step 5: Validate pipeline
validation_results = self.validate_pipeline(llm_workflow)
# Check if all validations passed
all_passed = all(r.success for r in validation_results)
logger.info("=" * 80)
logger.info("VALIDATION SUMMARY")
logger.info("=" * 80)
for result in validation_results:
logger.info(result.message)
if all_passed:
logger.info("\n[OK] ALL VALIDATIONS PASSED - Ready for optimization!")
else:
logger.error("\n[FAIL] VALIDATION FAILED - Fix issues before optimization")
return all_passed, validation_results
def main():
"""Test optimization setup wizard."""
import sys
print("=" * 80)
print("Phase 3.3: Optimization Setup Wizard Test")
print("=" * 80)
print()
# Configuration
prt_file = Path("tests/Bracket.prt")
sim_file = Path("tests/Bracket_sim1.sim")
if not prt_file.exists() or not sim_file.exists():
print("ERROR: Test files not found")
sys.exit(1)
# Initialize wizard
wizard = OptimizationSetupWizard(prt_file, sim_file)
# Run complete validation
user_goal = "Maximize displacement while keeping stress below yield/4"
success, results = wizard.run_complete_validation(user_goal)
if success:
print("\n[OK] Pipeline validated! Ready to start optimization.")
else:
print("\n[FAIL] Validation failed. Review errors above.")
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,90 @@
"""
Safety Factor Constraint Hook - Manual Implementation
This hook enforces a minimum safety factor constraint on stress.
If safety_factor < minimum required, the objective is heavily penalized.
Safety Factor = Yield Strength / Max Stress
For Aluminum 6061-T6:
- Yield Strength: 276 MPa
- Required Safety Factor: 4.0
- Allowable Stress: 69 MPa
Author: Atomizer Development Team
Version: 1.0
"""
from optimization_engine.plugins.hooks import Hook, HookPoint
def safety_factor_constraint_hook(context: dict) -> dict:
"""
Enforce safety factor constraint on optimization.
This hook checks if the calculated safety factor meets the minimum requirement.
If violated, it adds a large penalty to the objective to guide optimization
away from unsafe designs.
Args:
context: Dict containing:
- calculations: Dict with 'safety_factor' value
- results: Dict with stress results
Returns:
Dict with:
- safety_factor_satisfied: bool
- safety_factor_violation: float (0 if satisfied, penalty otherwise)
- constrained_objective: float (original or penalized objective)
"""
calculations = context.get('calculations', {})
# Get safety factor from calculations
safety_factor = calculations.get('safety_factor', 0.0)
# Get objective (negative displacement to maximize)
neg_displacement = calculations.get('neg_displacement', 0.0)
# Required minimum safety factor
min_safety_factor = 4.0
# Check constraint
satisfied = safety_factor >= min_safety_factor
# Calculate violation (how much we're under the limit)
violation = max(0.0, min_safety_factor - safety_factor)
# Apply penalty if constraint violated
if not satisfied:
# Heavy penalty: add large value to objective (we're minimizing)
# Penalty scales with violation severity
penalty = 1000.0 * violation
constrained_objective = neg_displacement + penalty
print(f" [CONSTRAINT VIOLATED] Safety factor {safety_factor:.2f} < {min_safety_factor}")
print(f" [PENALTY APPLIED] Adding {penalty:.2f} to objective")
else:
constrained_objective = neg_displacement
print(f" [CONSTRAINT SATISFIED] Safety factor {safety_factor:.2f} >= {min_safety_factor}")
return {
'safety_factor_satisfied': satisfied,
'safety_factor_violation': violation,
'constrained_objective': constrained_objective,
'objective': constrained_objective # This becomes the final objective
}
# Register hook with plugin system
hook = Hook(
name="safety_factor_constraint",
hook_point=HookPoint.POST_CALCULATION,
function=safety_factor_constraint_hook,
enabled=True,
description="Enforce minimum safety factor constraint with penalty"
)
def register_hooks(hook_manager):
"""Register hooks with the plugin system."""
return [hook]

View File

@@ -118,15 +118,21 @@ class PyNastranResearchAgent:
model.read_op2(str(op2_file))
# Get stress object for element type
# In pyNastran, stress is stored in model.op2_results.stress
stress_attr = f"{element_type}_stress"
if not hasattr(model, stress_attr):
if not hasattr(model, 'op2_results') or not hasattr(model.op2_results, 'stress'):
raise ValueError(f"No stress results in OP2")
stress_obj = model.op2_results.stress
if not hasattr(stress_obj, stress_attr):
raise ValueError(f"No {element_type} stress results in OP2")
stress = getattr(model, stress_attr)[subcase]
stress = getattr(stress_obj, stress_attr)[subcase]
itime = 0
# Extract von Mises if available
if stress.is_von_mises():
if stress.is_von_mises: # Property, not method
von_mises = stress.data[itime, :, 9] # Column 9 is von Mises
max_stress = float(np.max(von_mises))