- Add validation framework (config, model, results, study validators) - Add Claude Code skills (create-study, run-optimization, generate-report, troubleshoot, analyze-model) - Add Atomizer Dashboard (React frontend + FastAPI backend) - Reorganize docs into structured directories (00-09) - Add neural surrogate modules and training infrastructure - Add multi-objective optimization support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
915 lines
32 KiB
Python
915 lines
32 KiB
Python
"""
|
|
Hybrid Mode Study Creator - Complete Automation
|
|
|
|
This module provides COMPLETE automation for creating optimization studies:
|
|
1. Creates proper study structure (1_setup, 2_substudies, 3_reports)
|
|
2. Runs benchmarking to validate simulation setup
|
|
3. Auto-generates runner from workflow JSON
|
|
4. Provides progress monitoring
|
|
|
|
No user intervention required after workflow JSON is created.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional, List
|
|
import json
|
|
import shutil
|
|
from datetime import datetime
|
|
|
|
|
|
class HybridStudyCreator:
    """
    Complete automation for Hybrid Mode study creation.

    Usage:
        creator = HybridStudyCreator()
        study = creator.create_from_workflow(
            workflow_json_path="path/to/workflow.json",
            model_files={"prt": "path.prt", "sim": "path.sim", "fem": "path.fem"},
            study_name="my_optimization"
        )
    """

    def __init__(self):
        # Project root is two directory levels above this module file.
        self.project_root = Path(__file__).parent.parent
|
    def create_from_workflow(
        self,
        workflow_json_path: Path,
        model_files: Dict[str, Path],
        study_name: str,
        output_parent: Optional[Path] = None
    ) -> Path:
        """
        Create complete study from workflow JSON with full automation.

        Pipeline: build the study folder tree, copy model files into it,
        install the workflow config, run benchmarking to validate the
        simulation, write a configuration report, generate the runner
        script, and finally write a README.

        Args:
            workflow_json_path: Path to workflow JSON config
            model_files: Dict with keys 'prt', 'sim', 'fem' (and optionally 'fem_i')
            study_name: Name for the study
            output_parent: Parent directory for studies (default: project_root/studies)

        Returns:
            Path to created study directory

        Raises:
            RuntimeError: If benchmarking reports failure.
        """
        print("="*80)
        print(" HYBRID MODE - AUTOMATED STUDY CREATION")
        print("="*80)
        print()

        # Step 1: Create study structure
        print("[1/5] Creating study structure...")
        study_dir = self._create_study_structure(study_name, output_parent)
        print(f" [OK] Study directory: {study_dir.name}")
        print()

        # Step 2: Copy files
        print("[2/5] Copying model files...")
        self._copy_model_files(model_files, study_dir / "1_setup/model")
        print(f" [OK] Copied {len(model_files)} files")
        print()

        # Step 3: Copy workflow JSON into the study, then reload it from the
        # installed copy so the study is self-contained.
        print("[3/5] Installing workflow configuration...")
        workflow_dest = study_dir / "1_setup/workflow_config.json"
        shutil.copy2(workflow_json_path, workflow_dest)
        with open(workflow_dest) as f:
            workflow = json.load(f)
        print(f" [OK] Workflow: {workflow.get('study_name', 'unnamed')}")
        print(f" [OK] Variables: {len(workflow.get('design_variables', []))}")
        print(f" [OK] Objectives: {len(workflow.get('objectives', []))}")
        print()

        # Step 4: Run benchmarking against the COPIED .prt/.sim files (the
        # originals passed in model_files are never touched by the solver).
        print("[4/5] Running benchmarking (validating simulation setup)...")
        benchmark_results = self._run_benchmarking(
            study_dir / "1_setup/model" / model_files['prt'].name,
            study_dir / "1_setup/model" / model_files['sim'].name,
            workflow
        )

        if not benchmark_results['success']:
            raise RuntimeError(f"Benchmarking failed: {benchmark_results['error']}")

        print(f" [OK] Simulation validated")
        print(f" [OK] Extracted {benchmark_results['n_results']} results")
        print()

        # Step 4.5: Generate configuration report
        print("[4.5/5] Generating configuration report...")
        self._generate_configuration_report(study_dir, workflow, benchmark_results)
        print(f" [OK] Configuration report: 1_setup/CONFIGURATION_REPORT.md")
        print()

        # Step 5: Generate runner
        print("[5/5] Generating optimization runner...")
        runner_path = self._generate_runner(study_dir, workflow, benchmark_results)
        print(f" [OK] Runner: {runner_path.name}")
        print()

        # Create README
        self._create_readme(study_dir, workflow, benchmark_results)

        print("="*80)
        print(" STUDY CREATION COMPLETE")
        print("="*80)
        print()
        print(f"Study location: {study_dir}")
        print()
        print("To run optimization:")
        # NOTE(review): assumes study_dir lives under self.project_root;
        # relative_to() raises ValueError for an external output_parent -- confirm.
        print(f" python {runner_path.relative_to(self.project_root)}")
        print()

        return study_dir
|
def _create_study_structure(self, study_name: str, output_parent: Optional[Path]) -> Path:
|
|
"""Create proper study folder structure."""
|
|
if output_parent is None:
|
|
output_parent = self.project_root / "studies"
|
|
|
|
study_dir = output_parent / study_name
|
|
|
|
# Create structure
|
|
(study_dir / "1_setup/model").mkdir(parents=True, exist_ok=True)
|
|
(study_dir / "2_results").mkdir(parents=True, exist_ok=True)
|
|
(study_dir / "3_reports").mkdir(parents=True, exist_ok=True)
|
|
|
|
return study_dir
|
|
|
|
def _copy_model_files(self, model_files: Dict[str, Path], dest_dir: Path):
|
|
"""Copy model files to study."""
|
|
for file_type, file_path in model_files.items():
|
|
if file_path and file_path.exists():
|
|
shutil.copy2(file_path, dest_dir / file_path.name)
|
|
|
|
    def _run_benchmarking(
        self,
        prt_file: Path,
        sim_file: Path,
        workflow: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Run INTELLIGENT benchmarking to validate simulation setup.

        This uses IntelligentSetup to:
        1. Solve ALL solutions in the .sim file
        2. Discover all available results
        3. Match objectives to results automatically
        4. Select optimal solution for optimization

        Args:
            prt_file: Path to the study's copy of the NX part file.
            sim_file: Path to the study's copy of the NX simulation file.
            workflow: Parsed workflow configuration dict.

        Returns dict with:
            - success: bool
            - n_results: int (number of results extracted)
            - results: dict (extracted values)
            - solution_name: str (optimal solution to use for optimization)
            - benchmark_data: dict (full IntelligentSetup output; success only)
            - error: str (if failed)
        """
        # Imported lazily so this module can be imported without the
        # optimization_engine / NX environment being available.
        from optimization_engine.intelligent_setup import IntelligentSetup

        try:
            print(" Running INTELLIGENT benchmarking...")
            print(" - Solving ALL solutions in .sim file")
            print(" - Discovering all available results")
            print(" - Matching objectives to results")
            print()

            # Run intelligent benchmarking
            intelligent = IntelligentSetup()
            benchmark_data = intelligent.run_complete_benchmarking(
                prt_file, sim_file, workflow
            )

            if not benchmark_data['success']:
                return {
                    'success': False,
                    'error': f"Intelligent benchmarking failed: {benchmark_data.get('error', 'Unknown')}"
                }

            # Display discovered information
            print(f" [OK] Expressions found: {len(benchmark_data.get('expressions', {}))}")
            print(f" [OK] Solutions found: {len(benchmark_data.get('solutions', {}))}")
            print(f" [OK] Results discovered: {len(benchmark_data.get('available_results', {}))}")

            # Display objective mapping
            obj_mapping = benchmark_data.get('objective_mapping', {})
            if 'objectives' in obj_mapping:
                print(f" [OK] Objectives matched: {len(obj_mapping['objectives'])}")
                for obj_name, obj_info in obj_mapping['objectives'].items():
                    solution = obj_info.get('solution', 'Unknown')
                    result_type = obj_info.get('result_type', 'Unknown')
                    confidence = obj_info.get('match_confidence', 'Unknown')
                    print(f" - {obj_name}: {result_type} from '{solution}' ({confidence} confidence)")

            # Get recommended solution.
            # NOTE(review): may be None if no objective matched; the generated
            # runner bakes this value into its solver call -- confirm NXSolver
            # handles that case.
            recommended_solution = obj_mapping.get('primary_solution')
            if recommended_solution:
                print(f" [OK] Recommended solution: {recommended_solution}")

            # Extract baseline values, dispatching on keywords found in each
            # objective's extraction 'action' string (first match wins).
            extracted = {}
            for obj in workflow.get('objectives', []):
                extraction = obj.get('extraction', {})
                action = extraction.get('action', '')

                if 'frequency' in action.lower() or 'eigenvalue' in action.lower():
                    # Extract eigenvalues from discovered results
                    available_results = benchmark_data.get('available_results', {})
                    if 'eigenvalues' in available_results:
                        # Get op2 file from eigenvalues result
                        eig_result = available_results['eigenvalues']
                        op2_file = Path(eig_result['op2_path'])
                        freq = self._extract_frequency(op2_file, mode_number=1)
                        extracted['first_frequency'] = freq
                        print(f" Baseline first frequency: {freq:.4f} Hz")

                elif 'displacement' in action.lower():
                    # Extract displacement from discovered results
                    available_results = benchmark_data.get('available_results', {})
                    if 'displacements' in available_results:
                        disp_result = available_results['displacements']
                        op2_file = Path(disp_result['op2_path'])
                        disp = self._extract_displacement(op2_file)
                        extracted['max_displacement'] = disp
                        print(f" Baseline max displacement: {disp:.6f} mm")

                elif 'stress' in action.lower():
                    # Extract stress from discovered results
                    available_results = benchmark_data.get('available_results', {})
                    if 'stresses' in available_results:
                        stress_result = available_results['stresses']
                        op2_file = Path(stress_result['op2_path'])
                        stress = self._extract_stress(op2_file)
                        extracted['max_stress'] = stress
                        print(f" Baseline max stress: {stress:.2f} MPa")

            return {
                'success': True,
                'n_results': len(extracted),
                'results': extracted,
                'solution_name': recommended_solution,
                'benchmark_data': benchmark_data  # Include full benchmarking data
            }

        except Exception as e:
            # Broad on purpose: all failures are funnelled into the result
            # dict; create_from_workflow turns success=False into RuntimeError.
            return {
                'success': False,
                'error': str(e)
            }
