Files
Atomizer/optimization_engine/study_wizard.py

1746 lines
59 KiB
Python
Raw Permalink Normal View History

"""
Study Creation Wizard - Comprehensive study setup for Atomizer
===============================================================
A powerful, LLM-friendly wizard that automates the complete study creation workflow:
1. Model Introspection - Discover expressions, solutions, and available results
2. Configuration Generation - Build optimization_config.json from user requirements
3. Script Generation - Generate run_optimization.py with proper extractors
4. Documentation Generation - Create README.md, STUDY_REPORT.md, MODEL_INTROSPECTION.md
This module is designed to work seamlessly with Claude Code skills.
Usage:
from optimization_engine.study_wizard import StudyWizard
wizard = StudyWizard(
study_name="my_optimization",
description="Optimize bracket for stiffness and mass"
)
# Step 1: Set model files
wizard.set_model_files(
prt_file="path/to/model.prt",
sim_file="path/to/model_sim1.sim"
)
# Step 2: Introspect model
introspection = wizard.introspect()
# Step 3: Add design variables, objectives, constraints
wizard.add_design_variable("thickness", bounds=[5, 20], units="mm")
wizard.add_objective("mass", goal="minimize", extractor="extract_mass_from_bdf")
wizard.add_constraint("max_stress", constraint_type="less_than", threshold=250, extractor="extract_solid_stress", units="MPa")
# Step 4: Generate study
wizard.generate()
Author: Atomizer Development Team
Version: 1.0.0
Last Updated: 2025-12-06
"""
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime
import json
import logging
import shutil
logger = logging.getLogger(__name__)
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class DesignVariable:
    """One tunable model parameter together with the range it may take.

    Attributes:
        parameter: Name of the parameter being varied.
        bounds: Pair of (lower, upper) limits.
        description: Optional free-text description.
        units: Optional unit label.
    """

    parameter: str
    bounds: Tuple[float, float]
    description: str = ""
    units: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form of this variable.

        The bounds are emitted as a list, and an empty description is
        replaced by "<parameter> design variable".
        """
        label = self.description
        if not label:
            label = f"{self.parameter} design variable"
        return dict(
            parameter=self.parameter,
            bounds=list(self.bounds),
            description=label,
            units=self.units,
        )
@dataclass
class Objective:
    """A quantity the optimization should drive up or down.

    Attributes:
        name: Objective name.
        goal: Either "minimize" or "maximize".
        extractor: Name of the extractor that produces the value.
        params: Extra parameters handed to the extractor.
        weight: Relative weight of this objective.
        description: Optional free-text description.
    """

    name: str
    goal: str  # "minimize" or "maximize"
    extractor: str
    params: Dict[str, Any] = field(default_factory=dict)
    weight: float = 1.0
    description: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form of this objective.

        An empty description falls back to "<Goal> <name>" with the goal
        capitalized. The extractor and its params are nested under the
        "extraction" key.
        """
        summary = self.description
        if not summary:
            summary = f"{self.goal.capitalize()} {self.name}"

        extraction = {
            "action": self.extractor,
            "domain": "result_extraction",
            "params": self.params,
        }
        return {
            "name": self.name,
            "goal": self.goal,
            "weight": self.weight,
            "description": summary,
            "extraction": extraction,
        }
