feat: Implement Agentic Architecture for robust session workflows

Phase 1 - Session Bootstrap:
- Add .claude/ATOMIZER_CONTEXT.md as single entry point for new sessions
- Add study state detection and task routing

Phase 2 - Code Deduplication:
- Add optimization_engine/base_runner.py (ConfigDrivenRunner)
- Add optimization_engine/generic_surrogate.py (ConfigDrivenSurrogate)
- Add optimization_engine/study_state.py for study detection
- Add optimization_engine/templates/ with registry and templates
- Studies now require ~50 lines instead of ~300

Phase 3 - Skill Consolidation:
- Add YAML frontmatter metadata to all skills (versioning, dependencies)
- Consolidate create-study.md into core/study-creation-core.md
- Update 00_BOOTSTRAP.md, 01_CHEATSHEET.md, 02_CONTEXT_LOADER.md

Phase 4 - Self-Expanding Knowledge:
- Add optimization_engine/auto_doc.py for auto-generating documentation
- Generate docs/generated/EXTRACTORS.md (27 extractors documented)
- Generate docs/generated/TEMPLATES.md (6 templates)
- Generate docs/generated/EXTRACTOR_CHEATSHEET.md

Phase 5 - Subagent Implementation:
- Add .claude/commands/study-builder.md (create studies)
- Add .claude/commands/nx-expert.md (NX Open API)
- Add .claude/commands/protocol-auditor.md (config validation)
- Add .claude/commands/results-analyzer.md (results analysis)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Antoine
2025-12-07 14:52:25 -05:00
parent 6cf12d9344
commit 0e04457539
22 changed files with 4708 additions and 2212 deletions

View File

