Files
Atomizer/optimization_engine/future/extractor_orchestrator.py
Anto01 d228ccec66 refactor: Archive experimental LLM features for MVP stability (Phase 1.1)
Moved experimental LLM integration code to optimization_engine/future/:
- llm_optimization_runner.py - Runtime LLM API runner
- llm_workflow_analyzer.py - Workflow analysis
- inline_code_generator.py - Auto-generate calculations
- hook_generator.py - Auto-generate hooks
- report_generator.py - LLM report generation
- extractor_orchestrator.py - Extractor orchestration

Added comprehensive optimization_engine/future/README.md explaining:
- MVP LLM strategy (Claude Code skills, not runtime LLM)
- Why files were archived
- When to revisit post-MVP
- Production architecture reference

Production runner confirmed: optimization_engine/runner.py is sole active runner.

This establishes clear separation between:
- Production code (stable, no runtime LLM dependencies)
- Experimental code (archived for post-MVP exploration)

Part of Phase 1: Core Stabilization & Organization for MVP

Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 09:12:36 -05:00

395 lines
14 KiB
Python

"""
Extractor Orchestrator - Phase 3.1
Integrates Phase 2.7 LLM workflow analysis with Phase 3 pyNastran research agent
to automatically generate and manage OP2 extractors.
This orchestrator:
1. Takes Phase 2.7 LLM output (engineering_features)
2. Uses Phase 3 research agent to generate extractors
3. Saves generated extractors to result_extractors/
4. Provides dynamic loading for optimization runtime
Author: Atomizer Development Team
Version: 0.1.0 (Phase 3.1)
Last Updated: 2025-01-16
"""
import importlib.util
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

from optimization_engine.extractor_library import ExtractorLibrary, create_study_manifest
from optimization_engine.pynastran_research_agent import PyNastranResearchAgent, ExtractionPattern
logger = logging.getLogger(__name__)
@dataclass
class GeneratedExtractor:
    """Represents a generated extractor module."""
    # Action name from the LLM engineering feature (also the registry key).
    name: str
    # Path to the generated .py module (core library or study directory).
    file_path: Path
    # Name of the entry-point function inside the generated module.
    function_name: str
    # pyNastran extraction pattern the code was generated from.
    extraction_pattern: ExtractionPattern
    # Raw "params" dict carried over from the engineering feature.
    params: Dict[str, Any]