@dataclass
class Constraint:
    """Optimization constraint specification.

    Attributes:
        name: Constraint name.
        type: Either "less_than" or "greater_than".
        threshold: Limit value the extracted result is compared against.
        extractor: Name of the extractor that produces the value.
        params: Extra parameters handed to the extractor.
        description: Optional free-text description.
        units: Optional unit label for the threshold.
    """

    name: str
    type: str  # "less_than" or "greater_than"
    threshold: float
    extractor: str
    params: Dict[str, Any] = field(default_factory=dict)
    description: str = ""
    units: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form of this constraint.

        An empty description falls back to "<name> constraint". The
        ``units`` field is included so it is not lost on serialization,
        matching what ``DesignVariable.to_dict`` does for its units.
        """
        return {
            "name": self.name,
            "type": self.type,
            "threshold": self.threshold,
            "description": self.description or f"{self.name} constraint",
            # Previously omitted: the unit label was silently dropped here.
            "units": self.units,
            "extraction": {
                "action": self.extractor,
                "domain": "result_extraction",
                "params": self.params,
            },
        }
@dataclass
class IntrospectionResult:
    """Results from model introspection.

    Attributes:
        success: Whether introspection was able to run.
        expressions: Expression records (dicts read via 'name', 'value', 'unit').
        solutions: Solution records (dicts read via 'name').
        bodies: Body records.
        mass_properties: Mass property data.
        mesh_info: Mesh information.
        available_results: Mapping of result kind to availability flag.
        boundary_conditions: Boundary condition records.
        loads: Load records.
        materials: Material records.
        subcases: Subcase identifiers.
        error: Error message when ``success`` is False.
    """

    success: bool
    expressions: List[Dict[str, Any]] = field(default_factory=list)
    solutions: List[Dict[str, Any]] = field(default_factory=list)
    bodies: List[Dict[str, Any]] = field(default_factory=list)
    mass_properties: Dict[str, Any] = field(default_factory=dict)
    mesh_info: Dict[str, Any] = field(default_factory=dict)
    available_results: Dict[str, bool] = field(default_factory=dict)
    boundary_conditions: List[Dict[str, Any]] = field(default_factory=list)
    loads: List[Dict[str, Any]] = field(default_factory=list)
    materials: List[Dict[str, Any]] = field(default_factory=list)
    subcases: List[int] = field(default_factory=list)
    error: str = ""

    def get_expression_names(self) -> List[str]:
        """Get list of expression names ('' for records without a name)."""
        return [e.get('name', '') for e in self.expressions]

    def get_solution_names(self) -> List[str]:
        """Get list of solution names ('' for records without a name)."""
        return [s.get('name', '') for s in self.solutions]

    def suggest_design_variables(self) -> List[Dict[str, Any]]:
        """Suggest potential design variables from expressions.

        An expression is skipped when its name looks like ``p<digits>``,
        when its name contains "mass" but not "input", or when its value is
        not a real number. Boolean values are rejected explicitly: ``bool``
        is a subclass of ``int`` and would otherwise receive bounds such as
        (0.5, 1.5).

        Suggested bounds span 50%-150% of the current value, ordered so the
        lower bound comes first for negative values too. A value of exactly
        0 yields the degenerate bounds (0, 0).

        Returns:
            One dict per suggestion with keys 'name', 'current_value',
            'suggested_bounds', 'units' and 'confidence' ('high' when the
            name contains a typical dimension keyword, else 'medium').
        """
        dimension_keywords = ('thickness', 'angle', 'radius', 'length', 'width', 'height')
        suggestions = []
        for expr in self.expressions:
            # Tolerate records whose name is missing or explicitly None.
            name = expr.get('name') or ''
            value = expr.get('value')
            lowered = name.lower()

            # Skip system/reference expressions
            if name.startswith('p') and name[1:].isdigit():
                continue
            if 'mass' in lowered and 'input' not in lowered:
                continue

            # Only real numbers can be given numeric bounds; exclude bool.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue

            # Suggest bounds based on value
            if value > 0:
                bounds = (value * 0.5, value * 1.5)
            else:
                bounds = (value * 1.5, value * 0.5)

            is_dimension = any(kw in lowered for kw in dimension_keywords)
            suggestions.append({
                'name': name,
                'current_value': value,
                'suggested_bounds': bounds,
                'units': expr.get('unit', ''),
                'confidence': 'high' if is_dimension else 'medium'
            })
        return suggestions
# =============================================================================
# Extractor Catalog
# =============================================================================
# Each row: (function name, module path, input file kind, output unit, description).
# The function name doubles as the catalog key.
_EXTRACTOR_ROWS = (
    # Mass extractors
    ("extract_mass_from_bdf", "optimization_engine.extractors.bdf_mass_extractor",
     ".dat/.bdf", "kg", "Extract total mass from BDF/DAT file"),
    ("extract_part_mass", "optimization_engine.extractors.extract_part_mass_material",
     ".prt", "kg", "Extract mass from NX part file via journal"),
    # Displacement extractors
    ("extract_displacement", "optimization_engine.extractors.extract_displacement",
     ".op2", "mm", "Extract displacement results from OP2"),
    # Stress extractors
    ("extract_solid_stress", "optimization_engine.extractors.extract_von_mises_stress",
     ".op2", "MPa", "Extract von Mises stress from OP2"),
    ("extract_principal_stress", "optimization_engine.extractors.extract_principal_stress",
     ".op2", "MPa", "Extract principal stresses (σ1, σ2, σ3)"),
    # Strain energy extractors
    ("extract_strain_energy", "optimization_engine.extractors.extract_strain_energy",
     ".op2", "J", "Extract strain energy from OP2"),
    ("extract_total_strain_energy", "optimization_engine.extractors.extract_strain_energy",
     ".op2", "J", "Extract total strain energy (convenience)"),
    # Reaction forces
    ("extract_spc_forces", "optimization_engine.extractors.extract_spc_forces",
     ".op2", "N", "Extract SPC/reaction forces from OP2"),
    # Frequency extractors
    ("extract_frequency", "optimization_engine.extractors.extract_frequency",
     ".op2", "Hz", "Extract natural frequencies from modal analysis"),
    ("get_first_frequency", "optimization_engine.extractors.extract_modal_mass",
     ".f06", "Hz", "Get first natural frequency from F06"),
    # Temperature extractors (Phase 3)
    ("extract_temperature", "optimization_engine.extractors.extract_temperature",
     ".op2", "K/°C", "Extract temperatures from thermal analysis"),
    ("get_max_temperature", "optimization_engine.extractors.extract_temperature",
     ".op2", "K/°C", "Get maximum temperature (convenience)"),
    # Modal mass (Phase 3)
    ("extract_modal_mass", "optimization_engine.extractors.extract_modal_mass",
     ".f06", "kg", "Extract modal effective mass from F06"),
    # Zernike (optical)
    ("extract_zernike_from_op2", "optimization_engine.extractors.extract_zernike",
     ".op2 + .bdf", "nm", "Extract Zernike coefficients for optical surfaces"),
)

# Catalog of known extractors, keyed by extractor function name.
EXTRACTOR_CATALOG = {
    func_name: {
        "module": module_path,
        "function": func_name,
        "input": input_kind,
        "output_unit": unit,
        "description": summary,
    }
    for func_name, module_path, input_kind, unit, summary in _EXTRACTOR_ROWS
}
# =============================================================================
# Protocol Catalog
# =============================================================================
# Known optimization protocols, keyed by protocol id. Each entry names the
# sampler to use and describes when the protocol applies.
PROTOCOL_CATALOG = dict(
    protocol_10_single=dict(
        name="Single-Objective IMSO",
        sampler="TPESampler",
        description="Adaptive single-objective optimization",
        use_when=["single objective", "maximize or minimize one thing"],
        directions=1,
    ),
    protocol_11_multi=dict(
        name="Multi-Objective NSGA-II",
        sampler="NSGAIISampler",
        description="Pareto-optimal multi-objective optimization",
        use_when=["multiple objectives", "pareto front", "trade-offs"],
        directions="multiple",
    ),
)
# =============================================================================
# Study Wizard
# =============================================================================
class StudyWizard:
"""
Comprehensive study creation wizard for Atomizer.
This wizard guides the complete study setup process:
1. Model introspection (discover expressions, solutions, results)
2. Configuration generation (optimization_config.json)
3. Script generation (run_optimization.py, reset_study.py)
4. Documentation generation (README.md, STUDY_REPORT.md, MODEL_INTROSPECTION.md)
"""
def __init__(
    self,
    study_name: str,
    description: str = "",
    studies_dir: Optional[Path] = None
):
    """
    Set up an empty wizard for a named study.

    Args:
        study_name: Study name; also used as the study's directory name
        description: Free-text description of the study
        studies_dir: Directory that holds all studies. When omitted, the
            nearest ancestor of this module's directory that contains
            CLAUDE.md is treated as the project root and "<root>/studies"
            is used; if no ancestor qualifies, "<cwd>/studies" is used.
    """
    self.study_name = study_name
    self.description = description

    if studies_dir is None:
        module_dir = Path(__file__).parent
        # Probe this module's directory and then each ancestor in turn.
        # A directory that is its own parent (the top of the path) is
        # never probed.
        project_root = next(
            (
                folder
                for folder in (module_dir, *module_dir.parents)
                if folder != folder.parent and (folder / "CLAUDE.md").exists()
            ),
            None,
        )
        base = project_root if project_root is not None else Path.cwd()
        studies_dir = base / "studies"

    self.studies_dir = Path(studies_dir)
    self.study_dir = self.studies_dir / study_name

    # Model files: nothing is known until set_model_files() is called.
    self.prt_file: Optional[Path] = None
    self.sim_file: Optional[Path] = None
    self.fem_file: Optional[Path] = None
    self.op2_file: Optional[Path] = None

    # Study definition, filled in through the add_* methods.
    self.design_variables: List[DesignVariable] = []
    self.objectives: List[Objective] = []
    self.constraints: List[Constraint] = []

    # Populated by introspect().
    self.introspection: Optional[IntrospectionResult] = None

    # Run settings and their defaults.
    self.protocol = "protocol_11_multi"  # Default to multi-objective
    self.n_trials = 100
    self.timeout_per_trial = 400
    self.neural_enabled = False

    logger.info(f"StudyWizard initialized for '{study_name}'")
    logger.info(f" Study directory: {self.study_dir}")
# =========================================================================
# Model File Management
# =========================================================================
def set_model_files(
    self,
    prt_file: Union[str, Path],
    sim_file: Optional[Union[str, Path]] = None,
    fem_file: Optional[Union[str, Path]] = None,
    op2_file: Optional[Union[str, Path]] = None
) -> "StudyWizard":
    """
    Set model files for the study.

    When ``sim_file`` or ``fem_file`` is not given, the file is auto-detected
    by globbing: names ending in ``_sim1.sim`` / ``_fem1.fem`` are preferred
    over any other ``.sim`` / ``.fem`` file. Candidates are sorted by path
    before the first one is taken, so the choice is deterministic when
    several files match (directory listing order is not).

    Args:
        prt_file: Path to NX part file (.prt)
        sim_file: Path to simulation file (.sim) - auto-detected in the
            part file's directory if None
        fem_file: Path to FEM file (.fem) - auto-detected in the sim
            file's directory if None and a sim file is known
        op2_file: Path to OP2 results file (.op2) - optional

    Returns:
        Self for method chaining

    Raises:
        FileNotFoundError: If ``prt_file`` does not exist
    """
    self.prt_file = Path(prt_file)
    if not self.prt_file.exists():
        raise FileNotFoundError(f"Part file not found: {self.prt_file}")

    # Auto-detect sim file
    if sim_file:
        self.sim_file = Path(sim_file)
    else:
        # Look for *_sim1.sim first, then any *.sim, in the part's directory
        prt_dir = self.prt_file.parent
        sim_candidates = sorted(prt_dir.glob("*_sim1.sim")) or sorted(prt_dir.glob("*.sim"))
        if sim_candidates:
            self.sim_file = sim_candidates[0]
            logger.info(f" Auto-detected sim file: {self.sim_file.name}")

    # Auto-detect fem file
    if fem_file:
        self.fem_file = Path(fem_file)
    elif self.sim_file:
        sim_dir = self.sim_file.parent
        fem_candidates = sorted(sim_dir.glob("*_fem1.fem")) or sorted(sim_dir.glob("*.fem"))
        if fem_candidates:
            self.fem_file = fem_candidates[0]
            logger.info(f" Auto-detected fem file: {self.fem_file.name}")

    # Set op2 file if provided
    if op2_file:
        self.op2_file = Path(op2_file)

    logger.info("Model files set:")
    logger.info(f" PRT: {self.prt_file}")
    logger.info(f" SIM: {self.sim_file}")
    logger.info(f" FEM: {self.fem_file}")
    return self
# =========================================================================
# Model Introspection
# =========================================================================
def introspect(self, run_baseline: bool = False) -> IntrospectionResult:
    """
    Collect what can be learned from the part, simulation and OP2 files.

    Each file is only inspected if it has been set and exists on disk. A
    failed inspection of one file is logged as a warning and does not stop
    the others. The outcome is stored on ``self.introspection`` and also
    returned.

    Args:
        run_baseline: Documented as requesting a baseline solve to produce
            an OP2 file; this method body does not reference it.

    Returns:
        IntrospectionResult holding everything discovered. If the
        model_introspection module cannot be imported, a result with
        ``success=False`` and an error message is returned instead.
    """
    rule = "=" * 60
    logger.info(rule)
    logger.info("MODEL INTROSPECTION")
    logger.info(rule)

    try:
        from optimization_engine.hooks.nx_cad.model_introspection import (
            introspect_part,
            introspect_simulation,
            introspect_op2
        )
    except ImportError:
        logger.warning("Model introspection module not available")
        self.introspection = IntrospectionResult(
            success=False,
            error="Model introspection module not available"
        )
        return self.introspection

    found = IntrospectionResult(success=True)

    # Part file: expressions, bodies, mass properties
    part_path = self.prt_file
    if part_path and part_path.exists():
        logger.info(f"\nIntrospecting part: {part_path.name}")
        part_report = introspect_part(str(part_path))
        if not part_report.get('success'):
            logger.warning(f" Part introspection failed: {part_report.get('error')}")
        else:
            payload = part_report.get('data', {})
            found.expressions = payload.get('expressions', [])
            found.bodies = payload.get('bodies', [])
            found.mass_properties = payload.get('mass_properties', {})
            logger.info(f" Found {len(found.expressions)} expressions")
            logger.info(f" Found {len(found.bodies)} bodies")
            if found.mass_properties:
                logger.info(f" Mass: {found.mass_properties.get('mass', 'N/A')} kg")

    # Simulation file: solutions, boundary conditions, loads, materials, mesh
    sim_path = self.sim_file
    if sim_path and sim_path.exists():
        logger.info(f"\nIntrospecting simulation: {sim_path.name}")
        sim_report = introspect_simulation(str(sim_path))
        if not sim_report.get('success'):
            logger.warning(f" Simulation introspection failed: {sim_report.get('error')}")
        else:
            payload = sim_report.get('data', {})
            found.solutions = payload.get('solutions', [])
            found.boundary_conditions = payload.get('boundary_conditions', [])
            found.loads = payload.get('loads', [])
            found.materials = payload.get('materials', [])
            found.mesh_info = payload.get('mesh_info', {})
            logger.info(f" Found {len(found.solutions)} solutions")
            logger.info(f" Found {len(found.boundary_conditions)} boundary conditions")
            logger.info(f" Found {len(found.loads)} loads")

    # OP2 file (optional): available result kinds and subcases
    op2_path = self.op2_file
    if op2_path and op2_path.exists():
        logger.info(f"\nIntrospecting OP2: {op2_path.name}")
        op2_report = introspect_op2(str(op2_path))
        if not op2_report.get('success'):
            logger.warning(f" OP2 introspection failed: {op2_report.get('error')}")
        else:
            payload = op2_report.get('data', {})
            found.available_results = payload.get('available_results', {})
            found.subcases = payload.get('subcases', [])
            logger.info(f" Available results: {found.available_results}")
            logger.info(f" Subcases: {found.subcases}")

    self.introspection = found
    return found
# =========================================================================
# Design Variable, Objective, Constraint Management
# =========================================================================
def add_design_variable(
    self,
    parameter: str,
    bounds: Tuple[float, float],
    description: str = "",
    units: str = ""
) -> "StudyWizard":
    """
    Register one design variable with the study.

    Args:
        parameter: Name of the NX expression to vary
        bounds: (min, max) range for the variable
        description: Free-text description
        units: Unit label (e.g., "mm", "degrees")

    Returns:
        Self for method chaining
    """
    self.design_variables.append(
        DesignVariable(parameter, bounds, description, units)
    )
    # Indexed only after the variable has been stored.
    lower, upper = bounds[0], bounds[1]
    logger.info(f"Added design variable: {parameter} [{lower}, {upper}] {units}")
    return self
def add_objective(
    self,
    name: str,
    goal: str,
    extractor: str,
    params: Optional[Dict[str, Any]] = None,
    weight: float = 1.0,
    description: str = ""
) -> "StudyWizard":
    """
    Add an optimization objective.

    Unknown extractors and unrecognized goals are reported with a warning
    but still accepted. The goal check matters because the generated
    extraction code only negates an objective when the goal is exactly
    'maximize'; any other spelling is handled as a minimization.

    Args:
        name: Objective name (e.g., "mass", "stiffness")
        goal: "minimize" or "maximize"
        extractor: Extractor function name from catalog
        params: Additional parameters for extractor
        weight: Weight for multi-objective optimization
        description: Human-readable description

    Returns:
        Self for method chaining
    """
    if goal not in ("minimize", "maximize"):
        # Warn rather than raise, matching the lenient extractor check below.
        logger.warning(
            f"Objective '{name}' has unrecognized goal '{goal}' "
            f"(expected 'minimize' or 'maximize'), proceeding anyway"
        )
    if extractor not in EXTRACTOR_CATALOG:
        logger.warning(f"Extractor '{extractor}' not in catalog, proceeding anyway")

    obj = Objective(
        name=name,
        goal=goal,
        extractor=extractor,
        params=params or {},
        weight=weight,
        description=description
    )
    self.objectives.append(obj)
    logger.info(f"Added objective: {goal} {name} (extractor: {extractor})")
    return self
def add_constraint(
    self,
    name: str,
    constraint_type: str,
    threshold: float,
    extractor: str,
    params: Optional[Dict[str, Any]] = None,
    description: str = "",
    units: str = ""
) -> "StudyWizard":
    """
    Add an optimization constraint.

    As in ``add_objective``, an extractor that is not in the catalog is
    reported with a warning and still accepted.

    Args:
        name: Constraint name (e.g., "max_stress")
        constraint_type: "less_than" or "greater_than"
        threshold: Constraint threshold value
        extractor: Extractor function name from catalog
        params: Additional parameters for extractor
        description: Human-readable description
        units: Units for display

    Returns:
        Self for method chaining
    """
    # Same catalog check add_objective performs; previously missing here.
    if extractor not in EXTRACTOR_CATALOG:
        logger.warning(f"Extractor '{extractor}' not in catalog, proceeding anyway")

    const = Constraint(
        name=name,
        type=constraint_type,
        threshold=threshold,
        extractor=extractor,
        params=params or {},
        description=description,
        units=units
    )
    self.constraints.append(const)
    logger.info(f"Added constraint: {name} {constraint_type} {threshold} {units}")
    return self
# =========================================================================
# Settings
# =========================================================================
def set_protocol(self, protocol: str) -> "StudyWizard":
    """Select the optimization protocol by its catalog key.

    Returns self for chaining; raises ValueError if the key is not present
    in PROTOCOL_CATALOG.
    """
    if protocol in PROTOCOL_CATALOG:
        self.protocol = protocol
        return self
    raise ValueError(f"Unknown protocol: {protocol}. Available: {list(PROTOCOL_CATALOG.keys())}")
def set_trials(self, n_trials: int) -> "StudyWizard":
    """Store the requested trial count and return self for chaining."""
    setattr(self, "n_trials", n_trials)
    return self
def enable_neural(self, enabled: bool = True) -> "StudyWizard":
    """Turn neural acceleration on (the default) or off; returns self for chaining."""
    setattr(self, "neural_enabled", enabled)
    return self
# =========================================================================
# Generation
# =========================================================================
def generate(self, copy_model_files: bool = True) -> Dict[str, Path]:
    """
    Write the full study layout to disk.

    Produces, under the study directory:
    - 1_setup/ (with model/ inside) and 2_results/
    - 1_setup/optimization_config.json
    - 1_setup/workflow_config.json
    - run_optimization.py
    - reset_study.py
    - README.md
    - STUDY_REPORT.md
    - MODEL_INTROSPECTION.md

    Args:
        copy_model_files: If True and a part file is set, copy the model
            files into 1_setup/model

    Returns:
        Dict mapping an artifact key to the path of the generated file

    Raises:
        ValueError: If no design variable or no objective has been added
    """
    rule = "=" * 60
    logger.info(rule)
    logger.info("GENERATING STUDY")
    logger.info(rule)

    # Validate
    if not self.design_variables:
        raise ValueError("At least one design variable is required")
    if not self.objectives:
        raise ValueError("At least one objective is required")

    # Directory layout
    setup_dir = self.study_dir / "1_setup"
    model_dir = setup_dir / "model"
    results_dir = self.study_dir / "2_results"
    setup_dir.mkdir(parents=True, exist_ok=True)
    for folder in (model_dir, results_dir):
        folder.mkdir(exist_ok=True)
    logger.info(f"Created study directory: {self.study_dir}")

    if copy_model_files and self.prt_file:
        self._copy_model_files(model_dir)

    # (result key, destination, writer) in the order the files are produced.
    artifacts = (
        ('optimization_config', setup_dir / "optimization_config.json", self._generate_config),
        ('workflow_config', setup_dir / "workflow_config.json", self._generate_workflow_config),
        ('run_optimization', self.study_dir / "run_optimization.py", self._generate_run_script),
        ('reset_study', self.study_dir / "reset_study.py", self._generate_reset_script),
        ('readme', self.study_dir / "README.md", self._generate_readme),
        ('study_report', self.study_dir / "STUDY_REPORT.md", self._generate_study_report),
        ('model_introspection', self.study_dir / "MODEL_INTROSPECTION.md", self._generate_introspection_report),
    )
    generated_files = {}
    for key, destination, write in artifacts:
        write(destination)
        generated_files[key] = destination

    logger.info(rule)
    logger.info("STUDY GENERATION COMPLETE")
    logger.info(rule)
    logger.info("\nGenerated files:")
    for key, destination in generated_files.items():
        logger.info(f" {key}: {destination}")
    logger.info("\nNext steps:")
    logger.info(" 1. Review generated files")
    logger.info(f" 2. cd {self.study_dir}")
    logger.info(" 3. python run_optimization.py --discover")
    logger.info(" 4. python run_optimization.py --validate")
    logger.info(f" 5. python run_optimization.py --run --trials {self.n_trials}")
    return generated_files
def _copy_model_files(self, model_dir: Path):
    """Copy the part, sim and fem files into ``model_dir``.

    Files that are unset or missing on disk are skipped silently. A file
    whose name already exists in ``model_dir`` is left as it is.
    """
    logger.info("Copying model files...")
    for src in (self.prt_file, self.sim_file, self.fem_file):
        if not (src and src.exists()):
            continue
        dst = model_dir / src.name
        if dst.exists():
            logger.info(f" Already exists: {src.name}")
            continue
        shutil.copy2(src, dst)
        logger.info(f" Copied: {src.name}")
def _generate_config(self, path: Path):
    """Write optimization_config.json to ``path``.

    The simulation section derives the .dat/.op2 file names from the
    lower-cased sim file stem (or "<part stem>_sim1" when no sim file is
    set) followed by "-solution_1". The sampler comes from the selected
    protocol's catalog entry, defaulting to NSGAIISampler.
    """
    logger.info(f"Generating: {path.name}")

    part, sim, fem = self.prt_file, self.sim_file, self.fem_file

    # Names of the solver files are inferred from the sim file name.
    model_name = part.stem if part else "model"
    sim_name = sim.stem if sim else f"{model_name}_sim1"
    dat_file = f"{sim_name.lower()}-solution_1.dat"
    op2_file = f"{sim_name.lower()}-solution_1.op2"

    sampler = PROTOCOL_CATALOG.get(self.protocol, {}).get('sampler', 'NSGAIISampler')
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M')

    template_info = {
        "category": "structural",
        "analysis_type": "static",
        "typical_applications": [],
        "neural_enabled": self.neural_enabled,
    }
    optimization_settings = {
        "protocol": self.protocol,
        "n_trials": self.n_trials,
        "sampler": sampler,
        "pruner": None,
        "timeout_per_trial": self.timeout_per_trial,
    }
    simulation = {
        "model_file": part.name if part else "",
        "sim_file": sim.name if sim else "",
        "fem_file": fem.name if fem else "",
        "solver": "nastran",
        "analysis_types": ["static"],
        "solution_name": "Solution 1",
        "dat_file": dat_file,
        "op2_file": op2_file,
    }
    reporting = {
        "generate_plots": True,
        "save_incremental": True,
        "llm_summary": True,
        "generate_pareto_front": len(self.objectives) > 1,
    }
    neural_acceleration = {
        "enabled": self.neural_enabled,
        "min_training_points": 50,
        "auto_train": True,
        "epochs": 100,
        "validation_split": 0.2,
    }

    config = {
        "study_name": self.study_name,
        "description": self.description,
        "engineering_context": f"Generated by StudyWizard on {stamp}",
        "template_info": template_info,
        "optimization_settings": optimization_settings,
        "design_variables": [dv.to_dict() for dv in self.design_variables],
        "objectives": [obj.to_dict() for obj in self.objectives],
        "constraints": [const.to_dict() for const in self.constraints],
        "simulation": simulation,
        "result_extraction": self._build_extraction_config(),
        "reporting": reporting,
        "neural_acceleration": neural_acceleration,
    }

    with open(path, 'w') as handle:
        json.dump(config, handle, indent=2)
def _build_extraction_config(self) -> Dict[str, Any]:
    """Assemble the result_extraction section of the config.

    One entry is produced per objective, keyed by objective name. A
    constraint gets an entry only if no entry with its name exists yet.
    When the catalog has no output unit for a constraint's extractor, the
    constraint's own units are used; objectives fall back to ''.
    """

    def entry_for(extractor: str, fallback_unit: str) -> Dict[str, Any]:
        # Unknown extractors yield an empty module and their own name as function.
        meta = EXTRACTOR_CATALOG.get(extractor, {})
        return {
            "method": extractor,
            "extractor_module": meta.get('module', ''),
            "function": meta.get('function', extractor),
            "output_unit": meta.get('output_unit', fallback_unit),
        }

    extraction = {}
    for objective in self.objectives:
        extraction[objective.name] = entry_for(objective.extractor, '')

    for constraint in self.constraints:
        if constraint.name in extraction:
            continue
        extraction[constraint.name] = entry_for(constraint.extractor, constraint.units)

    return extraction
def _generate_workflow_config(self, path: Path):
    """Write workflow_config.json to ``path``: an id, a description and an empty step list."""
    logger.info(f"Generating: {path.name}")
    workflow = dict(
        workflow_id=f"{self.study_name}_workflow",
        description=f"Workflow for {self.study_name}",
        steps=[],
    )
    with open(path, 'w') as handle:
        json.dump(workflow, handle, indent=2)
def _generate_run_script(self, path: Path):
    """Generate run_optimization.py script.

    The script text is produced from a single f-string template filled in
    with the study name, description, protocol, sampler, extractor
    imports, and the code fragments returned by _build_extraction_code(),
    _build_constraint_check_code() and _build_user_attrs_code().

    Two defects of the earlier version are fixed here:

    * The file is written as UTF-8. Python reads source files as UTF-8 by
      default, while open() without ``encoding`` uses the platform default,
      so a non-ASCII study description could produce a script that fails to
      load, or fail to be written at all.
    * The discover/validate/test stages create their study with
      ``load_if_exists=True``. They share one SQLite file and one study
      name, so with ``False`` every staged run after the first failed
      because the study already existed. Trials from earlier staged runs
      now accumulate in that test study.

    NOTE(review): the template text below carries no indentation in this
    view of the file; it is reproduced as found. Confirm against the
    repository copy before relying on the generated script's layout.

    Args:
        path: Destination of the generated script
    """
    logger.info(f"Generating: {path.name}")

    # Build import statements for extractors (objectives and constraints
    # alike); a set removes duplicates and the template sorts them.
    extractor_imports = set()
    for item in [*self.objectives, *self.constraints]:
        info = EXTRACTOR_CATALOG.get(item.extractor, {})
        if info.get('module'):
            extractor_imports.add(f"from {info['module']} import {info.get('function', item.extractor)}")

    # Determine if multi-objective
    is_multi = len(self.objectives) > 1
    sampler = PROTOCOL_CATALOG.get(self.protocol, {}).get('sampler', 'NSGAIISampler')

    # Build objective function extraction code
    extraction_code = self._build_extraction_code()

    # Build return statement: a tuple for several objectives, a scalar for one
    if is_multi:
        returns = ", ".join([f"obj_{obj.name}" for obj in self.objectives])
        return_stmt = f"return ({returns})"
    else:
        obj = self.objectives[0]
        return_stmt = f"return obj_{obj.name}"

    script = f'''"""
{self.study_name} - Optimization Script
{"=" * 60}
{self.description}
Protocol: {PROTOCOL_CATALOG.get(self.protocol, {}).get('name', self.protocol)}
Staged Workflow:
----------------
1. DISCOVER: python run_optimization.py --discover
2. VALIDATE: python run_optimization.py --validate
3. TEST: python run_optimization.py --test
4. RUN: python run_optimization.py --run --trials {self.n_trials}
Generated by StudyWizard on {datetime.now().strftime('%Y-%m-%d %H:%M')}
"""
from pathlib import Path
import sys
import json
import argparse
from datetime import datetime
from typing import Optional, Tuple, List
# Add parent directory to path
project_root = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(project_root))
import optuna
from optuna.samplers import {sampler}
# Core imports
from optimization_engine.nx_solver import NXSolver
from optimization_engine.logger import get_logger
# Extractor imports
{chr(10).join(sorted(extractor_imports))}
def load_config(config_file: Path) -> dict:
"""Load configuration from JSON file."""
with open(config_file, 'r') as f:
return json.load(f)
def clean_nastran_files(model_dir: Path, logger) -> List[Path]:
"""Remove old Nastran solver output files."""
patterns = ['*.op2', '*.f06', '*.log', '*.f04', '*.pch', '*.DBALL', '*.MASTER', '_temp*.txt']
deleted = []
for pattern in patterns:
for f in model_dir.glob(pattern):
try:
f.unlink()
deleted.append(f)
logger.info(f" Deleted: {{f.name}}")
except Exception as e:
logger.warning(f" Failed to delete {{f.name}}: {{e}}")
return deleted
def objective(trial: optuna.Trial, config: dict, nx_solver: NXSolver,
model_dir: Path, logger) -> {"Tuple[" + ", ".join(["float"] * len(self.objectives)) + "]" if is_multi else "float"}:
"""
Objective function for optimization.
{"Returns tuple of objectives for multi-objective optimization." if is_multi else "Returns single objective value."}
"""
# Sample design variables
design_vars = {{}}
for var in config['design_variables']:
param_name = var['parameter']
bounds = var['bounds']
design_vars[param_name] = trial.suggest_float(param_name, bounds[0], bounds[1])
logger.trial_start(trial.number, design_vars)
try:
# Get file paths
sim_file = model_dir / config['simulation']['sim_file']
# Run FEA simulation
result = nx_solver.run_simulation(
sim_file=sim_file,
working_dir=model_dir,
expression_updates=design_vars,
solution_name=config['simulation'].get('solution_name'),
cleanup=True
)
if not result['success']:
logger.trial_failed(trial.number, f"Simulation failed: {{result.get('error', 'Unknown')}}")
return {"(" + ", ".join(["float('inf')"] * len(self.objectives)) + ")" if is_multi else "float('inf')"}
op2_file = result['op2_file']
dat_file = model_dir / config['simulation']['dat_file']
{extraction_code}
# Check constraints
feasible = True
constraint_results = {{}}
{self._build_constraint_check_code()}
# Set user attributes
{self._build_user_attrs_code()}
trial.set_user_attr('feasible', feasible)
objectives = {{{", ".join([f"'{obj.name}': obj_{obj.name}" for obj in self.objectives])}}}
logger.trial_complete(trial.number, objectives, constraint_results, feasible)
{return_stmt}
except Exception as e:
logger.trial_failed(trial.number, str(e))
return {"(" + ", ".join(["float('inf')"] * len(self.objectives)) + ")" if is_multi else "float('inf')"}
def main():
"""Main optimization workflow."""
parser = argparse.ArgumentParser(description='{self.study_name}')
stage_group = parser.add_mutually_exclusive_group()
stage_group.add_argument('--discover', action='store_true', help='Discover model outputs')
stage_group.add_argument('--validate', action='store_true', help='Run single validation trial')
stage_group.add_argument('--test', action='store_true', help='Run 3-trial test')
stage_group.add_argument('--run', action='store_true', help='Run optimization')
parser.add_argument('--trials', type=int, default={self.n_trials}, help='Number of trials')
parser.add_argument('--resume', action='store_true', help='Resume existing study')
parser.add_argument('--clean', action='store_true', help='Clean old files first')
args = parser.parse_args()
if not any([args.discover, args.validate, args.test, args.run]):
print("No stage specified. Use --discover, --validate, --test, or --run")
return 1
# Setup paths
study_dir = Path(__file__).parent
config_path = study_dir / "1_setup" / "optimization_config.json"
model_dir = study_dir / "1_setup" / "model"
results_dir = study_dir / "2_results"
results_dir.mkdir(exist_ok=True)
study_name = "{self.study_name}"
# Initialize
logger = get_logger(study_name, study_dir=results_dir)
config = load_config(config_path)
nx_solver = NXSolver()
if args.clean:
clean_nastran_files(model_dir, logger)
# Run appropriate stage
if args.discover or args.validate or args.test:
# Run limited trials for these stages
n = 1 if args.discover or args.validate else 3
storage = f"sqlite:///{{results_dir / 'study_test.db'}}"
study = optuna.create_study(
study_name=f"{{study_name}}_test",
storage=storage,
sampler={sampler}({"population_size=5, seed=42" if is_multi else "seed=42"}),
{"directions=['minimize'] * " + str(len(self.objectives)) if is_multi else "direction='minimize'"},
load_if_exists=True
)
study.optimize(
lambda trial: objective(trial, config, nx_solver, model_dir, logger),
n_trials=n,
show_progress_bar=True
)
logger.info(f"Completed {{len(study.trials)}} trial(s)")
return 0
# Full optimization run
storage = f"sqlite:///{{results_dir / 'study.db'}}"
if args.resume:
study = optuna.load_study(
study_name=study_name,
storage=storage,
sampler={sampler}({"population_size=20, seed=42" if is_multi else "seed=42"})
)
else:
study = optuna.create_study(
study_name=study_name,
storage=storage,
sampler={sampler}({"population_size=20, seed=42" if is_multi else "seed=42"}),
{"directions=['minimize'] * " + str(len(self.objectives)) if is_multi else "direction='minimize'"},
load_if_exists=True
)
logger.study_start(study_name, args.trials, "{sampler}")
study.optimize(
lambda trial: objective(trial, config, nx_solver, model_dir, logger),
n_trials=args.trials,
show_progress_bar=True
)
n_complete = len([t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE])
logger.study_complete(study_name, len(study.trials), n_complete)
# Report results
{"pareto_trials = study.best_trials" if is_multi else "best_trial = study.best_trial"}
logger.info(f"\\nOptimization Complete!")
logger.info(f"Total trials: {{len(study.trials)}}")
logger.info(f"Successful: {{n_complete}}")
return 0
if __name__ == "__main__":
exit(main())
'''
    # Python source is decoded as UTF-8 by default, so write it as UTF-8
    # regardless of the platform's default encoding.
    with open(path, 'w', encoding='utf-8') as f:
        f.write(script)
def _build_extraction_code(self) -> str:
"""Build extraction code for objective function."""
lines = []
lines.append(" # Extract results")
for obj in self.objectives:
info = EXTRACTOR_CATALOG.get(obj.extractor, {})
func = info.get('function', obj.extractor)
# Determine source file
if '.dat' in info.get('input', '') or '.bdf' in info.get('input', ''):
source = "dat_file"
else:
source = "op2_file"
# Build extraction call
if 'displacement' in obj.extractor.lower():
lines.append(f" disp_result = {func}({source}, subcase=1)")
lines.append(f" max_displacement = disp_result['max_displacement']")
if obj.goal == 'maximize':
lines.append(f" # For stiffness maximization, use inverse of displacement")
lines.append(f" applied_force = 1000.0 # N - adjust based on your model")
lines.append(f" obj_{obj.name} = -applied_force / max(abs(max_displacement), 1e-6)")
else:
lines.append(f" obj_{obj.name} = max_displacement")
elif 'mass' in obj.extractor.lower():
lines.append(f" obj_{obj.name} = {func}(str({source}))")
if obj.goal == 'maximize':
lines.append(f" obj_{obj.name} = -obj_{obj.name} # Negate for maximization")
elif 'stress' in obj.extractor.lower():
lines.append(f" stress_result = {func}({source}, subcase=1)")
lines.append(f" obj_{obj.name} = stress_result.get('max_von_mises', float('inf'))")
if obj.goal == 'maximize':
lines.append(f" obj_{obj.name} = -obj_{obj.name} # Negate for maximization")
else:
# Generic extraction
lines.append(f" obj_{obj.name} = {func}({source})")
if obj.goal == 'maximize':
lines.append(f" obj_{obj.name} = -obj_{obj.name} # Negate for maximization")
lines.append(f" logger.info(f' {obj.name}: {{obj_{obj.name}}}')")
lines.append("")
return "\n".join(lines)
def _build_constraint_check_code(self) -> str:
"""Build constraint checking code."""
if not self.constraints:
return " pass # No constraints defined"
lines = []
for const in self.constraints:
lines.append(f" # Check {const.name}")
# Get the value to check (may need extraction)
if any(obj.name == const.name for obj in self.objectives):
# Already extracted as objective
value_var = f"obj_{const.name}"
else:
# Need to extract
info = EXTRACTOR_CATALOG.get(const.extractor, {})
func = info.get('function', const.extractor)
source = "dat_file" if '.dat' in info.get('input', '') else "op2_file"
lines.append(f" const_{const.name} = {func}({source})")
value_var = f"const_{const.name}"
lines.append(f" constraint_results['{const.name}'] = {value_var}")
if const.type == "less_than":
lines.append(f" if {value_var} > {const.threshold}:")
else:
lines.append(f" if {value_var} < {const.threshold}:")
lines.append(f" feasible = False")
lines.append(f" logger.warning(f' Constraint violation: {const.name} = {{{value_var}}} vs {const.threshold}')")
lines.append("")
return "\n".join(lines)
def _build_user_attrs_code(self) -> str:
"""Build user attributes setting code."""
lines = []
for obj in self.objectives:
lines.append(f" trial.set_user_attr('{obj.name}', obj_{obj.name})")
return "\n".join(lines)
def _generate_reset_script(self, path: Path):
    """Generate the ``reset_study.py`` helper script.

    The emitted script deletes every ``*.db``, ``*.log`` and ``*.json`` file
    in the ``2_results`` directory next to it, asking for confirmation
    unless ``--confirm`` is passed.

    Args:
        path: Destination of the script file (overwritten if it exists).
    """
    logger.info(f"Generating: {path.name}")

    # Plain (non-f) string: nothing is interpolated at generation time, so
    # the braces below belong to f-strings of the emitted script itself.
    script = '''"""