@@ -0,0 +1,341 @@
"""
Auto-Documentation Generator for Atomizer
This module automatically generates documentation from code, ensuring
that skills and protocols stay in sync with the implementation.
Usage:
python -m optimization_engine.auto_doc extractors
python -m optimization_engine.auto_doc templates
python -m optimization_engine.auto_doc all
"""
import inspect
import importlib
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional
def get_extractor_info() -> List[Dict[str, Any]]:
"""Extract information about all registered extractors."""
from optimization_engine import extractors
extractor_info = []
# Get all exported functions
for name in extractors.__all__:
obj = getattr(extractors, name)
if callable(obj):
# Get function signature
try:
sig = inspect.signature(obj)
params = [
{
'name': p.name,
'default': str(p.default) if p.default != inspect.Parameter.empty else None,
'annotation': str(p.annotation) if p.annotation != inspect.Parameter.empty else None
}
for p in sig.parameters.values()
]
except (ValueError, TypeError):
params = []
# Get docstring
doc = inspect.getdoc(obj) or "No documentation available"
# Determine category
category = "general"
if "stress" in name.lower():
category = "stress"
elif "temperature" in name.lower() or "thermal" in name.lower() or "heat" in name.lower():
category = "thermal"
elif "modal" in name.lower() or "frequency" in name.lower():
category = "modal"
elif "zernike" in name.lower():
category = "optical"
elif "mass" in name.lower():
category = "mass"
elif "strain" in name.lower():
category = "strain"
elif "spc" in name.lower() or "reaction" in name.lower() or "force" in name.lower():
category = "forces"
# Determine phase
phase = "Phase 1"
if name in ['extract_principal_stress', 'extract_max_principal_stress',
'extract_min_principal_stress', 'extract_strain_energy',
'extract_total_strain_energy', 'extract_strain_energy_density',
'extract_spc_forces', 'extract_total_reaction_force',
'extract_reaction_component', 'check_force_equilibrium']:
phase = "Phase 2"
elif name in ['extract_temperature', 'extract_temperature_gradient',
'extract_heat_flux', 'get_max_temperature',
'extract_modal_mass', 'extract_frequencies',
'get_first_frequency', 'get_modal_mass_ratio']:
phase = "Phase 3"
extractor_info.append({
'name': name,
'module': obj.__module__,
'category': category,
'phase': phase,
'parameters': params,
'docstring': doc,
'is_class': inspect.isclass(obj)
})
return extractor_info
def get_template_info() -> List[Dict[str, Any]]:
"""Extract information about available study templates."""
templates_file = Path(__file__).parent / 'templates' / 'registry.json'
if not templates_file.exists():
return []
with open(templates_file) as f:
data = json.load(f)
return data.get('templates', [])
def generate_extractor_markdown(extractors: List[Dict[str, Any]]) -> str:
"""Generate markdown documentation for extractors."""
lines = [
"# Atomizer Extractor Library",
"",
f"*Auto-generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}*",
"",
"This document is automatically generated from the extractor source code.",
"",
"---",
"",
"## Quick Reference",
"",
"| Extractor | Category | Phase | Description |",
"|-----------|----------|-------|-------------|",
]
for ext in sorted(extractors, key=lambda x: (x['category'], x['name'])):
doc_first_line = ext['docstring'].split('\n')[0][:60]
lines.append(f"| `{ext['name']}` | {ext['category']} | {ext['phase']} | {doc_first_line} |")
lines.extend(["", "---", ""])
# Group by category
categories = {}
for ext in extractors:
cat = ext['category']
if cat not in categories:
categories[cat] = []
categories[cat].append(ext)
for cat_name, cat_extractors in sorted(categories.items()):
lines.append(f"## {cat_name.title()} Extractors")
lines.append("")
for ext in sorted(cat_extractors, key=lambda x: x['name']):
lines.append(f"### `{ext['name']}`")
lines.append("")
lines.append(f"**Module**: `{ext['module']}`")
lines.append(f"**Phase**: {ext['phase']}")
lines.append("")
# Parameters
if ext['parameters']:
lines.append("**Parameters**:")
lines.append("")
for param in ext['parameters']:
default_str = f" = `{param['default']}`" if param['default'] else ""
lines.append(f"- `{param['name']}`{default_str}")
lines.append("")
# Docstring
lines.append("**Description**:")
lines.append("")
lines.append("```")
lines.append(ext['docstring'])
lines.append("```")
lines.append("")
lines.append("---")
lines.append("")
return '\n'.join(lines)
def generate_template_markdown(templates: List[Dict[str, Any]]) -> str:
"""Generate markdown documentation for templates."""
lines = [
"# Atomizer Study Templates",
"",
f"*Auto-generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}*",
"",
"Available templates for quick study creation.",
"",
"---",
"",
"## Template Reference",
"",
"| Template | Objectives | Extractors |",
"|----------|------------|------------|",
]
for tmpl in templates:
# Handle objectives that might be dicts or strings
obj_list = tmpl.get('objectives', [])
if obj_list and isinstance(obj_list[0], dict):
objectives = ', '.join([o.get('name', str(o)) for o in obj_list])
else:
objectives = ', '.join(obj_list)
extractors = ', '.join(tmpl.get('extractors', []))
lines.append(f"| `{tmpl['name']}` | {objectives} | {extractors} |")
lines.extend(["", "---", ""])
for tmpl in templates:
lines.append(f"## {tmpl['name']}")
lines.append("")
lines.append(f"**Description**: {tmpl.get('description', 'N/A')}")
lines.append("")
lines.append(f"**Category**: {tmpl.get('category', 'N/A')}")
lines.append(f"**Solver**: {tmpl.get('solver', 'N/A')}")
lines.append(f"**Sampler**: {tmpl.get('sampler', 'N/A')}")
lines.append(f"**Turbo Suitable**: {'Yes' if tmpl.get('turbo_suitable') else 'No'}")
lines.append("")
lines.append(f"**Example Study**: `{tmpl.get('example_study', 'N/A')}`")
lines.append("")
if tmpl.get('objectives'):
lines.append("**Objectives**:")
for obj in tmpl['objectives']:
if isinstance(obj, dict):
lines.append(f"- {obj.get('name', '?')} ({obj.get('direction', '?')}) - Extractor: {obj.get('extractor', '?')}")
else:
lines.append(f"- {obj}")
lines.append("")
if tmpl.get('extractors'):
lines.append("**Extractors Used**:")
for ext in tmpl['extractors']:
lines.append(f"- {ext}")
lines.append("")
if tmpl.get('recommended_trials'):
lines.append("**Recommended Trials**:")
for key, val in tmpl['recommended_trials'].items():
lines.append(f"- {key}: {val}")
lines.append("")
lines.append("---")
lines.append("")
return '\n'.join(lines)
def generate_cheatsheet_update(extractors: List[Dict[str, Any]]) -> str:
"""Generate the extractor quick reference for 01_CHEATSHEET.md."""
lines = [
"## Extractor Quick Reference",
"",
"| Physics | Extractor | Function Call |",
"|---------|-----------|---------------|",
]
# Map categories to physics names
physics_map = {
'stress': 'Von Mises stress',
'thermal': 'Temperature',
'modal': 'Natural frequency',
'optical': 'Zernike WFE',
'mass': 'Mass',
'strain': 'Strain energy',
'forces': 'Reaction forces',
'general': 'Displacement',
}
for ext in sorted(extractors, key=lambda x: x['category']):
if ext['is_class']:
continue
physics = physics_map.get(ext['category'], ext['category'])
# Build function call example
params = ext['parameters'][:2] if ext['parameters'] else []
param_str = ', '.join([p['name'] for p in params])
lines.append(f"| {physics} | {ext['name']} | `{ext['name']}({param_str})` |")
return '\n'.join(lines)
def update_atomizer_context(extractors: List[Dict[str, Any]], templates: List[Dict[str, Any]]):
"""Update ATOMIZER_CONTEXT.md with current extractor count."""
context_file = Path(__file__).parent.parent / '.claude' / 'ATOMIZER_CONTEXT.md'
if not context_file.exists():
print(f"Warning: {context_file} not found")
return
content = context_file.read_text()
# Update extractor library version based on count
extractor_count = len(extractors)
template_count = len(templates)
print(f"Found {extractor_count} extractors and {template_count} templates")
# Could add logic here to update version info based on changes
def main():
import sys
if len(sys.argv) < 2:
print("Usage: python -m optimization_engine.auto_doc [extractors|templates|all]")
sys.exit(1)
command = sys.argv[1]
output_dir = Path(__file__).parent.parent / 'docs' / 'generated'
output_dir.mkdir(parents=True, exist_ok=True)
if command in ['extractors', 'all']:
print("Generating extractor documentation...")
extractors = get_extractor_info()
# Write full documentation
doc_content = generate_extractor_markdown(extractors)
(output_dir / 'EXTRACTORS.md').write_text(doc_content)
print(f" Written: {output_dir / 'EXTRACTORS.md'}")
# Write cheatsheet update
cheatsheet = generate_cheatsheet_update(extractors)
(output_dir / 'EXTRACTOR_CHEATSHEET.md').write_text(cheatsheet)
print(f" Written: {output_dir / 'EXTRACTOR_CHEATSHEET.md'}")
print(f" Found {len(extractors)} extractors")
if command in ['templates', 'all']:
print("Generating template documentation...")
templates = get_template_info()
if templates:
doc_content = generate_template_markdown(templates)
(output_dir / 'TEMPLATES.md').write_text(doc_content)
print(f" Written: {output_dir / 'TEMPLATES.md'}")
print(f" Found {len(templates)} templates")
else:
print(" No templates found")
if command == 'all':
print("\nUpdating ATOMIZER_CONTEXT.md...")
extractors = get_extractor_info()
templates = get_template_info()
update_atomizer_context(extractors, templates)
print("\nDone!")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,598 @@
"""
BaseOptimizationRunner - Unified base class for all optimization studies.
This module eliminates ~4,200 lines of duplicated code across study run_optimization.py files
by providing a config-driven optimization runner.
Usage:
# In study's run_optimization.py (now ~50 lines instead of ~300):
from optimization_engine.base_runner import ConfigDrivenRunner
runner = ConfigDrivenRunner(__file__)
runner.run()
Or for custom extraction logic:
from optimization_engine.base_runner import BaseOptimizationRunner
class MyStudyRunner(BaseOptimizationRunner):
def extract_objectives(self, op2_file, dat_file, design_vars):
# Custom extraction logic
return {'mass': ..., 'stress': ..., 'stiffness': ...}
runner = MyStudyRunner(__file__)
runner.run()
"""
from pathlib import Path
import sys
import json
import argparse
from datetime import datetime
from typing import Dict, Any, Optional, Tuple, List, Callable
from abc import ABC, abstractmethod
import importlib
import optuna
from optuna.samplers import NSGAIISampler, TPESampler
class ConfigNormalizer:
"""
Normalizes different config formats to a standard internal format.
Handles variations like:
- 'parameter' vs 'name' for variable names
- 'bounds' vs 'min'/'max' for ranges
- 'goal' vs 'direction' for objective direction
"""
@staticmethod
def normalize_config(config: Dict) -> Dict:
"""Convert any config format to standardized format."""
normalized = {
'study_name': config.get('study_name', 'unnamed_study'),
'description': config.get('description', ''),
'design_variables': [],
'objectives': [],
'constraints': [],
'simulation': {},
'optimization': {},
'neural_acceleration': config.get('neural_acceleration', {}),
}
# Normalize design variables
for var in config.get('design_variables', []):
normalized['design_variables'].append({
'name': var.get('parameter') or var.get('name'),
'type': var.get('type', 'continuous'),
'min': var.get('bounds', [var.get('min', 0), var.get('max', 1)])[0] if 'bounds' in var else var.get('min', 0),
'max': var.get('bounds', [var.get('min', 0), var.get('max', 1)])[1] if 'bounds' in var else var.get('max', 1),
'units': var.get('units', ''),
'description': var.get('description', ''),
})
# Normalize objectives
for obj in config.get('objectives', []):
normalized['objectives'].append({
'name': obj.get('name'),
'direction': obj.get('goal') or obj.get('direction', 'minimize'),
'description': obj.get('description', ''),
'extraction': obj.get('extraction', {}),
})
# Normalize constraints
for con in config.get('constraints', []):
normalized['constraints'].append({
'name': con.get('name'),
'type': con.get('type', 'less_than'),
'value': con.get('threshold') or con.get('value', 0),
'units': con.get('units', ''),
'description': con.get('description', ''),
'extraction': con.get('extraction', {}),
})
# Normalize simulation settings
sim = config.get('simulation', {})
normalized['simulation'] = {
'prt_file': sim.get('prt_file') or sim.get('model_file', ''),
'sim_file': sim.get('sim_file', ''),
'fem_file': sim.get('fem_file', ''),
'dat_file': sim.get('dat_file', ''),
'op2_file': sim.get('op2_file', ''),
'solution_name': sim.get('solution_name', 'Solution 1'),
'solver': sim.get('solver', 'nastran'),
}
# Normalize optimization settings
opt = config.get('optimization', config.get('optimization_settings', {}))
normalized['optimization'] = {
'algorithm': opt.get('algorithm') or opt.get('sampler', 'NSGAIISampler'),
'n_trials': opt.get('n_trials', 100),
'population_size': opt.get('population_size', 20),
'seed': opt.get('seed', 42),
'timeout_per_trial': opt.get('timeout_per_trial', 600),
}
return normalized
class BaseOptimizationRunner(ABC):
"""
Abstract base class for optimization runners.
Subclasses must implement extract_objectives() to define how
physics results are extracted from FEA output files.
"""
def __init__(self, script_path: str, config_path: Optional[str] = None):
"""
Initialize the runner.
Args:
script_path: Path to the study's run_optimization.py (__file__)
config_path: Optional explicit path to config file
"""
self.study_dir = Path(script_path).parent
self.config_path = Path(config_path) if config_path else self._find_config()
self.model_dir = self.study_dir / "1_setup" / "model"
self.results_dir = self.study_dir / "2_results"
# Load and normalize config
with open(self.config_path, 'r') as f:
self.raw_config = json.load(f)
self.config = ConfigNormalizer.normalize_config(self.raw_config)
self.study_name = self.config['study_name']
self.logger = None
self.nx_solver = None
def _find_config(self) -> Path:
"""Find the optimization config file."""
candidates = [
self.study_dir / "optimization_config.json",
self.study_dir / "1_setup" / "optimization_config.json",
]
for path in candidates:
if path.exists():
return path
raise FileNotFoundError(f"No optimization_config.json found in {self.study_dir}")
def _setup(self):
"""Initialize solver and logger."""
# Add project root to path
project_root = self.study_dir.parents[1]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from optimization_engine.nx_solver import NXSolver
from optimization_engine.logger import get_logger
self.results_dir.mkdir(exist_ok=True)
self.logger = get_logger(self.study_name, study_dir=self.results_dir)
self.nx_solver = NXSolver(nastran_version="2506")
def sample_design_variables(self, trial: optuna.Trial) -> Dict[str, float]:
"""Sample design variables from the config."""
design_vars = {}
for var in self.config['design_variables']:
name = var['name']
if var['type'] == 'integer':
design_vars[name] = trial.suggest_int(name, int(var['min']), int(var['max']))
else:
design_vars[name] = trial.suggest_float(name, var['min'], var['max'])
return design_vars
def run_simulation(self, design_vars: Dict[str, float]) -> Dict[str, Any]:
"""Run the FEA simulation with given design variables."""
sim_file = self.model_dir / self.config['simulation']['sim_file']
result = self.nx_solver.run_simulation(
sim_file=sim_file,
working_dir=self.model_dir,
expression_updates=design_vars,
solution_name=self.config['simulation'].get('solution_name'),
cleanup=True
)
return result
@abstractmethod
def extract_objectives(self, op2_file: Path, dat_file: Path,
design_vars: Dict[str, float]) -> Dict[str, float]:
"""
Extract objective values from FEA results.
Args:
op2_file: Path to OP2 results file
dat_file: Path to DAT/BDF file
design_vars: Design variable values for this trial
Returns:
Dictionary of objective names to values
"""
pass
def check_constraints(self, objectives: Dict[str, float],
op2_file: Path) -> Tuple[bool, Dict[str, float]]:
"""
Check if constraints are satisfied.
Returns:
Tuple of (feasible, constraint_values)
"""
feasible = True
constraint_values = {}
for con in self.config['constraints']:
name = con['name']
threshold = con['value']
con_type = con['type']
# Try to get constraint value from objectives or extract
if name in objectives:
value = objectives[name]
elif 'stress' in name.lower() and 'stress' in objectives:
value = objectives['stress']
elif 'displacement' in name.lower() and 'displacement' in objectives:
value = objectives['displacement']
else:
# Need to extract separately
value = 0 # Default
constraint_values[name] = value
if con_type == 'less_than' and value > threshold:
feasible = False
self.logger.warning(f' Constraint violation: {name} = {value:.2f} > {threshold}')
elif con_type == 'greater_than' and value < threshold:
feasible = False
self.logger.warning(f' Constraint violation: {name} = {value:.2f} < {threshold}')
return feasible, constraint_values
def objective_function(self, trial: optuna.Trial) -> Tuple[float, ...]:
"""
Main objective function for Optuna optimization.
Returns tuple of objective values for multi-objective optimization.
"""
design_vars = self.sample_design_variables(trial)
self.logger.trial_start(trial.number, design_vars)
try:
# Run simulation
result = self.run_simulation(design_vars)
if not result['success']:
self.logger.trial_failed(trial.number, f"Simulation failed: {result.get('error', 'Unknown')}")
return tuple([float('inf')] * len(self.config['objectives']))
op2_file = result['op2_file']
dat_file = self.model_dir / self.config['simulation']['dat_file']
# Extract objectives
objectives = self.extract_objectives(op2_file, dat_file, design_vars)
# Check constraints
feasible, constraint_values = self.check_constraints(objectives, op2_file)
# Set user attributes
for name, value in objectives.items():
trial.set_user_attr(name, value)
trial.set_user_attr('feasible', feasible)
self.logger.trial_complete(trial.number, objectives, constraint_values, feasible)
# Return objectives in order, converting maximize to minimize
obj_values = []
for obj_config in self.config['objectives']:
name = obj_config['name']
value = objectives.get(name, float('inf'))
if obj_config['direction'] == 'maximize':
value = -value # Negate for maximization
obj_values.append(value)
return tuple(obj_values)
except Exception as e:
self.logger.trial_failed(trial.number, str(e))
return tuple([float('inf')] * len(self.config['objectives']))
def get_sampler(self):
"""Get the appropriate Optuna sampler based on config."""
alg = self.config['optimization']['algorithm']
pop_size = self.config['optimization']['population_size']
seed = self.config['optimization']['seed']
if 'NSGA' in alg.upper():
return NSGAIISampler(population_size=pop_size, seed=seed)
elif 'TPE' in alg.upper():
return TPESampler(seed=seed)
else:
return NSGAIISampler(population_size=pop_size, seed=seed)
def get_directions(self) -> List[str]:
"""Get optimization directions for all objectives."""
# All directions are 'minimize' since we negate maximize objectives
return ['minimize'] * len(self.config['objectives'])
def clean_nastran_files(self):
"""Remove old Nastran solver output files."""
patterns = ['*.op2', '*.f06', '*.log', '*.f04', '*.pch', '*.DBALL', '*.MASTER', '_temp*.txt']
deleted = []
for pattern in patterns:
for f in self.model_dir.glob(pattern):
try:
f.unlink()
deleted.append(f)
self.logger.info(f" Deleted: {f.name}")
except Exception as e:
self.logger.warning(f" Failed to delete {f.name}: {e}")
return deleted
def print_study_info(self):
"""Print study information to console."""
print("\n" + "=" * 60)
print(f" {self.study_name.upper()}")
print("=" * 60)
print(f"\nDescription: {self.config['description']}")
print(f"\nDesign Variables ({len(self.config['design_variables'])}):")
for var in self.config['design_variables']:
print(f" - {var['name']}: {var['min']}-{var['max']} {var['units']}")
print(f"\nObjectives ({len(self.config['objectives'])}):")
for obj in self.config['objectives']:
print(f" - {obj['name']}: {obj['direction']}")
print(f"\nConstraints ({len(self.config['constraints'])}):")
for c in self.config['constraints']:
print(f" - {c['name']}: < {c['value']} {c['units']}")
print()
def run(self, args=None):
"""
Main entry point for running optimization.
Args:
args: Optional argparse Namespace. If None, will parse sys.argv
"""
if args is None:
args = self.parse_args()
self._setup()
if args.clean:
self.clean_nastran_files()
self.print_study_info()
# Determine number of trials and storage
if args.discover:
n_trials = 1
storage = f"sqlite:///{self.results_dir / 'study_test.db'}"
study_suffix = "_discover"
elif args.validate:
n_trials = 1
storage = f"sqlite:///{self.results_dir / 'study_test.db'}"
study_suffix = "_validate"
elif args.test:
n_trials = 3
storage = f"sqlite:///{self.results_dir / 'study_test.db'}"
study_suffix = "_test"
else:
n_trials = args.trials
storage = f"sqlite:///{self.results_dir / 'study.db'}"
study_suffix = ""
# Create or load study
full_study_name = f"{self.study_name}{study_suffix}"
if args.resume and study_suffix == "":
study = optuna.load_study(
study_name=self.study_name,
storage=storage,
sampler=self.get_sampler()
)
print(f"\nResuming study with {len(study.trials)} existing trials...")
else:
study = optuna.create_study(
study_name=full_study_name,
storage=storage,
sampler=self.get_sampler(),
directions=self.get_directions(),
load_if_exists=(study_suffix == "")
)
# Run optimization
if study_suffix == "":
self.logger.study_start(self.study_name, n_trials,
self.config['optimization']['algorithm'])
print(f"\nRunning {n_trials} trials...")
study.optimize(
self.objective_function,
n_trials=n_trials,
show_progress_bar=True
)
# Report results
n_complete = len([t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE])
if study_suffix == "":
self.logger.study_complete(self.study_name, len(study.trials), n_complete)
print("\n" + "=" * 60)
print(" COMPLETE!")
print("=" * 60)
print(f"\nTotal trials: {len(study.trials)}")
print(f"Successful: {n_complete}")
if hasattr(study, 'best_trials'):
print(f"Pareto front: {len(study.best_trials)} solutions")
if study_suffix == "":
print("\nNext steps:")
print(" 1. Run method selector:")
print(f" python -m optimization_engine.method_selector {self.config_path.relative_to(self.study_dir)} 2_results/study.db")
print(" 2. If turbo recommended, run neural acceleration")
return 0
def parse_args(self) -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description=f'{self.study_name} - Optimization')
stage_group = parser.add_mutually_exclusive_group()
stage_group.add_argument('--discover', action='store_true', help='Discover model outputs (1 trial)')
stage_group.add_argument('--validate', action='store_true', help='Run single validation trial')
stage_group.add_argument('--test', action='store_true', help='Run 3-trial test')
stage_group.add_argument('--run', action='store_true', help='Run full optimization')
parser.add_argument('--trials', type=int,
default=self.config['optimization']['n_trials'],
help='Number of trials')
parser.add_argument('--resume', action='store_true', help='Resume existing study')
parser.add_argument('--clean', action='store_true', help='Clean old files first')
args = parser.parse_args()
if not any([args.discover, args.validate, args.test, args.run]):
print("No stage specified. Use --discover, --validate, --test, or --run")
print("\nTypical workflow:")
print(" 1. python run_optimization.py --discover # Discover model outputs")
print(" 2. python run_optimization.py --validate # Single trial validation")
print(" 3. python run_optimization.py --test # Quick 3-trial test")
print(f" 4. python run_optimization.py --run --trials {self.config['optimization']['n_trials']} # Full run")
sys.exit(1)
return args
class ConfigDrivenRunner(BaseOptimizationRunner):
"""
Fully config-driven optimization runner.
Automatically extracts objectives based on config file definitions.
Supports standard extractors: mass, stress, displacement, stiffness.
"""
def __init__(self, script_path: str, config_path: Optional[str] = None,
element_type: str = 'auto'):
"""
Initialize config-driven runner.
Args:
script_path: Path to the study's script (__file__)
config_path: Optional explicit path to config
element_type: Element type for stress extraction ('ctetra', 'cquad4', 'auto')
"""
super().__init__(script_path, config_path)
self.element_type = element_type
self._extractors_loaded = False
self._extractors = {}
def _load_extractors(self):
"""Lazy-load extractor functions."""
if self._extractors_loaded:
return
from optimization_engine.extractors.bdf_mass_extractor import extract_mass_from_bdf
from optimization_engine.extractors.extract_displacement import extract_displacement
from optimization_engine.extractors.extract_von_mises_stress import extract_solid_stress
self._extractors = {
'extract_mass_from_bdf': extract_mass_from_bdf,
'extract_displacement': extract_displacement,
'extract_solid_stress': extract_solid_stress,
}
self._extractors_loaded = True
def _detect_element_type(self, dat_file: Path) -> str:
"""Auto-detect element type from BDF/DAT file."""
if self.element_type != 'auto':
return self.element_type
try:
with open(dat_file, 'r') as f:
content = f.read(50000) # Read first 50KB
if 'CTETRA' in content:
return 'ctetra'
elif 'CHEXA' in content:
return 'chexa'
elif 'CQUAD4' in content:
return 'cquad4'
elif 'CTRIA3' in content:
return 'ctria3'
else:
return 'ctetra' # Default
except Exception:
return 'ctetra'
def extract_objectives(self, op2_file: Path, dat_file: Path,
design_vars: Dict[str, float]) -> Dict[str, float]:
"""
Extract all objectives based on config.
Handles common objectives: mass, stress, displacement, stiffness
"""
self._load_extractors()
objectives = {}
element_type = self._detect_element_type(dat_file)
for obj_config in self.config['objectives']:
name = obj_config['name'].lower()
try:
if 'mass' in name:
objectives[obj_config['name']] = self._extractors['extract_mass_from_bdf'](str(dat_file))
self.logger.info(f" {obj_config['name']}: {objectives[obj_config['name']]:.2f} kg")
elif 'stress' in name:
stress_result = self._extractors['extract_solid_stress'](
op2_file, subcase=1, element_type=element_type
)
# Convert kPa to MPa
stress_mpa = stress_result.get('max_von_mises', float('inf')) / 1000.0
objectives[obj_config['name']] = stress_mpa
self.logger.info(f" {obj_config['name']}: {stress_mpa:.2f} MPa")
elif 'displacement' in name:
disp_result = self._extractors['extract_displacement'](op2_file, subcase=1)
objectives[obj_config['name']] = disp_result['max_displacement']
self.logger.info(f" {obj_config['name']}: {disp_result['max_displacement']:.3f} mm")
elif 'stiffness' in name:
disp_result = self._extractors['extract_displacement'](op2_file, subcase=1)
max_disp = disp_result['max_displacement']
applied_force = 1000.0 # N - standard assumption
stiffness = applied_force / max(abs(max_disp), 1e-6)
objectives[obj_config['name']] = stiffness
objectives['displacement'] = max_disp # Store for constraint check
self.logger.info(f" {obj_config['name']}: {stiffness:.1f} N/mm")
self.logger.info(f" displacement: {max_disp:.3f} mm")
else:
self.logger.warning(f" Unknown objective: {name}")
objectives[obj_config['name']] = float('inf')
except Exception as e:
self.logger.error(f" Failed to extract {name}: {e}")
objectives[obj_config['name']] = float('inf')
return objectives
def create_runner(script_path: str, element_type: str = 'auto') -> ConfigDrivenRunner:
"""
Factory function to create a ConfigDrivenRunner.
Args:
script_path: Path to the study's run_optimization.py (__file__)
element_type: Element type for stress extraction
Returns:
Configured runner ready to execute
"""
return ConfigDrivenRunner(script_path, element_type=element_type)