class ExtractorOrchestrator:
    """
    Orchestrates automatic extractor generation from LLM workflow analysis.

    This class bridges Phase 2.7 (LLM analysis) and Phase 3 (pyNastran research)
    to create a complete end-to-end automation pipeline.
    """

    # Per-pattern whitelist of keyword arguments forwarded to extractor
    # functions; patterns not listed here receive all kwargs unchanged.
    _PATTERN_KWARG_WHITELIST: Dict[str, set] = {
        'displacement': {'subcase'},                    # takes op2_file + subcase only
        'cbar_force': {'direction', 'subcase'},
        'solid_stress': {'element_type', 'subcase'},
    }

    def __init__(self,
                 extractors_dir: Optional[Path] = None,
                 knowledge_base_path: Optional[Path] = None,
                 use_core_library: bool = True):
        """
        Initialize the orchestrator.

        Args:
            extractors_dir: Directory to save the study manifest (not extractor
                code!). Defaults to ``<module dir>/result_extractors/generated``.
            knowledge_base_path: Path to the pyNastran pattern knowledge base.
            use_core_library: Use centralized library (True) or per-study
                generation (False, legacy).
        """
        self.use_core_library = use_core_library
        if extractors_dir is None:
            extractors_dir = Path(__file__).parent / "result_extractors" / "generated"
        self.extractors_dir = Path(extractors_dir)
        self.extractors_dir.mkdir(parents=True, exist_ok=True)

        # Phase 3 research agent: finds/generates pyNastran extraction patterns.
        self.research_agent = PyNastranResearchAgent(knowledge_base_path)

        # Centralized library (NEW ARCHITECTURE) deduplicates extractor code
        # across studies; legacy mode writes code into the study directory.
        if use_core_library:
            self.library = ExtractorLibrary()
            logger.info(f"Using centralized extractor library: {self.library.library_dir}")
        else:
            self.library = None
            logger.warning("Using legacy per-study extractor generation (not recommended)")

        # Registry of extractors generated during this session.
        self.extractors: Dict[str, GeneratedExtractor] = {}
        # Signatures of library extractors used (feeds the study manifest).
        self.extractor_signatures: List[str] = []
        logger.info("ExtractorOrchestrator initialized")

    def process_llm_workflow(self, llm_output: Dict[str, Any]) -> List["GeneratedExtractor"]:
        """
        Process Phase 2.7 LLM workflow output and generate all required extractors.

        Args:
            llm_output: Dict containing an ``engineering_features`` list; each
                feature has ``action``, ``domain``, ``description`` and
                ``params`` keys. Other sections (``inline_calculations``,
                ``post_processing_hooks``, ``optimization``) are ignored here.

        Returns:
            List of GeneratedExtractor objects, one per successfully processed
            ``result_extraction`` feature (failures are logged and skipped).
        """
        generated_extractors: List[GeneratedExtractor] = []

        for feature in llm_output.get('engineering_features', []):
            # Only result-extraction features become extractors.
            if feature.get('domain', '') != 'result_extraction':
                continue
            logger.info(f"Processing extraction feature: {feature.get('action')}")
            try:
                generated_extractors.append(self.generate_extractor_from_feature(feature))
            except Exception as e:
                # Continue with other features: one bad feature must not
                # abort the whole study.
                logger.error(f"Failed to generate extractor for {feature.get('action')}: {e}")

        # NEW ARCHITECTURE: record which library extractors this study uses
        # (a manifest) rather than copying code into the study directory.
        if self.use_core_library and self.library and self.extractor_signatures:
            create_study_manifest(self.extractor_signatures, self.extractors_dir)
            logger.info("Study manifest created - extractors referenced from core library")

        logger.info(f"Generated {len(generated_extractors)} extractors")
        return generated_extractors

    def generate_extractor_from_feature(self, feature: Dict[str, Any]) -> "GeneratedExtractor":
        """
        Generate a single extractor from an engineering feature.

        Args:
            feature: Engineering feature dict from the Phase 2.7 LLM output.

        Returns:
            GeneratedExtractor describing the generated (or library-reused) module.
        """
        action = feature.get('action', '')
        description = feature.get('description', '')
        params = feature.get('params', {})

        # Request payload for the Phase 3 research agent.
        research_request = {
            'action': action,
            'domain': 'result_extraction',
            'description': description,
            'params': params,
        }

        # Find (or synthesize) a pyNastran extraction pattern, then emit code.
        logger.info(f"Researching extraction pattern for: {action}")
        pattern = self.research_agent.research_extraction(research_request)
        logger.info(f"Generating extractor code using pattern: {pattern.name}")
        extractor_code = self.research_agent.generate_extractor_code(research_request)

        if self.use_core_library and self.library:
            # NEW ARCHITECTURE: deduplicated storage in the core library.
            file_path = self.library.get_or_create(feature, extractor_code)
            # Track the signature so the study manifest can reference it.
            signature = self.library._compute_signature(feature)
            self.extractor_signatures.append(signature)
            logger.info(f"Extractor available in core library: {file_path}")
        else:
            # LEGACY: write the generated code into the per-study directory.
            file_path = self.extractors_dir / self._action_to_filename(action)
            logger.info(f"Saving extractor to study directory (legacy): {file_path}")
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(extractor_code)

        function_name = self._extract_function_name(extractor_code)

        extractor = GeneratedExtractor(
            name=action,
            file_path=file_path,
            function_name=function_name,
            extraction_pattern=pattern,
            params=params,
        )
        # Register for later load_extractor / execute_extractor calls.
        self.extractors[action] = extractor
        logger.info(f"Successfully generated extractor: {action} -> {function_name}")
        return extractor

    def _action_to_filename(self, action: str) -> str:
        """Convert an action name to a Python filename.

        e.g. "extract_1d_element_forces" -> "extract_1d_element_forces.py"
        """
        return f"{action}.py"

    def _extract_function_name(self, code: str) -> str:
        """Return the first function name defined in *code* ("def name(" pattern),
        or "extract" as a fallback when no definition is found."""
        match = re.search(r'def\s+(\w+)\s*\(', code)
        return match.group(1) if match else "extract"

    def load_extractor(self, extractor_name: str) -> Any:
        """
        Dynamically load a generated extractor module and return its entry function.

        Args:
            extractor_name: Name of the extractor (the feature's action name).

        Returns:
            The extractor function (callable).

        Raises:
            ValueError: If the extractor was never generated in this session.
            ImportError: If the module file cannot be loaded.
            AttributeError: If the expected function is missing from the module.
        """
        if extractor_name not in self.extractors:
            raise ValueError(f"Extractor '{extractor_name}' not found in registry")
        extractor = self.extractors[extractor_name]

        # Import the generated file as an ad-hoc module.
        spec = importlib.util.spec_from_file_location(extractor_name, extractor.file_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not load extractor from {extractor.file_path}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        if not hasattr(module, extractor.function_name):
            raise AttributeError(f"Function '{extractor.function_name}' not found in {extractor_name}")
        return getattr(module, extractor.function_name)

    def execute_extractor(self,
                          extractor_name: str,
                          op2_file: Path,
                          **kwargs) -> Dict[str, Any]:
        """
        Load and execute an extractor.

        Args:
            extractor_name: Name of the extractor.
            op2_file: Path to the OP2 file.
            **kwargs: Additional arguments; filtered per extraction pattern so
                each extractor only receives keywords it accepts.

        Returns:
            Extraction results dictionary.
        """
        logger.info(f"Executing extractor: {extractor_name}")
        extractor_func = self.load_extractor(extractor_name)

        # Filter kwargs to the pattern's accepted parameters; unknown patterns
        # receive everything unchanged (generic pass-through).
        pattern_name = self.extractors[extractor_name].extraction_pattern.name
        allowed = self._PATTERN_KWARG_WHITELIST.get(pattern_name)
        if allowed is None:
            params = kwargs.copy()
        else:
            params = {k: v for k, v in kwargs.items() if k in allowed}

        try:
            result = extractor_func(op2_file, **params)
            logger.info(f"Extraction successful: {extractor_name}")
            return result
        except Exception as e:
            logger.error(f"Extraction failed: {extractor_name} - {e}")
            raise

    def get_summary(self) -> Dict[str, Any]:
        """Get a summary of all generated extractors for this session."""
        return {
            'total_extractors': len(self.extractors),
            'extractors': [
                {
                    'name': name,
                    'file': str(ext.file_path),
                    'function': ext.function_name,
                    'pattern': ext.extraction_pattern.name,
                    'params': ext.params,
                }
                for name, ext in self.extractors.items()
            ],
        }
def main():
    """Smoke-test the extractor orchestrator with a Phase 2.7 example payload.

    Generates extractors for a CBAR-forces workflow and prints a summary.
    Requires the project's research agent / extractor library to be importable.
    """
    print("=" * 80)
    print("Phase 3.1: Extractor Orchestrator Test")
    print("=" * 80)
    print()

    # Phase 2.7 LLM output example (CBAR forces).
    llm_output = {
        "engineering_features": [
            {
                "action": "extract_1d_element_forces",
                "domain": "result_extraction",
                "description": "Extract element forces from CBAR in Z direction from OP2",
                "params": {
                    "element_types": ["CBAR"],
                    "result_type": "element_force",
                    "direction": "Z",
                },
            }
        ],
        "inline_calculations": [
            {
                "action": "calculate_average",
                "params": {"input": "forces_z", "operation": "mean"},
            },
            {
                "action": "find_minimum",
                "params": {"input": "forces_z", "operation": "min"},
            },
        ],
        "post_processing_hooks": [
            {
                "action": "comparison",
                "params": {
                    "inputs": ["min_force", "avg_force"],
                    "operation": "ratio",
                    "output_name": "min_to_avg_ratio",
                },
            }
        ],
    }

    print("Test Input: Phase 2.7 LLM Output")
    print(f" Engineering features: {len(llm_output['engineering_features'])}")
    print(f" Inline calculations: {len(llm_output['inline_calculations'])}")
    print(f" Post-processing hooks: {len(llm_output['post_processing_hooks'])}")
    print()

    # Initialize orchestrator and run the full generation pipeline.
    orchestrator = ExtractorOrchestrator()
    print("1. Processing LLM workflow...")
    extractors = orchestrator.process_llm_workflow(llm_output)
    print(f" Generated {len(extractors)} extractors:")
    for ext in extractors:
        # Fixed: separator between action name and function name was missing.
        print(f" - {ext.name} -> {ext.function_name}() in {ext.file_path.name}")
    print()

    # Show summary.
    print("2. Orchestrator summary:")
    summary = orchestrator.get_summary()
    print(f" Total extractors: {summary['total_extractors']}")
    for ext_info in summary['extractors']:
        print(f" {ext_info['name']}:")
        print(f" Pattern: {ext_info['pattern']}")
        print(f" File: {ext_info['file']}")
        print(f" Function: {ext_info['function']}")
    print()

    print("=" * 80)
    print("Phase 3.1 Test Complete!")
    print("=" * 80)
    print()
    print("Next step: Test extractor execution on real OP2 file")
# Allow running this module directly as a standalone smoke test.
if __name__ == '__main__':
    main()