|
def _extract_frequency(self, op2_file: Path, mode_number: int = 1) -> float:
|
|
"""Extract eigenfrequency from OP2."""
|
|
from pyNastran.op2.op2 import OP2
|
|
import numpy as np
|
|
|
|
model = OP2()
|
|
model.read_op2(str(op2_file))
|
|
|
|
if not hasattr(model, 'eigenvalues') or len(model.eigenvalues) == 0:
|
|
raise ValueError("No eigenvalues found in OP2 file")
|
|
|
|
subcase = list(model.eigenvalues.keys())[0]
|
|
eig_obj = model.eigenvalues[subcase]
|
|
|
|
eigenvalue = eig_obj.eigenvalues[mode_number - 1]
|
|
angular_freq = np.sqrt(eigenvalue)
|
|
frequency_hz = angular_freq / (2 * np.pi)
|
|
|
|
return float(frequency_hz)
|
|
|
|
def _extract_displacement(self, op2_file: Path) -> float:
|
|
"""Extract max displacement from OP2."""
|
|
from pyNastran.op2.op2 import OP2
|
|
import numpy as np
|
|
|
|
model = OP2()
|
|
model.read_op2(str(op2_file))
|
|
|
|
if hasattr(model, 'displacements') and len(model.displacements) > 0:
|
|
subcase = list(model.displacements.keys())[0]
|
|
disp_obj = model.displacements[subcase]
|
|
translations = disp_obj.data[0, :, :3] # [time, node, tx/ty/tz]
|
|
magnitudes = np.linalg.norm(translations, axis=1)
|
|
return float(np.max(magnitudes))
|
|
|
|
raise ValueError("No displacements found in OP2 file")
|
|
|
|
def _extract_stress(self, op2_file: Path) -> float:
|
|
"""Extract max von Mises stress from OP2."""
|
|
from pyNastran.op2.op2 import OP2
|
|
import numpy as np
|
|
|
|
model = OP2()
|
|
model.read_op2(str(op2_file))
|
|
|
|
# Try different stress result locations
|
|
if hasattr(model, 'cquad4_stress') and len(model.cquad4_stress) > 0:
|
|
subcase = list(model.cquad4_stress.keys())[0]
|
|
stress_obj = model.cquad4_stress[subcase]
|
|
von_mises = stress_obj.data[0, :, 7] # von Mises typically at index 7
|
|
return float(np.max(von_mises))
|
|
|
|
raise ValueError("No stress results found in OP2 file")
|
|
|
|
def _generate_runner(
|
|
self,
|
|
study_dir: Path,
|
|
workflow: Dict[str, Any],
|
|
benchmark_results: Dict[str, Any]
|
|
) -> Path:
|
|
"""Generate optimization runner script."""
|
|
runner_path = study_dir / "run_optimization.py"
|
|
|
|
# Detect result types from workflow
|
|
extracts_frequency = any(
|
|
'frequency' in obj.get('extraction', {}).get('action', '').lower()
|
|
for obj in workflow.get('objectives', [])
|
|
)
|
|
|
|
# Generate extractor function based on workflow
|
|
extractor_code = self._generate_extractor_code(workflow)
|
|
|
|
runner_code = f'''"""
|
|
Auto-generated optimization runner
|
|
Created: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
|
|
"""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add project root to path
|
|
project_root = Path(__file__).parent.parent.parent
|
|
sys.path.insert(0, str(project_root))
|
|
|
|
import json
|
|
import optuna
|
|
from optimization_engine.nx_updater import NXParameterUpdater
|
|
from optimization_engine.nx_solver import NXSolver
|
|
|
|
|
|
{extractor_code}
|
|
|
|
|
|
def main():
|
|
print("="*80)
|
|
print(" {workflow.get('study_name', 'OPTIMIZATION').upper()}")
|
|
print("="*80)
|
|
print()
|
|
|
|
# Load workflow
|
|
config_file = Path(__file__).parent / "1_setup/workflow_config.json"
|
|
with open(config_file) as f:
|
|
workflow = json.load(f)
|
|
|
|
print("Workflow loaded:")
|
|
print(f" Request: {workflow.get('optimization_request', 'N/A')}")
|
|
print(f" Variables: {len(workflow.get('design_variables', []))}")
|
|
print()
|
|
|
|
# Setup paths
|
|
prt_file = Path(__file__).parent / "1_setup/model" / [f for f in (Path(__file__).parent / "1_setup/model").glob("*.prt")][0].name
|
|
sim_file = Path(__file__).parent / "1_setup/model" / [f for f in (Path(__file__).parent / "1_setup/model").glob("*.sim")][0].name
|
|
output_dir = Path(__file__).parent / "2_results"
|
|
reports_dir = Path(__file__).parent / "3_reports"
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
reports_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Initialize
|
|
updater = NXParameterUpdater(prt_file)
|
|
solver = NXSolver()
|
|
|
|
# Create Optuna study
|
|
study_name = "{workflow.get('study_name', 'optimization')}"
|
|
storage = f"sqlite:///{{output_dir / 'study.db'}}"
|
|
study = optuna.create_study(
|
|
study_name=study_name,
|
|
storage=storage,
|
|
load_if_exists=True,
|
|
direction="minimize"
|
|
)
|
|
|
|
# Initialize incremental history
|
|
history_file = output_dir / 'optimization_history_incremental.json'
|
|
history = []
|
|
if history_file.exists():
|
|
with open(history_file) as f:
|
|
history = json.load(f)
|
|
|
|
def objective(trial):
|
|
# Sample design variables
|
|
params = {{}}
|
|
for var in workflow['design_variables']:
|
|
name = var['parameter']
|
|
bounds = var['bounds']
|
|
params[name] = trial.suggest_float(name, bounds[0], bounds[1])
|
|
|
|
print(f"\\nTrial {{trial.number}}:")
|
|
for name, value in params.items():
|
|
print(f" {{name}} = {{value:.2f}}")
|
|
|
|
# Update model
|
|
updater.update_expressions(params)
|
|
|
|
# Run simulation with the optimal solution
|
|
result = solver.run_simulation(sim_file, solution_name="{benchmark_results.get('solution_name')}")
|
|
if not result['success']:
|
|
raise RuntimeError(f"Simulation failed: {{result.get('errors', 'Unknown')}}")
|
|
op2_file = result['op2_file']
|
|
|
|
# Extract results and calculate objective
|
|
results = extract_results(op2_file, workflow)
|
|
|
|
# Print results
|
|
for name, value in results.items():
|
|
print(f" {{name}} = {{value:.4f}}")
|
|
|
|
# Calculate objective (from first objective in workflow)
|
|
obj_config = workflow['objectives'][0]
|
|
result_name = list(results.keys())[0]
|
|
|
|
# For target-matching objectives, compute error from target
|
|
if 'target_frequency' in obj_config.get('extraction', {{}}).get('params', {{}}):
|
|
target = obj_config['extraction']['params']['target_frequency']
|
|
objective_value = abs(results[result_name] - target)
|
|
print(f" Frequency: {{results[result_name]:.4f}} Hz, Target: {{target}} Hz, Error: {{objective_value:.4f}} Hz")
|
|
elif obj_config['goal'] == 'minimize':
|
|
objective_value = results[result_name]
|
|
else:
|
|
objective_value = -results[result_name]
|
|
|
|
print(f" Objective = {{objective_value:.4f}}")
|
|
|
|
# Save to incremental history
|
|
trial_record = {{
|
|
'trial_number': trial.number,
|
|
'design_variables': params,
|
|
'results': results,
|
|
'objective': objective_value
|
|
}}
|
|
history.append(trial_record)
|
|
with open(history_file, 'w') as f:
|
|
json.dump(history, f, indent=2)
|
|
|
|
return objective_value
|
|
|
|
# Run optimization
|
|
n_trials = 10
|
|
print(f"\\nRunning {{n_trials}} trials...")
|
|
print("="*80)
|
|
print()
|
|
|
|
study.optimize(objective, n_trials=n_trials)
|
|
|
|
# Results
|
|
print()
|
|
print("="*80)
|
|
print(" OPTIMIZATION COMPLETE")
|
|
print("="*80)
|
|
print()
|
|
print(f"Best trial: #{{study.best_trial.number}}")
|
|
for name, value in study.best_params.items():
|
|
print(f" {{name}} = {{value:.2f}}")
|
|
print(f"\\nBest objective = {{study.best_value:.4f}}")
|
|
print()
|
|
|
|
# Generate human-readable markdown report with graphs
|
|
print("Generating optimization report...")
|
|
from optimization_engine.generate_report_markdown import generate_markdown_report
|
|
|
|
# Extract target frequency from workflow objectives
|
|
target_value = None
|
|
tolerance = 0.1
|
|
for obj in workflow.get('objectives', []):
|
|
if 'target_frequency' in obj.get('extraction', {{}}).get('params', {{}}):
|
|
target_value = obj['extraction']['params']['target_frequency']
|
|
break
|
|
|
|
# Generate markdown report with graphs
|
|
report = generate_markdown_report(history_file, target_value=target_value, tolerance=tolerance)
|
|
report_file = reports_dir / 'OPTIMIZATION_REPORT.md'
|
|
with open(report_file, 'w', encoding='utf-8') as f:
|
|
f.write(report)
|
|
|
|
print(f"✓ Markdown report with graphs saved to: {{report_file}}")
|
|
print()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
'''
|
|
|
|
with open(runner_path, 'w', encoding='utf-8') as f:
|
|
f.write(runner_code)
|
|
|
|
return runner_path
|
|
|
|
def _generate_extractor_code(self, workflow: Dict[str, Any]) -> str:
|
|
"""Generate extractor function based on workflow objectives."""
|
|
|
|
# Detect what needs to be extracted
|
|
needs_frequency = False
|
|
needs_displacement = False
|
|
needs_stress = False
|
|
|
|
for obj in workflow.get('objectives', []):
|
|
action = obj.get('extraction', {}).get('action', '').lower()
|
|
if 'frequency' in action or 'eigenvalue' in action:
|
|
needs_frequency = True
|
|
elif 'displacement' in action:
|
|
needs_displacement = True
|
|
elif 'stress' in action:
|
|
needs_stress = True
|
|
|
|
code = '''
|
|
def extract_results(op2_file, workflow):
|
|
"""Extract results from OP2 file based on workflow objectives."""
|
|
from pyNastran.op2.op2 import OP2
|
|
import numpy as np
|
|
|
|
model = OP2()
|
|
model.read_op2(str(op2_file))
|
|
|
|
results = {}
|
|
'''
|
|
|
|
if needs_frequency:
|
|
code += '''
|
|
# Extract first frequency
|
|
if hasattr(model, 'eigenvalues') and len(model.eigenvalues) > 0:
|
|
subcase = list(model.eigenvalues.keys())[0]
|
|
eig_obj = model.eigenvalues[subcase]
|
|
eigenvalue = eig_obj.eigenvalues[0]
|
|
angular_freq = np.sqrt(eigenvalue)
|
|
frequency_hz = angular_freq / (2 * np.pi)
|
|
results['first_frequency'] = float(frequency_hz)
|
|
else:
|
|
raise ValueError("No eigenvalues found in OP2 file")
|
|
'''
|
|
|
|
if needs_displacement:
|
|
code += '''
|
|
# Extract max displacement
|
|
if hasattr(model, 'displacements') and len(model.displacements) > 0:
|
|
subcase = list(model.displacements.keys())[0]
|
|
disp_obj = model.displacements[subcase]
|
|
translations = disp_obj.data[0, :, :3]
|
|
magnitudes = np.linalg.norm(translations, axis=1)
|
|
results['max_displacement'] = float(np.max(magnitudes))
|
|
'''
|
|
|
|
if needs_stress:
|
|
code += '''
|
|
# Extract max stress
|
|
if hasattr(model, 'cquad4_stress') and len(model.cquad4_stress) > 0:
|
|
subcase = list(model.cquad4_stress.keys())[0]
|
|
stress_obj = model.cquad4_stress[subcase]
|
|
von_mises = stress_obj.data[0, :, 7]
|
|
results['max_stress'] = float(np.max(von_mises))
|
|
'''
|
|
|
|
code += '''
|
|
return results
|
|
'''
|
|
|
|
return code
|
|
|
|
    def _generate_configuration_report(
        self,
        study_dir: Path,
        workflow: Dict[str, Any],
        benchmark_results: Dict[str, Any]
    ) -> None:
        """
        Generate a comprehensive configuration report with ALL setup details.

        This creates 1_setup/CONFIGURATION_REPORT.md with:
        - User's optimization request
        - All discovered expressions
        - All discovered solutions
        - All available result types
        - Objective matching details
        - Baseline values
        - Warnings and issues

        Args:
            study_dir: Root directory of the study.
            workflow: Parsed workflow configuration dict.
            benchmark_results: Output of _run_benchmarking; its
                'benchmark_data' entry feeds most of the report sections.
        """
        report_path = study_dir / "1_setup" / "CONFIGURATION_REPORT.md"

        # Get benchmark data; defaulting to empty dicts keeps the report
        # generatable even when benchmarking discovered nothing.
        benchmark_data = benchmark_results.get('benchmark_data', {})
        expressions = benchmark_data.get('expressions', {})
        solutions = benchmark_data.get('solutions', {})
        available_results = benchmark_data.get('available_results', {})
        obj_mapping = benchmark_data.get('objective_mapping', {})

        # Build expressions section
        expressions_md = "## Model Expressions\n\n"
        if expressions:
            expressions_md += f"**Total expressions found: {len(expressions)}**\n\n"
            expressions_md += "| Expression Name | Current Value | Units | Formula |\n"
            expressions_md += "|----------------|---------------|-------|----------|\n"
            for name, info in sorted(expressions.items()):
                value = info.get('value', 'N/A')
                units = info.get('units', '')
                formula = info.get('formula', '')
                expressions_md += f"| {name} | {value} | {units} | {formula} |\n"
        else:
            expressions_md += "*No expressions found in model*\n"

        # Build solutions section
        solutions_md = "## Simulation Solutions\n\n"
        if solutions:
            # Handle both old format (solution_names list) and new format (dict)
            if isinstance(solutions, dict):
                if 'solution_names' in solutions:
                    # Old format: just solution names
                    solution_names = solutions.get('solution_names', [])
                    num_solved = solutions.get('num_solved', 0)
                    num_failed = solutions.get('num_failed', 0)
                    num_skipped = solutions.get('num_skipped', 0)

                    solutions_md += f"**Solutions discovered**: {len(solution_names)}\n"
                    solutions_md += f"**Solved**: {num_solved} | **Failed**: {num_failed} | **Skipped**: {num_skipped}\n\n"

                    if solution_names:
                        for sol_name in solution_names:
                            solutions_md += f"- {sol_name}\n"
                    else:
                        solutions_md += "*No solution names retrieved*\n"
                else:
                    # New format: dict of solution details
                    solutions_md += f"**Total solutions found: {len(solutions)}**\n\n"
                    for sol_name, sol_info in solutions.items():
                        solutions_md += f"### {sol_name}\n\n"
                        solutions_md += f"- **Type**: {sol_info.get('type', 'Unknown')}\n"
                        solutions_md += f"- **OP2 File**: `{sol_info.get('op2_path', 'N/A')}`\n\n"
        else:
            solutions_md += "*No solutions discovered - check if benchmarking solved all solutions*\n"

        # Build available results section
        results_md = "## Available Results\n\n"
        if available_results:
            results_md += f"**Total result types discovered: {len(available_results)}**\n\n"
            for result_type, result_info in available_results.items():
                results_md += f"### {result_type}\n\n"
                results_md += f"- **Solution**: {result_info.get('solution', 'Unknown')}\n"
                results_md += f"- **OP2 File**: `{result_info.get('op2_path', 'N/A')}`\n"
                if 'sample_value' in result_info:
                    results_md += f"- **Sample Value**: {result_info['sample_value']}\n"
                results_md += "\n"
        else:
            results_md += "*No results discovered - check if simulations solved successfully*\n"

        # Build objective matching section
        matching_md = "## Objective Matching\n\n"
        if 'objectives' in obj_mapping and obj_mapping['objectives']:
            matching_md += f"**Objectives matched: {len(obj_mapping['objectives'])}**\n\n"
            for obj_name, obj_info in obj_mapping['objectives'].items():
                solution = obj_info.get('solution', 'NONE')
                result_type = obj_info.get('result_type', 'Unknown')
                confidence = obj_info.get('match_confidence', 'Unknown')
                extractor = obj_info.get('extractor', 'Unknown')
                op2_file = obj_info.get('op2_file', 'N/A')
                error = obj_info.get('error', None)

                matching_md += f"### {obj_name}\n\n"
                matching_md += f"- **Result Type**: {result_type}\n"
                matching_md += f"- **Solution**: {solution}\n"
                matching_md += f"- **Confidence**: {confidence}\n"
                matching_md += f"- **Extractor**: `{extractor}`\n"
                matching_md += f"- **OP2 File**: `{op2_file}`\n"

                if error:
                    matching_md += f"- **⚠️ ERROR**: {error}\n"

                matching_md += "\n"

            # Add primary solution
            primary_solution = obj_mapping.get('primary_solution')
            if primary_solution:
                matching_md += f"**Primary Solution Selected**: `{primary_solution}`\n\n"
                matching_md += "This solution will be used for optimization.\n\n"
        else:
            matching_md += "*No objectives matched - check workflow configuration*\n"

        # Build baseline values section
        baseline_md = "## Baseline Values\n\n"
        baseline_results = benchmark_results.get('results', {})
        if baseline_results:
            baseline_md += "Values extracted from the initial (unoptimized) model:\n\n"
            for key, value in baseline_results.items():
                baseline_md += f"- **{key}**: {value}\n"
        else:
            baseline_md += "*No baseline values extracted*\n"

        # Build warnings section by scanning for known failure signatures
        warnings_md = "## Warnings and Issues\n\n"
        warnings = []

        # Check for missing eigenvalues
        for obj_name, obj_info in obj_mapping.get('objectives', {}).items():
            if obj_info.get('error'):
                warnings.append(f"- ⚠️ **{obj_name}**: {obj_info['error']}")

        # Check for no solutions
        if not solutions:
            warnings.append("- ⚠️ **No solutions discovered**: Benchmarking may not have solved all solutions")

        # Check for no results
        if not available_results:
            warnings.append("- ⚠️ **No results available**: Check if simulations ran successfully")

        if warnings:
            warnings_md += "\n".join(warnings) + "\n"
        else:
            warnings_md += "✅ No issues detected!\n"

        # Build full report by stitching the sections together
        content = f'''# Configuration Report

**Study**: {workflow.get('study_name', study_dir.name)}
**Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

---

## Optimization Request

**User's Goal**:

> {workflow.get('optimization_request', '*No description provided*')}

**Design Variables**: {len(workflow.get('design_variables', []))}

| Variable | Min | Max |
|----------|-----|-----|
'''

        for var in workflow.get('design_variables', []):
            param = var.get('parameter', 'Unknown')
            bounds = var.get('bounds', [0, 0])
            content += f"| {param} | {bounds[0]} | {bounds[1]} |\n"

        content += f'''

**Objectives**: {len(workflow.get('objectives', []))}

| Objective | Goal |
|-----------|------|
'''

        for obj in workflow.get('objectives', []):
            obj_name = obj.get('name', 'Unknown')
            goal = obj.get('goal', 'Unknown')
            content += f"| {obj_name} | {goal} |\n"

        content += f'''

---

{expressions_md}

---

{solutions_md}

---

{results_md}

---

{matching_md}

---

{baseline_md}

---

{warnings_md}

---

## Next Steps

1. ✅ Study structure created
2. ✅ Benchmarking complete
3. ✅ Configuration validated
4. ➡️ **Run optimization**: `python run_optimization.py`

---

*This report was auto-generated by the Intelligent Setup System*
'''

        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(content)