View File

@@ -0,0 +1,834 @@
"""
GenericSurrogate - Config-driven neural network surrogate for optimization.
This module eliminates ~2,800 lines of duplicated code across study run_nn_optimization.py files
by providing a fully config-driven neural surrogate system.
Usage:
# In study's run_nn_optimization.py (now ~30 lines instead of ~600):
from optimization_engine.generic_surrogate import ConfigDrivenSurrogate
surrogate = ConfigDrivenSurrogate(__file__)
surrogate.run() # Handles --train, --turbo, --all flags automatically
"""
from pathlib import Path
import sys
import json
import argparse
from datetime import datetime
from typing import Dict, Any, Optional, List, Tuple
import time
import numpy as np
# Conditional PyTorch import
try:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split, TensorDataset
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
import optuna
from optuna.samplers import NSGAIISampler
class MLPSurrogate(nn.Module):
"""
Generic MLP architecture for surrogate modeling.
Architecture: Input -> [Linear -> LayerNorm -> ReLU -> Dropout] * N -> Output
"""
def __init__(self, n_inputs: int, n_outputs: int,
hidden_dims: List[int] = None, dropout: float = 0.1):
super().__init__()
if hidden_dims is None:
# Default architecture scales with problem size
hidden_dims = [64, 128, 128, 64]
layers = []
prev_dim = n_inputs
for hidden_dim in hidden_dims:
layers.extend([
nn.Linear(prev_dim, hidden_dim),
nn.LayerNorm(hidden_dim),
nn.ReLU(),
nn.Dropout(dropout)
])
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, n_outputs))
self.network = nn.Sequential(*layers)
# Initialize weights
for m in self.modules():
if isinstance(m, nn.Linear):
nn.init.kaiming_normal_(m.weight)
if m.bias is not None:
nn.init.constant_(m.bias, 0)
def forward(self, x):
return self.network(x)
class GenericSurrogate:
"""
Config-driven neural surrogate for FEA optimization.
Automatically adapts to any number of design variables and objectives
based on the optimization_config.json file.
"""
def __init__(self, config: Dict, device: str = 'auto'):
"""
Initialize surrogate from config.
Args:
config: Normalized config dictionary
device: 'auto', 'cuda', or 'cpu'
"""
if not TORCH_AVAILABLE:
raise ImportError("PyTorch required for neural surrogate")
self.config = config
self.device = torch.device(
'cuda' if torch.cuda.is_available() and device == 'auto' else 'cpu'
)
# Extract variable and objective info from config
self.design_var_names = [v['name'] for v in config['design_variables']]
self.design_var_bounds = {
v['name']: (v['min'], v['max'])
for v in config['design_variables']
}
self.design_var_types = {
v['name']: v.get('type', 'continuous')
for v in config['design_variables']
}
self.objective_names = [o['name'] for o in config['objectives']]
self.n_inputs = len(self.design_var_names)
self.n_outputs = len(self.objective_names)
self.model = None
self.normalization = None
def _get_hidden_dims(self) -> List[int]:
"""Calculate hidden layer dimensions based on problem size."""
n = self.n_inputs
if n <= 3:
return [32, 64, 32]
elif n <= 6:
return [64, 128, 128, 64]
elif n <= 10:
return [128, 256, 256, 128]
else:
return [256, 512, 512, 256]
def train_from_database(self, db_path: Path, study_name: str,
epochs: int = 300, validation_split: float = 0.2,
batch_size: int = 16, learning_rate: float = 0.001,
save_path: Path = None, verbose: bool = True):
"""
Train surrogate from Optuna database.
Args:
db_path: Path to study.db
study_name: Name of the Optuna study
epochs: Number of training epochs
validation_split: Fraction of data for validation
batch_size: Training batch size
learning_rate: Initial learning rate
save_path: Where to save the trained model
verbose: Print training progress
"""
if verbose:
print(f"\n{'='*60}")
print(f"Training Generic Surrogate ({self.n_inputs} inputs -> {self.n_outputs} outputs)")
print(f"{'='*60}")
print(f"Device: {self.device}")
print(f"Database: {db_path}")
# Load data from Optuna
storage = optuna.storages.RDBStorage(f"sqlite:///{db_path}")
study = optuna.load_study(study_name=study_name, storage=storage)
completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
if verbose:
print(f"Found {len(completed)} completed trials")
if len(completed) < 10:
raise ValueError(f"Need at least 10 trials for training, got {len(completed)}")
# Extract training data
design_params = []
objectives = []
for trial in completed:
# Skip inf values
if any(v == float('inf') or v != v for v in trial.values): # nan check
continue
params = [trial.params.get(name, 0) for name in self.design_var_names]
objs = list(trial.values)
design_params.append(params)
objectives.append(objs)
design_params = np.array(design_params, dtype=np.float32)
objectives = np.array(objectives, dtype=np.float32)
if verbose:
print(f"Valid samples: {len(design_params)}")
print(f"\nDesign variable ranges:")
for i, name in enumerate(self.design_var_names):
print(f" {name}: {design_params[:, i].min():.2f} - {design_params[:, i].max():.2f}")
print(f"\nObjective ranges:")
for i, name in enumerate(self.objective_names):
print(f" {name}: {objectives[:, i].min():.4f} - {objectives[:, i].max():.4f}")
# Compute normalization parameters
design_mean = design_params.mean(axis=0)
design_std = design_params.std(axis=0) + 1e-8
objective_mean = objectives.mean(axis=0)
objective_std = objectives.std(axis=0) + 1e-8
self.normalization = {
'design_mean': design_mean,
'design_std': design_std,
'objective_mean': objective_mean,
'objective_std': objective_std
}
# Normalize data
X = (design_params - design_mean) / design_std
Y = (objectives - objective_mean) / objective_std
X_tensor = torch.tensor(X, dtype=torch.float32)
Y_tensor = torch.tensor(Y, dtype=torch.float32)
# Create datasets
dataset = TensorDataset(X_tensor, Y_tensor)
n_val = max(1, int(len(dataset) * validation_split))
n_train = len(dataset) - n_val
train_ds, val_ds = random_split(dataset, [n_train, n_val])
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size)
if verbose:
print(f"\nTraining: {n_train} samples, Validation: {n_val} samples")
# Build model
hidden_dims = self._get_hidden_dims()
self.model = MLPSurrogate(
n_inputs=self.n_inputs,
n_outputs=self.n_outputs,
hidden_dims=hidden_dims
).to(self.device)
n_params = sum(p.numel() for p in self.model.parameters())
if verbose:
print(f"Model architecture: {self.n_inputs} -> {hidden_dims} -> {self.n_outputs}")
print(f"Total parameters: {n_params:,}")
# Training setup
optimizer = torch.optim.AdamW(self.model.parameters(), lr=learning_rate, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs)
best_val_loss = float('inf')
best_state = None
if verbose:
print(f"\nTraining for {epochs} epochs...")
for epoch in range(epochs):
# Training
self.model.train()
train_loss = 0.0
for x, y in train_loader:
x, y = x.to(self.device), y.to(self.device)
optimizer.zero_grad()
pred = self.model(x)
loss = F.mse_loss(pred, y)
loss.backward()
optimizer.step()
train_loss += loss.item()
train_loss /= len(train_loader)
# Validation
self.model.eval()
val_loss = 0.0
with torch.no_grad():
for x, y in val_loader:
x, y = x.to(self.device), y.to(self.device)
pred = self.model(x)
val_loss += F.mse_loss(pred, y).item()
val_loss /= len(val_loader)
scheduler.step()
if val_loss < best_val_loss:
best_val_loss = val_loss
best_state = self.model.state_dict().copy()
if verbose and ((epoch + 1) % 50 == 0 or epoch == 0):
print(f" Epoch {epoch+1:3d}: train={train_loss:.6f}, val={val_loss:.6f}")
# Load best model
self.model.load_state_dict(best_state)
if verbose:
print(f"\nBest validation loss: {best_val_loss:.6f}")
# Final evaluation
self._print_validation_metrics(val_loader)
# Save model
if save_path:
self.save(save_path)
return self
def _print_validation_metrics(self, val_loader):
"""Print validation accuracy metrics."""
self.model.eval()
all_preds = []
all_targets = []
with torch.no_grad():
for x, y in val_loader:
x = x.to(self.device)
pred = self.model(x).cpu().numpy()
all_preds.append(pred)
all_targets.append(y.numpy())
all_preds = np.concatenate(all_preds)
all_targets = np.concatenate(all_targets)
# Denormalize
preds_denorm = all_preds * self.normalization['objective_std'] + self.normalization['objective_mean']
targets_denorm = all_targets * self.normalization['objective_std'] + self.normalization['objective_mean']
print(f"\nValidation accuracy:")
for i, name in enumerate(self.objective_names):
mae = np.abs(preds_denorm[:, i] - targets_denorm[:, i]).mean()
mape = (np.abs(preds_denorm[:, i] - targets_denorm[:, i]) /
(np.abs(targets_denorm[:, i]) + 1e-8)).mean() * 100
print(f" {name}: MAE={mae:.4f}, MAPE={mape:.1f}%")
def predict(self, design_params: Dict[str, float]) -> Dict[str, float]:
"""
Predict objectives from design parameters.
Args:
design_params: Dictionary of design variable values
Returns:
Dictionary of predicted objective values
"""
if self.model is None:
raise ValueError("Model not trained. Call train_from_database first.")
# Build input array
x = np.array([design_params.get(name, 0) for name in self.design_var_names], dtype=np.float32)
x_norm = (x - self.normalization['design_mean']) / self.normalization['design_std']
x_tensor = torch.tensor(x_norm, dtype=torch.float32, device=self.device).unsqueeze(0)
# Predict
self.model.eval()
with torch.no_grad():
y_norm = self.model(x_tensor).cpu().numpy()[0]
# Denormalize
y = y_norm * self.normalization['objective_std'] + self.normalization['objective_mean']
return {name: float(y[i]) for i, name in enumerate(self.objective_names)}
def sample_random_design(self) -> Dict[str, float]:
"""Sample a random point in the design space."""
params = {}
for name in self.design_var_names:
low, high = self.design_var_bounds[name]
if self.design_var_types[name] == 'integer':
params[name] = float(np.random.randint(int(low), int(high) + 1))
else:
params[name] = np.random.uniform(low, high)
return params
def save(self, path: Path):
"""Save model to file."""
path = Path(path)
torch.save({
'model_state_dict': self.model.state_dict(),
'normalization': {
'design_mean': self.normalization['design_mean'].tolist(),
'design_std': self.normalization['design_std'].tolist(),
'objective_mean': self.normalization['objective_mean'].tolist(),
'objective_std': self.normalization['objective_std'].tolist()
},
'design_var_names': self.design_var_names,
'objective_names': self.objective_names,
'n_inputs': self.n_inputs,
'n_outputs': self.n_outputs,
'hidden_dims': self._get_hidden_dims()
}, path)
print(f"Model saved to {path}")
def load(self, path: Path):
"""Load model from file."""
path = Path(path)
checkpoint = torch.load(path, map_location=self.device)
hidden_dims = checkpoint.get('hidden_dims', self._get_hidden_dims())
self.model = MLPSurrogate(
n_inputs=checkpoint['n_inputs'],
n_outputs=checkpoint['n_outputs'],
hidden_dims=hidden_dims
).to(self.device)
self.model.load_state_dict(checkpoint['model_state_dict'])
self.model.eval()
norm = checkpoint['normalization']
self.normalization = {
'design_mean': np.array(norm['design_mean']),
'design_std': np.array(norm['design_std']),
'objective_mean': np.array(norm['objective_mean']),
'objective_std': np.array(norm['objective_std'])
}
self.design_var_names = checkpoint.get('design_var_names', self.design_var_names)
self.objective_names = checkpoint.get('objective_names', self.objective_names)
print(f"Model loaded from {path}")
class ConfigDrivenSurrogate:
"""
Fully config-driven neural surrogate system.
Provides complete --train, --turbo, --all workflow based on optimization_config.json.
Handles FEA validation, surrogate retraining, and result reporting automatically.
"""
def __init__(self, script_path: str, config_path: Optional[str] = None,
element_type: str = 'auto'):
"""
Initialize config-driven surrogate.
Args:
script_path: Path to study's run_nn_optimization.py (__file__)
config_path: Optional explicit path to config
element_type: Element type for stress extraction ('auto' detects from DAT file)
"""
self.study_dir = Path(script_path).parent
self.config_path = Path(config_path) if config_path else self._find_config()
self.model_dir = self.study_dir / "1_setup" / "model"
self.results_dir = self.study_dir / "2_results"
# Load config
with open(self.config_path, 'r') as f:
self.raw_config = json.load(f)
# Normalize config (reuse from base_runner)
self.config = self._normalize_config(self.raw_config)
self.study_name = self.config['study_name']
self.element_type = element_type
self.surrogate = None
self.logger = None
self.nx_solver = None
def _find_config(self) -> Path:
"""Find the optimization config file."""
candidates = [
self.study_dir / "optimization_config.json",
self.study_dir / "1_setup" / "optimization_config.json",
]
for path in candidates:
if path.exists():
return path
raise FileNotFoundError(f"No optimization_config.json found in {self.study_dir}")
def _normalize_config(self, config: Dict) -> Dict:
"""Normalize config format variations."""
# This mirrors ConfigNormalizer from base_runner.py
normalized = {
'study_name': config.get('study_name', 'unnamed_study'),
'description': config.get('description', ''),
'design_variables': [],
'objectives': [],
'constraints': [],
'simulation': {},
'neural_acceleration': config.get('neural_acceleration', {}),
}
# Normalize design variables
for var in config.get('design_variables', []):
normalized['design_variables'].append({
'name': var.get('parameter') or var.get('name'),
'type': var.get('type', 'continuous'),
'min': var.get('bounds', [var.get('min', 0), var.get('max', 1)])[0] if 'bounds' in var else var.get('min', 0),
'max': var.get('bounds', [var.get('min', 0), var.get('max', 1)])[1] if 'bounds' in var else var.get('max', 1),
})
# Normalize objectives
for obj in config.get('objectives', []):
normalized['objectives'].append({
'name': obj.get('name'),
'direction': obj.get('goal') or obj.get('direction', 'minimize'),
})
# Normalize simulation
sim = config.get('simulation', {})
normalized['simulation'] = {
'sim_file': sim.get('sim_file', ''),
'dat_file': sim.get('dat_file', ''),
'solution_name': sim.get('solution_name', 'Solution 1'),
}
return normalized
def _setup(self):
"""Initialize solver and logger."""
project_root = self.study_dir.parents[1]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from optimization_engine.nx_solver import NXSolver
from optimization_engine.logger import get_logger
self.results_dir.mkdir(exist_ok=True)
self.logger = get_logger(self.study_name, study_dir=self.results_dir)
self.nx_solver = NXSolver(nastran_version="2506")
def _detect_element_type(self, dat_file: Path) -> str:
"""Auto-detect element type from DAT file."""
if self.element_type != 'auto':
return self.element_type
try:
with open(dat_file, 'r') as f:
content = f.read(50000)
if 'CTETRA' in content:
return 'ctetra'
elif 'CHEXA' in content:
return 'chexa'
elif 'CQUAD4' in content:
return 'cquad4'
else:
return 'ctetra'
except Exception:
return 'ctetra'
def train(self, epochs: int = 300) -> GenericSurrogate:
"""Train surrogate model from FEA database."""
print(f"\n{'='*60}")
print("PHASE: Train Surrogate Model")
print(f"{'='*60}")
self.surrogate = GenericSurrogate(self.config, device='auto')
self.surrogate.train_from_database(
db_path=self.results_dir / "study.db",
study_name=self.study_name,
epochs=epochs,
save_path=self.results_dir / "surrogate_best.pt"
)
return self.surrogate
def turbo(self, total_nn_trials: int = 5000, batch_size: int = 100,
retrain_every: int = 10, epochs: int = 150):
"""
Run TURBO mode: NN exploration + FEA validation + surrogate retraining.
Args:
total_nn_trials: Total NN trials to run
batch_size: NN trials per batch before FEA validation
retrain_every: Retrain surrogate every N FEA validations
epochs: Training epochs for surrogate
"""
from optimization_engine.extractors.bdf_mass_extractor import extract_mass_from_bdf
from optimization_engine.extractors.extract_displacement import extract_displacement
from optimization_engine.extractors.extract_von_mises_stress import extract_solid_stress
print(f"\n{'#'*60}")
print(f"# TURBO MODE: {self.study_name}")
print(f"{'#'*60}")
print(f"Design variables: {len(self.config['design_variables'])}")
print(f"Objectives: {len(self.config['objectives'])}")
print(f"Total NN budget: {total_nn_trials:,} trials")
print(f"NN batch size: {batch_size}")
print(f"Expected FEA validations: ~{total_nn_trials // batch_size}")
# Initial training
print(f"\n[INIT] Training initial surrogate...")
self.train(epochs=epochs)
sim_file = self.model_dir / self.config['simulation']['sim_file']
dat_file = self.model_dir / self.config['simulation']['dat_file']
element_type = self._detect_element_type(dat_file)
fea_count = 0
nn_count = 0
best_solutions = []
iteration = 0
start_time = time.time()
# Get objective info
obj_names = [o['name'] for o in self.config['objectives']]
obj_directions = [o['direction'] for o in self.config['objectives']]
while nn_count < total_nn_trials:
iteration += 1
batch_trials = min(batch_size, total_nn_trials - nn_count)
print(f"\n{''*50}")
print(f"Iteration {iteration}: NN trials {nn_count+1}-{nn_count+batch_trials}")
# Find best candidate via NN
best_candidate = None
best_score = float('inf')
for _ in range(batch_trials):
params = self.surrogate.sample_random_design()
pred = self.surrogate.predict(params)
# Compute score (simple weighted sum - lower is better)
score = sum(pred[name] if obj_directions[i] == 'minimize' else -pred[name]
for i, name in enumerate(obj_names))
if score < best_score:
best_score = score
best_candidate = {'params': params, 'nn_pred': pred}
nn_count += batch_trials
params = best_candidate['params']
nn_pred = best_candidate['nn_pred']
# Log NN prediction
var_str = ", ".join(f"{k}={v:.2f}" for k, v in list(params.items())[:3])
print(f" Best NN: {var_str}...")
pred_str = ", ".join(f"{k}={v:.2f}" for k, v in nn_pred.items())
print(f" NN pred: {pred_str}")
# Run FEA validation
result = self.nx_solver.run_simulation(
sim_file=sim_file,
working_dir=self.model_dir,
expression_updates=params,
solution_name=self.config['simulation'].get('solution_name'),
cleanup=True
)
if not result['success']:
print(f" FEA FAILED - skipping")
continue
# Extract FEA results
op2_file = result['op2_file']
fea_results = self._extract_fea_results(op2_file, dat_file, element_type,
extract_mass_from_bdf, extract_displacement,
extract_solid_stress)
fea_str = ", ".join(f"{k}={v:.2f}" for k, v in fea_results.items())
print(f" FEA: {fea_str}")
# Compute errors
errors = {}
for name in obj_names:
if name in fea_results and name in nn_pred and fea_results[name] != 0:
errors[name] = abs(fea_results[name] - nn_pred[name]) / abs(fea_results[name]) * 100
if errors:
err_str = ", ".join(f"{k}={v:.1f}%" for k, v in errors.items())
print(f" Error: {err_str}")
fea_count += 1
# Add to main study database
self._add_to_study(params, fea_results, iteration)
best_solutions.append({
'iteration': iteration,
'params': {k: float(v) for k, v in params.items()},
'fea': [fea_results.get(name, 0) for name in obj_names],
'nn_error': [errors.get(name, 0) for name in obj_names[:2]] # First 2 errors
})
# Retrain periodically
if fea_count % retrain_every == 0:
print(f"\n [RETRAIN] Retraining surrogate...")
self.train(epochs=epochs)
# Progress
elapsed = time.time() - start_time
rate = nn_count / elapsed if elapsed > 0 else 0
remaining = (total_nn_trials - nn_count) / rate if rate > 0 else 0
print(f" Progress: {nn_count:,}/{total_nn_trials:,} NN | {fea_count} FEA | {elapsed/60:.1f}min | ~{remaining/60:.1f}min left")
# Final summary
print(f"\n{'#'*60}")
print("# TURBO MODE COMPLETE")
print(f"{'#'*60}")
print(f"NN trials: {nn_count:,}")
print(f"FEA validations: {fea_count}")
print(f"Time: {(time.time() - start_time)/60:.1f} minutes")
# Save report
turbo_report = {
'mode': 'turbo',
'total_nn_trials': nn_count,
'fea_validations': fea_count,
'time_minutes': (time.time() - start_time) / 60,
'best_solutions': best_solutions[-20:]
}
report_path = self.results_dir / "turbo_report.json"
with open(report_path, 'w') as f:
json.dump(turbo_report, f, indent=2)
print(f"\nReport saved to {report_path}")
def _extract_fea_results(self, op2_file: Path, dat_file: Path, element_type: str,
extract_mass_from_bdf, extract_displacement, extract_solid_stress) -> Dict[str, float]:
"""Extract FEA results for all objectives."""
results = {}
for obj in self.config['objectives']:
name = obj['name'].lower()
try:
if 'mass' in name:
results[obj['name']] = extract_mass_from_bdf(str(dat_file))
elif 'stress' in name:
stress_result = extract_solid_stress(op2_file, subcase=1, element_type=element_type)
results[obj['name']] = stress_result.get('max_von_mises', float('inf')) / 1000.0
elif 'displacement' in name:
disp_result = extract_displacement(op2_file, subcase=1)
results[obj['name']] = disp_result['max_displacement']
elif 'stiffness' in name:
disp_result = extract_displacement(op2_file, subcase=1)
max_disp = disp_result['max_displacement']
# Negative for minimization in multi-objective
results[obj['name']] = -1000.0 / max(abs(max_disp), 1e-6)
results['displacement'] = max_disp
except Exception as e:
print(f" Warning: Failed to extract {name}: {e}")
results[obj['name']] = float('inf')
return results
def _add_to_study(self, params: Dict, fea_results: Dict, iteration: int):
"""Add FEA result to main Optuna study."""
try:
storage = f"sqlite:///{self.results_dir / 'study.db'}"
study = optuna.load_study(
study_name=self.study_name,
storage=storage,
sampler=NSGAIISampler(population_size=20, seed=42)
)
trial = study.ask()
for var in self.config['design_variables']:
name = var['name']
value = params[name]
if var['type'] == 'integer':
trial.suggest_int(name, int(value), int(value))
else:
trial.suggest_float(name, value, value)
# Get objective values in order
obj_values = [fea_results.get(o['name'], float('inf')) for o in self.config['objectives']]
study.tell(trial, obj_values)
trial.set_user_attr('source', 'turbo_mode')
trial.set_user_attr('iteration', iteration)
except Exception as e:
print(f" Warning: couldn't add to study: {e}")
def run(self, args=None):
"""
Main entry point with argument parsing.
Handles --train, --turbo, --all flags.
"""
if args is None:
args = self.parse_args()
self._setup()
print(f"\n{'#'*60}")
print(f"# {self.study_name} - Hybrid NN Optimization")
print(f"{'#'*60}")
if args.all or args.train:
self.train(epochs=args.epochs)
if args.all or args.turbo:
self.turbo(
total_nn_trials=args.nn_trials,
batch_size=args.batch_size,
retrain_every=args.retrain_every,
epochs=args.epochs
)
print(f"\n{'#'*60}")
print("# Workflow Complete!")
print(f"{'#'*60}\n")
return 0
def parse_args(self) -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description=f'{self.study_name} - Hybrid NN Optimization')
parser.add_argument('--train', action='store_true', help='Train surrogate only')
parser.add_argument('--turbo', action='store_true', help='TURBO mode (recommended)')
parser.add_argument('--all', action='store_true', help='Train then run turbo')
nn_config = self.config.get('neural_acceleration', {})
parser.add_argument('--epochs', type=int, default=nn_config.get('epochs', 200), help='Training epochs')
parser.add_argument('--nn-trials', type=int, default=nn_config.get('nn_trials', 5000), help='Total NN trials')
parser.add_argument('--batch-size', type=int, default=100, help='NN batch size')
parser.add_argument('--retrain-every', type=int, default=10, help='Retrain every N FEA')
args = parser.parse_args()
if not any([args.train, args.turbo, args.all]):
print("No phase specified. Use --train, --turbo, or --all")
print("\nRecommended workflow:")
print(f" python run_nn_optimization.py --turbo --nn-trials {nn_config.get('nn_trials', 5000)}")
sys.exit(1)
return args
def create_surrogate(script_path: str, element_type: str = 'auto') -> ConfigDrivenSurrogate:
"""
Factory function to create a ConfigDrivenSurrogate.
Args:
script_path: Path to study's run_nn_optimization.py (__file__)
element_type: Element type for stress extraction
Returns:
Configured surrogate ready to run
"""
return ConfigDrivenSurrogate(script_path, element_type=element_type)

