BREAKING CHANGE: Module paths have been reorganized for better maintainability. Backwards compatibility aliases with deprecation warnings are provided. New Structure: - core/ - Optimization runners (runner, intelligent_optimizer, etc.) - processors/ - Data processing - surrogates/ - Neural network surrogates - nx/ - NX/Nastran integration (solver, updater, session_manager) - study/ - Study management (creator, wizard, state, reset) - reporting/ - Reports and analysis (visualizer, report_generator) - config/ - Configuration management (manager, builder) - utils/ - Utilities (logger, auto_doc, etc.) - future/ - Research/experimental code Migration: - ~200 import changes across 125 files - All __init__.py files use lazy loading to avoid circular imports - Backwards compatibility layer supports old import paths with warnings - All existing functionality preserved To migrate existing code: OLD: from optimization_engine.nx_solver import NXSolver NEW: from optimization_engine.nx.solver import NXSolver OLD: from optimization_engine.runner import OptimizationRunner NEW: from optimization_engine.core.runner import OptimizationRunner 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
526 lines
19 KiB
Python
526 lines
19 KiB
Python
"""
|
|
Workflow Decomposer
|
|
|
|
Breaks complex user requests into atomic workflow steps that can be matched
|
|
against existing codebase capabilities.
|
|
|
|
IMPROVED VERSION: Handles multi-objective optimization, constraints, and complex requests.
|
|
|
|
Author: Atomizer Development Team
|
|
Version: 0.2.0 (Phase 2.5 - Improved)
|
|
Last Updated: 2025-01-16
|
|
"""
|
|
|
|
import re
|
|
from typing import List, Dict, Any, Set
|
|
from dataclasses import dataclass
|
|
|
|
|
|
@dataclass
|
|
class WorkflowStep:
|
|
"""Represents a single atomic step in a workflow."""
|
|
action: str
|
|
domain: str
|
|
params: Dict[str, Any]
|
|
priority: int = 0
|
|
|
|
|
|
class WorkflowDecomposer:
|
|
"""Breaks complex requests into atomic workflow steps."""
|
|
|
|
def __init__(self):
|
|
# Extended result type mapping
|
|
self.result_types = {
|
|
'displacement': 'displacement',
|
|
'deformation': 'displacement',
|
|
'stress': 'stress',
|
|
'von mises': 'stress',
|
|
'strain': 'strain',
|
|
'modal': 'modal',
|
|
'mode': 'modal',
|
|
'eigenvalue': 'modal',
|
|
'frequency': 'modal',
|
|
'temperature': 'temperature',
|
|
'thermal': 'temperature',
|
|
'reaction': 'reaction_force',
|
|
'reaction force': 'reaction_force',
|
|
'nodal reaction': 'reaction_force',
|
|
'force': 'reaction_force',
|
|
'mass': 'mass',
|
|
'weight': 'mass',
|
|
'volume': 'volume'
|
|
}
|
|
|
|
# Solver type mapping
|
|
self.solver_types = {
|
|
'sol101': 'SOL101',
|
|
'sol 101': 'SOL101',
|
|
'static': 'SOL101',
|
|
'sol103': 'SOL103',
|
|
'sol 103': 'SOL103',
|
|
'modal': 'SOL103',
|
|
'sol106': 'SOL106',
|
|
'sol 106': 'SOL106',
|
|
'nonlinear': 'SOL106',
|
|
'sol105': 'SOL105',
|
|
'buckling': 'SOL105'
|
|
}
|
|
|
|
def decompose(self, user_request: str) -> List[WorkflowStep]:
|
|
"""
|
|
Break user request into atomic workflow steps.
|
|
|
|
Handles:
|
|
- Multi-objective optimization
|
|
- Constraints
|
|
- Multiple result extractions
|
|
- Custom expressions
|
|
- Parameter filtering
|
|
"""
|
|
steps = []
|
|
request_lower = user_request.lower()
|
|
|
|
# Check if this is an optimization request
|
|
is_optimization = self._is_optimization_request(request_lower)
|
|
|
|
if is_optimization:
|
|
steps = self._decompose_optimization_workflow(user_request, request_lower)
|
|
else:
|
|
steps = self._decompose_simple_workflow(user_request, request_lower)
|
|
|
|
# Sort by priority
|
|
steps.sort(key=lambda s: s.priority)
|
|
|
|
return steps
|
|
|
|
def _is_optimization_request(self, text: str) -> bool:
|
|
"""Check if request involves optimization."""
|
|
optimization_keywords = [
|
|
'optimize', 'optimiz', 'minimize', 'minimiz', 'maximize', 'maximiz',
|
|
'optuna', 'genetic', 'iteration', 'vary', 'varying'
|
|
]
|
|
return any(kw in text for kw in optimization_keywords)
|
|
|
|
def _decompose_optimization_workflow(self, request: str, request_lower: str) -> List[WorkflowStep]:
|
|
"""Decompose an optimization request into workflow steps."""
|
|
steps = []
|
|
priority = 1
|
|
|
|
# 1. Identify and filter parameters
|
|
param_filter = self._extract_parameter_filter(request, request_lower)
|
|
if param_filter:
|
|
steps.append(WorkflowStep(
|
|
action='identify_parameters',
|
|
domain='geometry',
|
|
params={'filter': param_filter},
|
|
priority=priority
|
|
))
|
|
priority += 1
|
|
|
|
# 2. Update parameters (this happens in the optimization loop)
|
|
steps.append(WorkflowStep(
|
|
action='update_parameters',
|
|
domain='geometry',
|
|
params={'source': 'optimization_algorithm'},
|
|
priority=priority
|
|
))
|
|
priority += 1
|
|
|
|
# 3. Run simulation
|
|
solver = self._extract_solver_type(request_lower)
|
|
if solver:
|
|
steps.append(WorkflowStep(
|
|
action='run_analysis',
|
|
domain='simulation',
|
|
params={'solver': solver},
|
|
priority=priority
|
|
))
|
|
priority += 1
|
|
|
|
# 4. Extract ALL result types mentioned (multi-objective!)
|
|
result_extractions = self._extract_all_results(request, request_lower)
|
|
for result_info in result_extractions:
|
|
# If result has custom_expression (e.g., mass from .prt expression),
|
|
# it's a geometry operation, not result_extraction (OP2 file)
|
|
if 'custom_expression' in result_info:
|
|
steps.append(WorkflowStep(
|
|
action='read_expression',
|
|
domain='geometry',
|
|
params=result_info,
|
|
priority=priority
|
|
))
|
|
else:
|
|
steps.append(WorkflowStep(
|
|
action='extract_result',
|
|
domain='result_extraction',
|
|
params=result_info,
|
|
priority=priority
|
|
))
|
|
priority += 1
|
|
|
|
# 5. Handle constraints
|
|
constraints = self._extract_constraints(request, request_lower)
|
|
if constraints:
|
|
steps.append(WorkflowStep(
|
|
action='apply_constraints',
|
|
domain='optimization',
|
|
params={'constraints': constraints},
|
|
priority=priority
|
|
))
|
|
priority += 1
|
|
|
|
# 6. Optimize (multi-objective if multiple objectives detected)
|
|
objectives = self._extract_objectives(request, request_lower)
|
|
algorithm = self._extract_algorithm(request_lower)
|
|
|
|
steps.append(WorkflowStep(
|
|
action='optimize',
|
|
domain='optimization',
|
|
params={
|
|
'objectives': objectives,
|
|
'algorithm': algorithm,
|
|
'multi_objective': len(objectives) > 1
|
|
},
|
|
priority=priority
|
|
))
|
|
|
|
return steps
|
|
|
|
def _decompose_simple_workflow(self, request: str, request_lower: str) -> List[WorkflowStep]:
|
|
"""Decompose a non-optimization request."""
|
|
steps = []
|
|
|
|
# Check for material creation
|
|
if 'material' in request_lower and ('create' in request_lower or 'generate' in request_lower):
|
|
steps.append(WorkflowStep(
|
|
action='create_material',
|
|
domain='materials',
|
|
params={}
|
|
))
|
|
|
|
# Check for simulation run
|
|
solver = self._extract_solver_type(request_lower)
|
|
if solver:
|
|
steps.append(WorkflowStep(
|
|
action='run_analysis',
|
|
domain='simulation',
|
|
params={'solver': solver}
|
|
))
|
|
|
|
# Check for result extraction
|
|
result_extractions = self._extract_all_results(request, request_lower)
|
|
for result_info in result_extractions:
|
|
# If result has custom_expression (e.g., mass from .prt expression),
|
|
# it's a geometry operation, not result_extraction (OP2 file)
|
|
if 'custom_expression' in result_info:
|
|
steps.append(WorkflowStep(
|
|
action='read_expression',
|
|
domain='geometry',
|
|
params=result_info
|
|
))
|
|
else:
|
|
steps.append(WorkflowStep(
|
|
action='extract_result',
|
|
domain='result_extraction',
|
|
params=result_info
|
|
))
|
|
|
|
return steps
|
|
|
|
def _extract_parameter_filter(self, request: str, request_lower: str) -> str:
|
|
"""Extract parameter filter from text."""
|
|
# Look for specific suffixes/prefixes
|
|
if '_opt' in request_lower or ' opt ' in request_lower:
|
|
return '_opt'
|
|
if 'v_' in request_lower:
|
|
return 'v_'
|
|
if '_var' in request_lower:
|
|
return '_var'
|
|
if 'design variable' in request_lower or 'design parameter' in request_lower:
|
|
return 'design_variables'
|
|
if 'all parameter' in request_lower or 'all expression' in request_lower:
|
|
return 'all'
|
|
|
|
# Default to none if not specified
|
|
return ''
|
|
|
|
def _extract_solver_type(self, text: str) -> str:
|
|
"""Extract solver type from text."""
|
|
for keyword, solver in self.solver_types.items():
|
|
if keyword in text:
|
|
return solver
|
|
return ''
|
|
|
|
def _extract_all_results(self, request: str, request_lower: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Extract ALL result types mentioned in the request.
|
|
Handles multiple objectives and constraints.
|
|
"""
|
|
result_extractions = []
|
|
|
|
# Find all result types mentioned
|
|
found_types = set()
|
|
for keyword, result_type in self.result_types.items():
|
|
if keyword in request_lower:
|
|
found_types.add(result_type)
|
|
|
|
# For each result type, extract details
|
|
for result_type in found_types:
|
|
result_info = {
|
|
'result_type': result_type
|
|
}
|
|
|
|
# Extract subcase information
|
|
subcase = self._extract_subcase(request, request_lower)
|
|
if subcase:
|
|
result_info['subcase'] = subcase
|
|
|
|
# Extract direction (for reaction forces, displacements)
|
|
if result_type in ['reaction_force', 'displacement']:
|
|
direction = self._extract_direction(request, request_lower)
|
|
if direction:
|
|
result_info['direction'] = direction
|
|
|
|
# Extract metric (min, max, specific location)
|
|
metric = self._extract_metric_for_type(request, request_lower, result_type)
|
|
if metric:
|
|
result_info['metric'] = metric
|
|
|
|
# Extract custom expression (for mass, etc.)
|
|
if result_type == 'mass':
|
|
custom_expr = self._extract_custom_expression(request, request_lower, 'mass')
|
|
if custom_expr:
|
|
result_info['custom_expression'] = custom_expr
|
|
|
|
result_extractions.append(result_info)
|
|
|
|
return result_extractions
|
|
|
|
def _extract_subcase(self, request: str, request_lower: str) -> str:
|
|
"""Extract subcase information (solution X subcase Y)."""
|
|
# Look for patterns like "solution 1 subcase 3"
|
|
match = re.search(r'solution\s+(\d+)\s+subcase\s+(\d+)', request_lower)
|
|
if match:
|
|
return f"solution_{match.group(1)}_subcase_{match.group(2)}"
|
|
|
|
# Look for just "subcase X"
|
|
match = re.search(r'subcase\s+(\d+)', request_lower)
|
|
if match:
|
|
return f"subcase_{match.group(1)}"
|
|
|
|
return ''
|
|
|
|
def _extract_direction(self, request: str, request_lower: str) -> str:
|
|
"""Extract direction (X, Y, Z) for vectorial results."""
|
|
# Look for explicit direction mentions
|
|
if re.search(r'\bin\s+[xyz]\b', request_lower):
|
|
match = re.search(r'in\s+([xyz])\b', request_lower)
|
|
if match:
|
|
return match.group(1).upper()
|
|
|
|
# Look for "Y direction" pattern
|
|
if re.search(r'[xyz]\s+direction', request_lower):
|
|
match = re.search(r'([xyz])\s+direction', request_lower)
|
|
if match:
|
|
return match.group(1).upper()
|
|
|
|
return ''
|
|
|
|
def _extract_metric_for_type(self, request: str, request_lower: str, result_type: str) -> str:
|
|
"""Extract metric (min, max, average) for specific result type."""
|
|
# Check for explicit min/max keywords near the result type
|
|
if 'max' in request_lower or 'maximum' in request_lower:
|
|
return f'max_{result_type}'
|
|
if 'min' in request_lower or 'minimum' in request_lower:
|
|
return f'min_{result_type}'
|
|
if 'average' in request_lower or 'mean' in request_lower:
|
|
return f'avg_{result_type}'
|
|
|
|
# Default to max for most result types
|
|
return f'max_{result_type}'
|
|
|
|
def _extract_custom_expression(self, request: str, request_lower: str, expr_type: str) -> str:
|
|
"""Extract custom expression names (e.g., mass_of_only_this_part)."""
|
|
if expr_type == 'mass':
|
|
# Look for custom mass expressions
|
|
match = re.search(r'mass[_\w]*(?:of|for)[_\w]*', request_lower)
|
|
if match:
|
|
return match.group(0).replace(' ', '_')
|
|
|
|
# Look for explicit expression names
|
|
if 'expression' in request_lower:
|
|
match = re.search(r'expression\s+(\w+)', request_lower)
|
|
if match:
|
|
return match.group(1)
|
|
|
|
return ''
|
|
|
|
def _extract_constraints(self, request: str, request_lower: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Extract constraints from the request.
|
|
Examples: "maintain stress under 100 MPa", "keep displacement < 5mm"
|
|
"""
|
|
constraints = []
|
|
|
|
# Pattern 1: "maintain X under/below Y"
|
|
maintain_pattern = r'maintain\s+(\w+)\s+(?:under|below|less than|<)\s+([\d.]+)\s*(\w+)?'
|
|
for match in re.finditer(maintain_pattern, request_lower):
|
|
result_type = self.result_types.get(match.group(1), match.group(1))
|
|
value = float(match.group(2))
|
|
unit = match.group(3) if match.group(3) else ''
|
|
|
|
constraints.append({
|
|
'type': 'upper_bound',
|
|
'result_type': result_type,
|
|
'value': value,
|
|
'unit': unit
|
|
})
|
|
|
|
# Pattern 2: "stress < 100 MPa" or "stress < 100MPa"
|
|
comparison_pattern = r'(\w+)\s*(<|>|<=|>=)\s*([\d.]+)\s*(\w+)?'
|
|
for match in re.finditer(comparison_pattern, request_lower):
|
|
result_type = self.result_types.get(match.group(1), match.group(1))
|
|
operator = match.group(2)
|
|
value = float(match.group(3))
|
|
unit = match.group(4) if match.group(4) else ''
|
|
|
|
constraint_type = 'upper_bound' if operator in ['<', '<='] else 'lower_bound'
|
|
|
|
constraints.append({
|
|
'type': constraint_type,
|
|
'result_type': result_type,
|
|
'operator': operator,
|
|
'value': value,
|
|
'unit': unit
|
|
})
|
|
|
|
return constraints
|
|
|
|
def _extract_objectives(self, request: str, request_lower: str) -> List[Dict[str, str]]:
|
|
"""
|
|
Extract optimization objectives.
|
|
Can be multiple for multi-objective optimization.
|
|
"""
|
|
objectives = []
|
|
|
|
# Find all "minimize X" or "maximize X" patterns
|
|
minimize_pattern = r'minimi[zs]e\s+(\w+(?:\s+\w+)*?)(?:\s+(?:and|but|with|using|varying|to)|\.|\,|$)'
|
|
for match in re.finditer(minimize_pattern, request_lower):
|
|
objective_text = match.group(1).strip()
|
|
result_type = self._map_to_result_type(objective_text)
|
|
objectives.append({
|
|
'type': 'minimize',
|
|
'target': result_type if result_type else objective_text
|
|
})
|
|
|
|
maximize_pattern = r'maximi[zs]e\s+(\w+(?:\s+\w+)*?)(?:\s+(?:and|but|with|using|varying|to)|\.|\,|$)'
|
|
for match in re.finditer(maximize_pattern, request_lower):
|
|
objective_text = match.group(1).strip()
|
|
result_type = self._map_to_result_type(objective_text)
|
|
objectives.append({
|
|
'type': 'maximize',
|
|
'target': result_type if result_type else objective_text
|
|
})
|
|
|
|
# If no explicit minimize/maximize but mentions optimization
|
|
if not objectives and ('optimize' in request_lower or 'optim' in request_lower):
|
|
# Try to infer from context
|
|
for keyword, result_type in self.result_types.items():
|
|
if keyword in request_lower:
|
|
# Assume minimize for stress, strain, displacement
|
|
# Assume maximize for modal frequencies
|
|
obj_type = 'maximize' if result_type == 'modal' else 'minimize'
|
|
objectives.append({
|
|
'type': obj_type,
|
|
'target': result_type
|
|
})
|
|
|
|
return objectives if objectives else [{'type': 'minimize', 'target': 'unknown'}]
|
|
|
|
def _map_to_result_type(self, text: str) -> str:
|
|
"""Map objective text to result type."""
|
|
text_lower = text.lower().strip()
|
|
for keyword, result_type in self.result_types.items():
|
|
if keyword in text_lower:
|
|
return result_type
|
|
return text # Return as-is if no mapping found
|
|
|
|
def _extract_algorithm(self, text: str) -> str:
|
|
"""Extract optimization algorithm."""
|
|
if 'optuna' in text:
|
|
return 'optuna'
|
|
if 'genetic' in text or 'ga' in text:
|
|
return 'genetic_algorithm'
|
|
if 'gradient' in text:
|
|
return 'gradient_based'
|
|
if 'pso' in text or 'particle swarm' in text:
|
|
return 'pso'
|
|
return 'optuna' # Default
|
|
|
|
def get_workflow_summary(self, steps: List[WorkflowStep]) -> str:
|
|
"""Get human-readable summary of workflow."""
|
|
if not steps:
|
|
return "No workflow steps identified"
|
|
|
|
lines = ["Workflow Steps Identified:", "=" * 60, ""]
|
|
|
|
for i, step in enumerate(steps, 1):
|
|
lines.append(f"{i}. {step.action.replace('_', ' ').title()}")
|
|
lines.append(f" Domain: {step.domain}")
|
|
if step.params:
|
|
lines.append(f" Parameters:")
|
|
for key, value in step.params.items():
|
|
if isinstance(value, list) and value:
|
|
lines.append(f" {key}:")
|
|
for item in value[:3]: # Show first 3 items
|
|
lines.append(f" - {item}")
|
|
if len(value) > 3:
|
|
lines.append(f" ... and {len(value) - 3} more")
|
|
else:
|
|
lines.append(f" {key}: {value}")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def main():
|
|
"""Test the improved workflow decomposer."""
|
|
decomposer = WorkflowDecomposer()
|
|
|
|
# Test case 1: Complex multi-objective with constraints
|
|
test_request_1 = """update a geometry (.prt) with all expressions that have a _opt suffix to make the mass minimized. But the mass is not directly the total mass used, its the value under the part expression mass_of_only_this_part which is the calculation of 1of the body mass of my part, the one that I want to minimize.
|
|
|
|
the objective is to minimize mass but maintain stress of the solution 1 subcase 3 under 100Mpa. And also, as a second objective in my objective function, I want to minimize nodal reaction force in y of the same subcase."""
|
|
|
|
print("Test 1: Complex Multi-Objective Optimization with Constraints")
|
|
print("=" * 80)
|
|
print(f"Request: {test_request_1[:100]}...")
|
|
print()
|
|
|
|
steps_1 = decomposer.decompose(test_request_1)
|
|
print(decomposer.get_workflow_summary(steps_1))
|
|
|
|
print("\nDetailed Analysis:")
|
|
print("-" * 80)
|
|
for i, step in enumerate(steps_1, 1):
|
|
print(f"{i}. Action: {step.action}")
|
|
print(f" Domain: {step.domain}")
|
|
print(f" Params: {step.params}")
|
|
print()
|
|
|
|
# Test case 2: Simple strain optimization
|
|
test_request_2 = "minimize strain using SOL101 and optuna varying v_ parameters"
|
|
|
|
print("\n" + "=" * 80)
|
|
print("Test 2: Simple Strain Optimization")
|
|
print("=" * 80)
|
|
print(f"Request: {test_request_2}")
|
|
print()
|
|
|
|
steps_2 = decomposer.decompose(test_request_2)
|
|
print(decomposer.get_workflow_summary(steps_2))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|