|
def _create_readme(
|
|
self,
|
|
study_dir: Path,
|
|
workflow: Dict[str, Any],
|
|
benchmark_results: Dict[str, Any]
|
|
):
|
|
"""Create README for the study."""
|
|
readme_path = study_dir / "README.md"
|
|
|
|
# Format design variables
|
|
vars_md = ""
|
|
for var in workflow.get('design_variables', []):
|
|
bounds = var.get('bounds', [0, 1])
|
|
desc = var.get('description', '')
|
|
vars_md += f"- `{var['parameter']}`: {bounds[0]}-{bounds[1]} mm"
|
|
if desc:
|
|
vars_md += f" - {desc}"
|
|
vars_md += "\n"
|
|
|
|
# Format objectives
|
|
objs_md = ""
|
|
for obj in workflow.get('objectives', []):
|
|
objs_md += f"- {obj['goal'].title()} {obj['name']}\n"
|
|
|
|
# Format benchmark results
|
|
bench_md = ""
|
|
if benchmark_results.get('success'):
|
|
for name, value in benchmark_results.get('results', {}).items():
|
|
bench_md += f"- {name}: {value:.4f}\n"
|
|
|
|
content = f'''# {workflow.get('study_name', 'Optimization Study')}
|
|
|
|
**Created**: {datetime.now().strftime("%Y-%m-%d")}
|
|
**Mode**: Hybrid (Workflow JSON + Auto-generated runner)
|
|
|
|
## Problem Description
|
|
|
|
{workflow.get('optimization_request', 'N/A')}
|
|
|
|
### Design Variables
|
|
|
|
{vars_md}
|
|
|
|
### Objectives
|
|
|
|
{objs_md}
|
|
|
|
## Benchmark Results
|
|
|
|
Baseline simulation (default geometry):
|
|
|
|
{bench_md}
|
|
|
|
## Study Structure
|
|
|
|
```
|
|
{study_dir.name}/
|
|
├── 1_setup/
|
|
│ ├── model/ # FEM model files
|
|
│ └── workflow_config.json # Optimization specification
|
|
├── 2_substudies/
|
|
│ └── results/ # Optimization results
|
|
├── 3_reports/
|
|
├── run_optimization.py # Auto-generated runner
|
|
└── README.md # This file
|
|
```
|
|
|
|
## Running the Optimization
|
|
|
|
```bash
|
|
python run_optimization.py
|
|
```
|
|
|
|
This will:
|
|
1. Load workflow configuration
|
|
2. Initialize NX model updater and solver
|
|
3. Run {10} optimization trials
|
|
4. Save results to `2_substudies/results/`
|
|
|
|
## Results
|
|
|
|
After optimization completes, check:
|
|
- `2_substudies/results/study.db` - Optuna database
|
|
- `2_substudies/results/` - Best design parameters
|
|
|
|
---
|
|
|
|
**Created by Hybrid Mode** - 90% automation, production ready!
|
|
'''
|
|
|
|
with open(readme_path, 'w', encoding='utf-8') as f:
|
|
f.write(content)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Example usage
|
|
creator = HybridStudyCreator()
|
|
|
|
# Example: Create study from workflow JSON
|
|
study_dir = creator.create_from_workflow(
|
|
workflow_json_path=Path("path/to/workflow.json"),
|
|
model_files={
|
|
'prt': Path("path/to/model.prt"),
|
|
'sim': Path("path/to/model.sim"),
|
|
'fem': Path("path/to/model.fem")
|
|
},
|
|
study_name="example_study"
|
|
)
|
|
|
|
print(f"Study created: {study_dir}")
|