View File

@@ -0,0 +1,322 @@
"""
Study State Detector for Atomizer
This module provides utilities to detect and summarize the state of an optimization study.
Used by Claude sessions to quickly understand study context on initialization.
"""
import json
import sqlite3
from pathlib import Path
from typing import Dict, Any, Optional, List
from datetime import datetime
def detect_study_state(study_dir: Path) -> Dict[str, Any]:
"""
Detect the current state of an optimization study.
Args:
study_dir: Path to the study directory
Returns:
Dictionary with study state information
"""
study_dir = Path(study_dir)
state = {
"is_study": False,
"study_name": study_dir.name,
"status": "unknown",
"config": None,
"fea_trials": 0,
"nn_trials": 0,
"pareto_solutions": 0,
"best_trial": None,
"last_activity": None,
"has_turbo_report": False,
"has_surrogate": False,
"warnings": [],
"next_actions": []
}
# Check if this is a valid study directory
config_path = study_dir / "optimization_config.json"
if not config_path.exists():
# Try 1_setup subdirectory
config_path = study_dir / "1_setup" / "optimization_config.json"
if not config_path.exists():
state["warnings"].append("No optimization_config.json found")
return state
state["is_study"] = True
# Load config
try:
with open(config_path, 'r') as f:
config = json.load(f)
state["config"] = _summarize_config(config)
except Exception as e:
state["warnings"].append(f"Failed to parse config: {e}")
# Check results directory
results_dir = study_dir / "2_results"
if not results_dir.exists():
state["status"] = "not_started"
state["next_actions"].append("Run: python run_optimization.py --discover")
return state
# Check study.db for FEA trials
db_path = results_dir / "study.db"
if db_path.exists():
fea_stats = _query_study_db(db_path)
state.update(fea_stats)
# Check nn_study.db for NN trials
nn_db_path = results_dir / "nn_study.db"
if nn_db_path.exists():
nn_stats = _query_study_db(nn_db_path, prefix="nn_")
state["nn_trials"] = nn_stats.get("nn_fea_trials", 0)
# Check for turbo report
turbo_report_path = results_dir / "turbo_report.json"
if turbo_report_path.exists():
state["has_turbo_report"] = True
try:
with open(turbo_report_path, 'r') as f:
turbo = json.load(f)
state["turbo_summary"] = {
"mode": turbo.get("mode"),
"nn_trials": turbo.get("total_nn_trials", 0),
"fea_validations": turbo.get("fea_validations", 0),
"time_minutes": round(turbo.get("time_minutes", 0), 1)
}
except Exception:
pass
# Check for trained surrogate
surrogate_path = results_dir / "surrogate.pt"
state["has_surrogate"] = surrogate_path.exists()
# Determine overall status
state["status"] = _determine_status(state)
# Suggest next actions
state["next_actions"] = _suggest_next_actions(state)
return state
def _summarize_config(config: Dict) -> Dict[str, Any]:
"""Extract key information from config."""
# Handle different config formats
variables = config.get("design_variables", config.get("variables", []))
objectives = config.get("objectives", [])
constraints = config.get("constraints", [])
# Get variable names (handle different key names)
var_names = []
for v in variables:
name = v.get("parameter") or v.get("name") or v.get("expression_name", "unknown")
var_names.append(name)
# Get objective names
obj_names = []
for o in objectives:
name = o.get("name") or o.get("metric", "unknown")
direction = o.get("goal") or o.get("direction", "minimize")
obj_names.append(f"{name} ({direction})")
return {
"n_variables": len(variables),
"n_objectives": len(objectives),
"n_constraints": len(constraints),
"variable_names": var_names[:5], # First 5 only
"objective_names": obj_names,
"study_type": "multi_objective" if len(objectives) > 1 else "single_objective"
}
def _query_study_db(db_path: Path, prefix: str = "") -> Dict[str, Any]:
"""Query Optuna study database for statistics."""
stats = {
f"{prefix}fea_trials": 0,
f"{prefix}completed_trials": 0,
f"{prefix}failed_trials": 0,
f"{prefix}pareto_solutions": 0,
"best_trial": None,
"last_activity": None
}
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# Count trials by state
cursor.execute("""
SELECT state, COUNT(*) FROM trials
GROUP BY state
""")
for state, count in cursor.fetchall():
if state == "COMPLETE":
stats[f"{prefix}completed_trials"] = count
stats[f"{prefix}fea_trials"] = count
elif state == "FAIL":
stats[f"{prefix}failed_trials"] = count
# Get last activity time
cursor.execute("""
SELECT MAX(datetime_complete) FROM trials
WHERE datetime_complete IS NOT NULL
""")
result = cursor.fetchone()
if result and result[0]:
stats["last_activity"] = result[0]
# Get best trial (for single objective)
cursor.execute("""
SELECT trial_id, value FROM trial_values
WHERE objective_id = 0
ORDER BY value ASC
LIMIT 1
""")
result = cursor.fetchone()
if result:
stats["best_trial"] = {"trial_id": result[0], "value": result[1]}
# Count Pareto solutions (trials with user_attr pareto=True or non-dominated)
# Simplified: count distinct trials in trial_values
cursor.execute("""
SELECT COUNT(DISTINCT trial_id) FROM trial_values
""")
result = cursor.fetchone()
if result:
# For multi-objective, this is a rough estimate
stats[f"{prefix}pareto_solutions"] = min(result[0], 50) # Cap at 50
conn.close()
except Exception as e:
stats["db_error"] = str(e)
return stats
def _determine_status(state: Dict) -> str:
"""Determine overall study status."""
if state["fea_trials"] == 0:
return "not_started"
elif state["fea_trials"] < 3:
return "discovery"
elif state["fea_trials"] < 10:
return "validation"
elif state["has_turbo_report"]:
return "turbo_complete"
elif state["has_surrogate"]:
return "training_complete"
elif state["fea_trials"] >= 50:
return "fea_complete"
else:
return "in_progress"
def _suggest_next_actions(state: Dict) -> List[str]:
"""Suggest next actions based on study state."""
actions = []
if state["status"] == "not_started":
actions.append("Run: python run_optimization.py --discover")
elif state["status"] == "discovery":
actions.append("Run: python run_optimization.py --validate")
elif state["status"] == "validation":
actions.append("Run: python run_optimization.py --test")
actions.append("Or run full: python run_optimization.py --run --trials 50")
elif state["status"] == "in_progress":
actions.append("Continue: python run_optimization.py --resume")
elif state["status"] == "fea_complete":
actions.append("Analyze: python -m optimization_engine.method_selector optimization_config.json 2_results/study.db")
actions.append("Or run turbo: python run_nn_optimization.py --turbo")
elif state["status"] == "turbo_complete":
actions.append("View results in dashboard: cd atomizer-dashboard && npm run dev")
actions.append("Generate report: python generate_report.py")
return actions
def format_study_summary(state: Dict) -> str:
"""Format study state as a human-readable summary."""
if not state["is_study"]:
return f"❌ Not a valid study directory: {state['study_name']}"
lines = [
f"📊 **Study: {state['study_name']}**",
f"Status: {state['status'].replace('_', ' ').title()}",
""
]
if state["config"]:
cfg = state["config"]
lines.append(f"**Configuration:**")
lines.append(f"- Variables: {cfg['n_variables']} ({', '.join(cfg['variable_names'][:3])}{'...' if cfg['n_variables'] > 3 else ''})")
lines.append(f"- Objectives: {cfg['n_objectives']} ({', '.join(cfg['objective_names'])})")
lines.append(f"- Constraints: {cfg['n_constraints']}")
lines.append(f"- Type: {cfg['study_type']}")
lines.append("")
lines.append("**Progress:**")
lines.append(f"- FEA trials: {state['fea_trials']}")
if state["nn_trials"] > 0:
lines.append(f"- NN trials: {state['nn_trials']}")
if state["has_turbo_report"] and "turbo_summary" in state:
ts = state["turbo_summary"]
lines.append(f"- Turbo mode: {ts['nn_trials']} NN + {ts['fea_validations']} FEA validations ({ts['time_minutes']} min)")
if state["last_activity"]:
lines.append(f"- Last activity: {state['last_activity']}")
lines.append("")
if state["next_actions"]:
lines.append("**Suggested Next Actions:**")
for action in state["next_actions"]:
lines.append(f"{action}")
if state["warnings"]:
lines.append("")
lines.append("**Warnings:**")
for warning in state["warnings"]:
lines.append(f" ⚠️ {warning}")
return "\n".join(lines)
def get_all_studies(atomizer_root: Path) -> List[Dict[str, Any]]:
"""Get state of all studies in the Atomizer studies directory."""
studies_dir = atomizer_root / "studies"
if not studies_dir.exists():
return []
studies = []
for study_path in studies_dir.iterdir():
if study_path.is_dir() and not study_path.name.startswith("."):
state = detect_study_state(study_path)
if state["is_study"]:
studies.append(state)
# Sort by last activity (most recent first)
studies.sort(
key=lambda s: s.get("last_activity") or "1970-01-01",
reverse=True
)
return studies
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
study_path = Path(sys.argv[1])
else:
# Default to current directory
study_path = Path.cwd()
state = detect_study_state(study_path)
print(format_study_summary(state))