Reset study - Delete results database and logs.

Usage:
    python reset_study.py
    python reset_study.py --confirm  # Skip confirmation
"""

from pathlib import Path
import shutil


def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--confirm', action='store_true', help='Skip confirmation')
    args = parser.parse_args()

    study_dir = Path(__file__).parent
    results_dir = study_dir / "2_results"

    if not args.confirm:
        print(f"This will delete all results in: {results_dir}")
        response = input("Are you sure? (y/N): ")
        if response.lower() != 'y':
            print("Cancelled.")
            return

    # Delete database files
    for f in results_dir.glob("*.db"):
        f.unlink()
        print(f"Deleted: {f.name}")

    # Delete log files
    for f in results_dir.glob("*.log"):
        f.unlink()
        print(f"Deleted: {f.name}")

    # Delete JSON results
    for f in results_dir.glob("*.json"):
        f.unlink()
        print(f"Deleted: {f.name}")

    print("Study reset complete.")


if __name__ == "__main__":
    main()
'''

    # Explicit encoding so the output does not depend on the platform locale.
    with open(path, 'w', encoding='utf-8') as f:
        f.write(script)
def _generate_readme(self, path: Path):
    """Generate the study's README.md documentation.

    Renders the description, design variables, objectives, constraints,
    algorithm settings, file layout and quick-start commands of the study
    into a Markdown document.

    Args:
        path: Destination of the README file (overwritten if it exists).
    """
    logger.info(f"Generating: {path.name}")

    # Build design variables table
    dv_table = (
        "| Parameter | Bounds | Units | Description |\n"
        "|-----------|--------|-------|-------------|\n"
    )
    dv_table += "".join(
        f"| `{dv.parameter}` | [{dv.bounds[0]}, {dv.bounds[1]}] | {dv.units} | {dv.description} |\n"
        for dv in self.design_variables
    )

    # Build objectives table
    obj_table = (
        "| Objective | Goal | Extractor | Weight |\n"
        "|-----------|------|-----------|--------|\n"
    )
    obj_table += "".join(
        f"| {obj.name} | {obj.goal} | `{obj.extractor}` | {obj.weight} |\n"
        for obj in self.objectives
    )

    # Build constraints table
    const_table = (
        "| Constraint | Type | Threshold | Units |\n"
        "|------------|------|-----------|-------|\n"
    )
    const_table += "".join(
        f"| {const.name} | {const.type} | {const.threshold} | {const.units} |\n"
        for const in self.constraints
    )
    constraints_section = const_table if self.constraints else "No constraints defined."

    protocol_info = PROTOCOL_CATALOG.get(self.protocol, {})
    protocol_name = protocol_info.get('name', self.protocol)
    sampler_name = protocol_info.get('sampler', 'TPESampler')
    generated_at = datetime.now().strftime('%Y-%m-%d %H:%M')
    neural_status = "Enabled" if self.neural_enabled else "Disabled"

    extraction_rows = "\n".join(
        f"| {obj.name} | `{obj.extractor}` | OP2/DAT |" for obj in self.objectives
    )

    # Fall back to placeholder names when a model file has not been set.
    prt_name = self.prt_file.name if self.prt_file else "Model.prt"
    sim_name = self.sim_file.name if self.sim_file else "Model_sim1.sim"
    fem_name = self.fem_file.name if self.fem_file else "Model_fem1.fem"

    # Blank lines around every `---` matter: directly below a text line
    # Markdown reads `---` as a setext heading underline, not as a rule.
    readme = f'''# {self.study_name}

{self.description}

**Generated**: {generated_at}
**Protocol**: {protocol_name}
**Trials**: {self.n_trials}

---

## 1. Engineering Problem

{self.description}

---

## 2. Mathematical Formulation

### Design Variables

{dv_table}
### Objectives

{obj_table}
### Constraints

{constraints_section}

---

## 3. Optimization Algorithm

- **Protocol**: {self.protocol}
- **Sampler**: {sampler_name}
- **Trials**: {self.n_trials}
- **Neural Acceleration**: {neural_status}

---

## 4. Simulation Pipeline

```
Design Variables -> NX Expression Update -> Nastran Solve -> Result Extraction -> Objective Evaluation
```

---

## 5. Result Extraction Methods

| Result | Extractor | Source |
|--------|-----------|--------|
{extraction_rows}

---

## 6. Study File Structure

```
{self.study_name}/
    1_setup/
        model/
            {prt_name}
            {sim_name}
            {fem_name}
        optimization_config.json
        workflow_config.json
    2_results/
        study.db
        optimization.log
    run_optimization.py
    reset_study.py
    README.md
    STUDY_REPORT.md
    MODEL_INTROSPECTION.md
```

---

## 7. Quick Start

```bash
# 1. Discover model outputs
python run_optimization.py --discover

# 2. Validate setup with single trial
python run_optimization.py --validate

# 3. Run integration test (3 trials)
python run_optimization.py --test

# 4. Run full optimization
python run_optimization.py --run --trials {self.n_trials}

# 5. Resume if interrupted
python run_optimization.py --run --trials 50 --resume
```

---

## 8. Results Location

| File | Description |
|------|-------------|
| `2_results/study.db` | Optuna SQLite database |
| `2_results/optimization.log` | Structured log file |
| `2_results/pareto_front.json` | Pareto-optimal solutions |

---

## 9. References

- [Atomizer Documentation](../../docs/)
- [Protocol {self.protocol}](../../docs/protocols/system/)
- [Extractor Library](../../docs/protocols/system/SYS_12_EXTRACTOR_LIBRARY.md)
'''

    # Names and descriptions are user supplied and may be non-ASCII, so the
    # encoding is pinned instead of relying on the platform locale.
    with open(path, 'w', encoding='utf-8') as f:
        f.write(readme)
def _generate_study_report(self, path: Path):
    """Generate the STUDY_REPORT.md template.

    The report is a skeleton with placeholder sections, stamped with the
    study name, the creation time and the configured trial count.

    Args:
        path: Destination of the report file (overwritten if it exists).
    """
    logger.info(f"Generating: {path.name}")

    # One timestamp for both fields so "Created" and "Last Updated" can
    # never differ in a freshly generated report.
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M')

    # Blank lines around every `---` matter: directly below a text line
    # Markdown reads `---` as a setext heading underline, not as a rule.
    report = f'''# Study Report: {self.study_name}

**Status**: Not Started
**Created**: {stamp}
**Last Updated**: {stamp}

---

## 1. Optimization Progress

| Metric | Value |
|--------|-------|
| Total Trials | 0 |
| Successful Trials | 0 |
| Best Objective | - |
| Duration | - |

---

## 2. Best Solutions

*No optimization runs completed yet.*

---

## 3. Pareto Front (if multi-objective)

*No Pareto front generated yet.*

---

## 4. Design Variable Sensitivity

*Analysis pending optimization runs.*

---

## 5. Constraint Satisfaction

*Analysis pending optimization runs.*

---

## 6. Recommendations

*Recommendations will be added after optimization runs.*

---

## 7. Next Steps

1. [ ] Run `python run_optimization.py --discover`
2. [ ] Run `python run_optimization.py --validate`
3. [ ] Run `python run_optimization.py --test`
4. [ ] Run `python run_optimization.py --run --trials {self.n_trials}`
5. [ ] Analyze results and update this report

---

*Generated by StudyWizard*
'''

    # The study name is user supplied and may be non-ASCII, so the encoding
    # is pinned instead of relying on the platform locale.
    with open(path, 'w', encoding='utf-8') as f:
        f.write(report)
def _generate_introspection_report(self, path: Path):
    """Generate the MODEL_INTROSPECTION.md report.

    Summarizes the model files, the discovered expressions (first 20) and
    solutions, which result types were reported as available, and the
    currently configured design variables, objectives and constraints.
    Sections without introspection data get a placeholder text.

    Args:
        path: Destination of the report file (overwritten if it exists).
    """
    logger.info(f"Generating: {path.name}")

    introspection = self.introspection

    # Build expressions table
    if introspection and introspection.expressions:
        candidate_keywords = ('thickness', 'angle', 'radius', 'length')
        expr_table = (
            "| Name | Value | Unit | Optimization Candidate |\n"
            "|------|-------|------|------------------------|\n"
        )
        for expr in introspection.expressions[:20]:  # Limit to 20
            name = expr.get('name', '')
            value = expr.get('value', 'N/A')
            unit = expr.get('unit', '')
            lowered = name.lower()
            is_candidate = any(kw in lowered for kw in candidate_keywords)
            candidate = "✓ High" if is_candidate else "Medium"
            expr_table += f"| {name} | {value} | {unit} | {candidate} |\n"
    else:
        expr_table = "*Run introspection to discover expressions.*"

    # Build solutions table
    if introspection and introspection.solutions:
        sol_table = "| Solution | Type | Status |\n|----------|------|--------|\n"
        for sol in introspection.solutions:
            sol_table += (
                f"| {sol.get('name', 'Unknown')} | {sol.get('type', 'Static')} | ✓ Active |\n"
            )
    else:
        sol_table = "*Run introspection to discover solutions.*"

    def file_cells(model_file, missing_label):
        """Return the (file name, status) cells for one model file row."""
        shown_name = model_file.name if model_file else "Not set"
        found = bool(model_file) and model_file.exists()
        return shown_name, ("✓ Found" if found else missing_label)

    def availability(result_key):
        """Return the 'Available' cell: a check mark when reported, else '?'."""
        reported = introspection and introspection.available_results.get(result_key)
        return "✓" if reported else "?"

    prt_name, prt_status = file_cells(self.prt_file, "❌ Missing")
    sim_name, sim_status = file_cells(self.sim_file, "❌ Missing")
    fem_name, fem_status = file_cells(self.fem_file, "⚠ Will be created")

    if self.design_variables:
        dv_lines = "\n".join(
            f"- `{dv.parameter}`: [{dv.bounds[0]}, {dv.bounds[1]}] {dv.units}"
            for dv in self.design_variables
        )
    else:
        dv_lines = "*No design variables configured yet.*"

    if self.objectives:
        obj_lines = "\n".join(
            f"- {obj.goal.capitalize()} `{obj.name}` using `{obj.extractor}`"
            for obj in self.objectives
        )
    else:
        obj_lines = "*No objectives configured yet.*"

    if self.constraints:
        const_lines = "\n".join(
            f"- `{c.name}` {c.type} {c.threshold} {c.units}" for c in self.constraints
        )
    else:
        const_lines = "*No constraints configured.*"

    generated_at = datetime.now().strftime('%Y-%m-%d %H:%M')

    # Blank lines around every `---` matter: directly below a text line
    # Markdown reads `---` as a setext heading underline, not as a rule.
    report = f'''# Model Introspection Report

**Study**: {self.study_name}
**Generated**: {generated_at}
**Introspection Version**: 1.0

---

## 1. Files Discovered

| Type | File | Status |
|------|------|--------|
| Part (.prt) | {prt_name} | {prt_status} |
| Simulation (.sim) | {sim_name} | {sim_status} |
| FEM (.fem) | {fem_name} | {fem_status} |

---

## 2. Expressions (Potential Design Variables)

{expr_table}

---

## 3. Solutions

{sol_table}

---

## 4. Available Results

| Result Type | Available | Subcases |
|-------------|-----------|----------|
| Displacement | {availability('displacement')} | - |
| Stress | {availability('stress')} | - |
| SPC Forces | {availability('spc_forces')} | - |

---

## 5. Optimization Configuration

### Selected Design Variables

{dv_lines}

### Selected Objectives

{obj_lines}

### Selected Constraints

{const_lines}

---

*Ready to create optimization study? Run `python run_optimization.py --discover` to proceed.*
'''

    # The report contains non-ASCII status marks; with the locale default
    # encoding (e.g. cp1252 on Windows) writing them raises
    # UnicodeEncodeError, so UTF-8 is requested explicitly.
    with open(path, 'w', encoding='utf-8') as f:
        f.write(report)
# =============================================================================
# Convenience Functions
# =============================================================================
def create_study(
    study_name: str,
    description: str,
    prt_file: Union[str, Path],
    design_variables: List[Dict[str, Any]],
    objectives: List[Dict[str, Any]],
    constraints: Optional[List[Dict[str, Any]]] = None,
    n_trials: int = 100,
    protocol: str = "protocol_11_multi"
) -> Dict[str, Path]:
    """
    Create a complete study in a single call.

    Configures a StudyWizard from plain dictionaries, attempts model
    introspection (a failure is logged as a warning and does not abort),
    and generates the study.

    Args:
        study_name: Name of the study
        description: Human-readable description
        prt_file: Path to NX part file
        design_variables: List of design variable dicts with keys:
            - parameter: str
            - bounds: [min, max]
            - units: str (optional)
            - description: str (optional)
        objectives: List of objective dicts with keys:
            - name: str
            - goal: "minimize" or "maximize"
            - extractor: str
            - params: dict (optional)
            - weight: float (optional, defaults to 1.0)
            - description: str (optional)
        constraints: List of constraint dicts with keys:
            - name: str
            - type: "less_than" or "greater_than"
            - threshold: float
            - extractor: str
            - params: dict (optional)
            - units: str (optional)
            - description: str (optional)
        n_trials: Number of optimization trials
        protocol: Optimization protocol

    Returns:
        Dict of generated file paths

    Example:
        create_study(
            study_name="bracket_opt",
            description="Optimize bracket for stiffness",
            prt_file="Bracket.prt",
            design_variables=[
                {"parameter": "thickness", "bounds": [5, 20], "units": "mm"}
            ],
            objectives=[
                {"name": "stiffness", "goal": "maximize", "extractor": "extract_displacement"}
            ],
            constraints=[
                {"name": "mass", "type": "less_than", "threshold": 0.5,
                 "extractor": "extract_mass_from_bdf", "units": "kg"}
            ]
        )
    """
    wizard = StudyWizard(study_name, description)
    wizard.set_model_files(prt_file)
    wizard.set_protocol(protocol)
    wizard.set_trials(n_trials)

    # Design variables: 'parameter' and 'bounds' are mandatory keys.
    for dv_spec in design_variables:
        parameter = dv_spec['parameter']
        bounds = tuple(dv_spec['bounds'])
        wizard.add_design_variable(
            parameter=parameter,
            bounds=bounds,
            units=dv_spec.get('units', ''),
            description=dv_spec.get('description', ''),
        )

    # Objectives: 'name', 'goal' and 'extractor' are mandatory keys.
    for obj_spec in objectives:
        name, goal, extractor = obj_spec['name'], obj_spec['goal'], obj_spec['extractor']
        wizard.add_objective(
            name=name,
            goal=goal,
            extractor=extractor,
            params=obj_spec.get('params', {}),
            weight=obj_spec.get('weight', 1.0),
            description=obj_spec.get('description', ''),
        )

    # Constraints are optional; None is treated like an empty list.
    constraint_specs = constraints if constraints else []
    for con_spec in constraint_specs:
        name = con_spec['name']
        constraint_type = con_spec['type']
        threshold = con_spec['threshold']
        extractor = con_spec['extractor']
        wizard.add_constraint(
            name=name,
            constraint_type=constraint_type,
            threshold=threshold,
            extractor=extractor,
            params=con_spec.get('params', {}),
            units=con_spec.get('units', ''),
            description=con_spec.get('description', ''),
        )

    # Introspection is best-effort: generation proceeds even if it fails.
    try:
        wizard.introspect()
    except Exception as exc:
        logger.warning(f"Introspection failed: {exc}")

    return wizard.generate()
def list_extractors() -> Dict[str, Dict[str, Any]]:
    """Return a shallow copy of the extractor catalog.

    Only the top-level mapping is copied; the per-extractor info dicts are
    the same objects the catalog holds.
    """
    return dict(EXTRACTOR_CATALOG)
def list_protocols() -> Dict[str, Dict[str, Any]]:
    """Return a shallow copy of the protocol catalog.

    Only the top-level mapping is copied; the per-protocol info dicts are
    the same objects the catalog holds.
    """
    return dict(PROTOCOL_CATALOG)
# =============================================================================
# CLI
# =============================================================================
def main(argv: Optional[List[str]] = None) -> int:
    """CLI for study wizard.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) lets
            argparse read them from ``sys.argv``.

    Returns:
        Process exit code, 0 on success. Invalid or missing arguments end in
        ``SystemExit`` raised by argparse.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Atomizer Study Creation Wizard")
    # --name and --prt are checked after parsing rather than declared
    # required, so that --list-extractors can be used on its own.
    parser.add_argument('--name', help='Study name (required unless --list-extractors)')
    parser.add_argument('--description', default='', help='Study description')
    parser.add_argument('--prt', help='Path to PRT file (required unless --list-extractors)')
    parser.add_argument('--list-extractors', action='store_true', help='List available extractors')
    args = parser.parse_args(argv)

    if args.list_extractors:
        print("\nAvailable Extractors:")
        print("=" * 60)
        for name, info in EXTRACTOR_CATALOG.items():
            print(f"\n{name}:")
            print(f" Input: {info.get('input', 'N/A')}")
            print(f" Output: {info.get('output_unit', 'N/A')}")
            print(f" {info.get('description', '')}")
        return 0

    missing = [
        flag
        for flag, value in (('--name', args.name), ('--prt', args.prt))
        if value is None
    ]
    if missing:
        # Same wording and exit status (2) argparse uses for required options.
        parser.error(f"the following arguments are required: {', '.join(missing)}")

    # Interactive wizard would go here
    print(f"\nStudy Wizard initialized for: {args.name}")
    print("Use the Python API for full functionality.")
    return 0
if __name__ == "__main__":
    # `exit()` is a helper injected by the `site` module for interactive use
    # and is absent when Python runs without it; raising SystemExit always works.
    raise SystemExit(main())