View File

@@ -0,0 +1,183 @@
"""
Template Registry for Atomizer
Provides study templates for common optimization scenarios.
Used by Claude to quickly create new studies via wizard-driven workflow.
"""
import json
from pathlib import Path
from typing import Dict, List, Any, Optional
REGISTRY_PATH = Path(__file__).parent / "registry.json"
def load_registry() -> Dict[str, Any]:
"""Load the template registry."""
with open(REGISTRY_PATH, 'r') as f:
return json.load(f)
def list_templates() -> List[Dict[str, Any]]:
"""List all available templates with summary info."""
registry = load_registry()
templates = []
for t in registry["templates"]:
templates.append({
"id": t["id"],
"name": t["name"],
"description": t["description"],
"category": t["category"],
"n_objectives": len(t["objectives"]),
"turbo_suitable": t.get("turbo_suitable", False),
"example_study": t.get("example_study")
})
return templates
def get_template(template_id: str) -> Optional[Dict[str, Any]]:
"""Get a specific template by ID."""
registry = load_registry()
for t in registry["templates"]:
if t["id"] == template_id:
return t
return None
def get_templates_by_category(category: str) -> List[Dict[str, Any]]:
"""Get all templates in a category."""
registry = load_registry()
return [t for t in registry["templates"] if t["category"] == category]
def list_categories() -> Dict[str, Dict[str, str]]:
"""List all template categories."""
registry = load_registry()
return registry.get("categories", {})
def get_extractor_info(extractor_id: str) -> Optional[Dict[str, Any]]:
"""Get information about a specific extractor."""
registry = load_registry()
return registry.get("extractors", {}).get(extractor_id)
def suggest_template(
n_objectives: int = 1,
physics_type: str = "structural",
element_types: Optional[List[str]] = None
) -> Optional[Dict[str, Any]]:
"""
Suggest a template based on problem characteristics.
Args:
n_objectives: Number of objectives (1 = single, 2+ = multi)
physics_type: Type of physics (structural, dynamics, optics, multiphysics)
element_types: List of element types in the mesh
Returns:
Best matching template or None
"""
registry = load_registry()
candidates = []
for t in registry["templates"]:
score = 0
# Match number of objectives
t_obj = len(t["objectives"])
if n_objectives == 1 and t_obj == 1:
score += 10
elif n_objectives > 1 and t_obj > 1:
score += 10
# Match category
if t["category"] == physics_type:
score += 20
# Match element types
if element_types:
t_elements = set(t.get("element_types", []))
user_elements = set(element_types)
if t_elements & user_elements:
score += 15
if "CQUAD4" in user_elements and "shell" in t["id"].lower():
score += 10
if score > 0:
candidates.append((score, t))
if not candidates:
return None
# Sort by score descending
candidates.sort(key=lambda x: x[0], reverse=True)
return candidates[0][1]
def format_template_summary(template: Dict[str, Any]) -> str:
"""Format a template as a human-readable summary."""
lines = [
f"**{template['name']}**",
f"_{template['description']}_",
"",
f"**Category**: {template['category']}",
f"**Solver**: {template.get('solver', 'SOL 101')}",
"",
"**Objectives**:"
]
for obj in template["objectives"]:
lines.append(f" - {obj['name']} ({obj['direction']}) → Extractor {obj['extractor']}")
lines.append("")
lines.append("**Recommended Trials**:")
trials = template.get("recommended_trials", {})
for phase, count in trials.items():
lines.append(f" - {phase}: {count}")
if template.get("turbo_suitable"):
lines.append("")
lines.append("✅ **Turbo Mode**: Suitable for neural acceleration")
if template.get("notes"):
lines.append("")
lines.append(f"⚠️ **Note**: {template['notes']}")
if template.get("example_study"):
lines.append("")
lines.append(f"📁 **Example**: studies/{template['example_study']}/")
return "\n".join(lines)
def get_wizard_questions(template_id: str) -> List[Dict[str, Any]]:
"""Get wizard questions for a template."""
template = get_template(template_id)
if not template:
return []
return template.get("wizard_questions", [])
if __name__ == "__main__":
# Demo: list all templates
print("=== Atomizer Template Registry ===\n")
for category_id, category in list_categories().items():
print(f"{category['icon']} {category['name']}")
print(f" {category['description']}\n")
print("\n=== Available Templates ===\n")
for t in list_templates():
status = "🚀" if t["turbo_suitable"] else "📊"
print(f"{status} {t['name']} ({t['id']})")
print(f" {t['description']}")
print(f" Objectives: {t['n_objectives']} | Example: {t['example_study'] or 'N/A'}")
print()

View File

@@ -0,0 +1,28 @@
"""
CLI for the Atomizer Template Registry.
"""
from . import list_templates, list_categories, format_template_summary, get_template
def main():
print("=== Atomizer Template Registry ===\n")
for category_id, category in list_categories().items():
# Use ASCII-safe icons for Windows compatibility
icon = "[" + category_id[:3].upper() + "]"
print(f"{icon} {category['name']}")
print(f" {category['description']}\n")
print("\n=== Available Templates ===\n")
for t in list_templates():
status = "[TURBO]" if t["turbo_suitable"] else "[FEA]"
print(f"{status} {t['name']} ({t['id']})")
print(f" {t['description']}")
print(f" Objectives: {t['n_objectives']} | Example: {t['example_study'] or 'N/A'}")
print()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,205 @@
{
"version": "1.0",
"last_updated": "2025-12-07",
"templates": [
{
"id": "multi_objective_structural",
"name": "Multi-Objective Structural",
"description": "NSGA-II optimization for structural analysis with mass, stress, and stiffness objectives",
"category": "structural",
"objectives": [
{"name": "mass", "direction": "minimize", "extractor": "E4"},
{"name": "stress", "direction": "minimize", "extractor": "E3"},
{"name": "stiffness", "direction": "maximize", "extractor": "E1"}
],
"extractors": ["E1", "E3", "E4"],
"solver": "SOL 101",
"element_types": ["CTETRA", "CHEXA", "CQUAD4"],
"sampler": "NSGAIISampler",
"recommended_trials": {
"discovery": 1,
"validation": 3,
"quick": 20,
"full": 50,
"comprehensive": 100
},
"turbo_suitable": true,
"example_study": "bracket_pareto_3obj",
"wizard_questions": [
{"key": "element_type", "question": "What element type does your mesh use?", "options": ["CTETRA (solid)", "CHEXA (solid)", "CQUAD4 (shell)"]},
{"key": "stress_limit", "question": "What is the allowable stress limit (MPa)?", "default": 200},
{"key": "displacement_limit", "question": "What is the max allowable displacement (mm)?", "default": 10}
]
},
{
"id": "frequency_optimization",
"name": "Frequency Optimization",
"description": "Maximize natural frequency while minimizing mass for vibration-sensitive structures",
"category": "dynamics",
"objectives": [
{"name": "frequency", "direction": "maximize", "extractor": "E2"},
{"name": "mass", "direction": "minimize", "extractor": "E4"}
],
"extractors": ["E2", "E4"],
"solver": "SOL 103",
"element_types": ["CTETRA", "CHEXA", "CQUAD4", "CBAR"],
"sampler": "NSGAIISampler",
"recommended_trials": {
"discovery": 1,
"validation": 3,
"quick": 20,
"full": 50
},
"turbo_suitable": true,
"example_study": "uav_arm_optimization",
"wizard_questions": [
{"key": "target_mode", "question": "Which vibration mode to optimize?", "default": 1},
{"key": "min_frequency", "question": "Minimum acceptable frequency (Hz)?", "default": 50}
]
},
{
"id": "single_objective_mass",
"name": "Mass Minimization",
"description": "Minimize mass subject to stress and displacement constraints",
"category": "structural",
"objectives": [
{"name": "mass", "direction": "minimize", "extractor": "E4"}
],
"extractors": ["E1", "E3", "E4"],
"solver": "SOL 101",
"element_types": ["CTETRA", "CHEXA", "CQUAD4"],
"sampler": "TPESampler",
"recommended_trials": {
"discovery": 1,
"validation": 3,
"quick": 30,
"full": 100
},
"turbo_suitable": true,
"example_study": "bracket_stiffness_optimization_V3",
"wizard_questions": [
{"key": "stress_constraint", "question": "Max stress constraint (MPa)?", "default": 200},
{"key": "displacement_constraint", "question": "Max displacement constraint (mm)?", "default": 5}
]
},
{
"id": "mirror_wavefront",
"name": "Mirror Wavefront Optimization",
"description": "Minimize Zernike wavefront error for optical mirror deformation",
"category": "optics",
"objectives": [
{"name": "zernike_rms", "direction": "minimize", "extractor": "E8"}
],
"extractors": ["E8", "E9", "E10"],
"solver": "SOL 101",
"element_types": ["CQUAD4", "CTRIA3"],
"sampler": "TPESampler",
"recommended_trials": {
"discovery": 1,
"validation": 3,
"quick": 30,
"full": 100
},
"turbo_suitable": false,
"example_study": "m1_mirror_zernike_optimization",
"wizard_questions": [
{"key": "mirror_radius", "question": "Mirror radius (mm)?", "required": true},
{"key": "zernike_modes", "question": "Number of Zernike modes?", "default": 36},
{"key": "target_wfe", "question": "Target WFE RMS (nm)?", "default": 50}
]
},
{
"id": "thermal_structural",
"name": "Thermal-Structural Coupled",
"description": "Optimize for thermal and structural performance",
"category": "multiphysics",
"objectives": [
{"name": "max_temperature", "direction": "minimize", "extractor": "E15"},
{"name": "thermal_stress", "direction": "minimize", "extractor": "E3"}
],
"extractors": ["E3", "E15", "E16"],
"solver": "SOL 153/400",
"element_types": ["CTETRA", "CHEXA"],
"sampler": "NSGAIISampler",
"recommended_trials": {
"discovery": 1,
"validation": 3,
"quick": 20,
"full": 50
},
"turbo_suitable": false,
"example_study": null,
"wizard_questions": [
{"key": "max_temp_limit", "question": "Maximum allowable temperature (°C)?", "default": 100},
{"key": "stress_limit", "question": "Maximum allowable thermal stress (MPa)?", "default": 150}
]
},
{
"id": "shell_structural",
"name": "Shell Structure Optimization",
"description": "Optimize shell structures (CQUAD4/CTRIA3) for mass and stress",
"category": "structural",
"objectives": [
{"name": "mass", "direction": "minimize", "extractor": "E4"},
{"name": "stress", "direction": "minimize", "extractor": "E3"}
],
"extractors": ["E1", "E3", "E4"],
"solver": "SOL 101",
"element_types": ["CQUAD4", "CTRIA3"],
"sampler": "NSGAIISampler",
"recommended_trials": {
"discovery": 1,
"validation": 3,
"quick": 20,
"full": 50
},
"turbo_suitable": true,
"example_study": "beam_pareto_4var",
"notes": "Remember to specify element_type='cquad4' in stress extractor",
"wizard_questions": [
{"key": "stress_limit", "question": "Max stress constraint (MPa)?", "default": 200}
]
}
],
"extractors": {
"E1": {"name": "Displacement", "function": "extract_displacement", "units": "mm", "phase": 1},
"E2": {"name": "Frequency", "function": "extract_frequency", "units": "Hz", "phase": 1},
"E3": {"name": "Von Mises Stress", "function": "extract_solid_stress", "units": "MPa", "phase": 1, "notes": "Specify element_type for shell elements"},
"E4": {"name": "BDF Mass", "function": "extract_mass_from_bdf", "units": "kg", "phase": 1},
"E5": {"name": "CAD Mass", "function": "extract_mass_from_expression", "units": "kg", "phase": 1},
"E6": {"name": "Stiffness (from disp)", "function": "calculate_stiffness", "units": "N/mm", "phase": 1},
"E7": {"name": "Compliance", "function": "calculate_compliance", "units": "mm/N", "phase": 1},
"E8": {"name": "Zernike WFE RMS", "function": "extract_zernike_wfe_rms", "units": "nm", "phase": 1},
"E9": {"name": "Zernike Coefficients", "function": "extract_zernike_coefficients", "units": "nm", "phase": 1},
"E10": {"name": "Zernike RMS per Mode", "function": "extract_zernike_rms_per_mode", "units": "nm", "phase": 1},
"E12": {"name": "Principal Stress", "function": "extract_principal_stress", "units": "MPa", "phase": 2},
"E13": {"name": "Strain Energy", "function": "extract_strain_energy", "units": "J", "phase": 2},
"E14": {"name": "SPC Forces", "function": "extract_spc_forces", "units": "N", "phase": 2},
"E15": {"name": "Temperature", "function": "extract_temperature", "units": "°C", "phase": 3},
"E16": {"name": "Temperature Gradient", "function": "extract_temperature_gradient", "units": "°C/mm", "phase": 3},
"E17": {"name": "Heat Flux", "function": "extract_heat_flux", "units": "W/mm²", "phase": 3},
"E18": {"name": "Modal Mass", "function": "extract_modal_mass", "units": "kg", "phase": 3}
},
"categories": {
"structural": {
"name": "Structural Analysis",
"description": "Static structural optimization (SOL 101)",
"icon": "🏗️"
},
"dynamics": {
"name": "Dynamics / Modal",
"description": "Frequency and modal optimization (SOL 103)",
"icon": "📳"
},
"optics": {
"name": "Optical Systems",
"description": "Wavefront error optimization for mirrors/lenses",
"icon": "🔭"
},
"multiphysics": {
"name": "Multi-Physics",
"description": "Coupled thermal-structural analysis",
"icon": "🔥"
}
}
}

View File

@@ -0,0 +1,42 @@
#!/usr/bin/env python
"""
{STUDY_NAME} - Neural Network Acceleration Script (Simplified)
================================================================
This script uses ConfigDrivenSurrogate for config-driven NN optimization.
The ~600 lines of boilerplate code is now handled automatically.
Workflow:
---------
1. First run FEA: python run_optimization.py --run --trials 50
2. Then run NN: python run_nn_optimization.py --turbo --nn-trials 5000
Or combine:
python run_nn_optimization.py --all
Generated by Atomizer StudyWizard
"""
from pathlib import Path
import sys
# Add project root to path
project_root = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(project_root))
from optimization_engine.generic_surrogate import ConfigDrivenSurrogate
def main():
"""Run neural acceleration using config-driven surrogate."""
# Create surrogate - all config read from optimization_config.json
surrogate = ConfigDrivenSurrogate(__file__)
# Element type: 'auto' detects from DAT file
# Override if needed: surrogate.element_type = 'cquad4' (shell) or 'ctetra' (solid)
return surrogate.run()
if __name__ == "__main__":
exit(main())

View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python
"""
{STUDY_NAME} - Optimization Script (Simplified)
================================================================
This script uses the ConfigDrivenRunner for config-driven optimization.
The ~300 lines of boilerplate code is now handled automatically.
Workflow:
---------
1. python run_optimization.py --discover # Model introspection
2. python run_optimization.py --validate # Single trial validation
3. python run_optimization.py --test # Quick 3-trial test
4. python run_optimization.py --run # Full optimization
Generated by Atomizer StudyWizard
"""
from pathlib import Path
import sys
# Add project root to path
project_root = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(project_root))
from optimization_engine.base_runner import ConfigDrivenRunner
def main():
"""Run optimization using config-driven runner."""
# Create runner - all config read from optimization_config.json
runner = ConfigDrivenRunner(__file__)
# Element type: 'auto' detects from DAT file
# Override if needed: runner.element_type = 'cquad4' (shell) or 'ctetra' (solid)
return runner.run()
if __name__ == "__main__":
exit(main())