""" Extractor Orchestrator - Phase 3.1 Integrates Phase 2.7 LLM workflow analysis with Phase 3 pyNastran research agent to automatically generate and manage OP2 extractors. This orchestrator: 1. Takes Phase 2.7 LLM output (engineering_features) 2. Uses Phase 3 research agent to generate extractors 3. Saves generated extractors to result_extractors/ 4. Provides dynamic loading for optimization runtime Author: Atomizer Development Team Version: 0.1.0 (Phase 3.1) Last Updated: 2025-01-16 """ from typing import Dict, Any, List, Optional from pathlib import Path import importlib.util import logging from dataclasses import dataclass from optimization_engine.future.pynastran_research_agent import PyNastranResearchAgent, ExtractionPattern from optimization_engine.extractors.extractor_library import ExtractorLibrary, create_study_manifest logger = logging.getLogger(__name__) @dataclass class GeneratedExtractor: """Represents a generated extractor module.""" name: str file_path: Path function_name: str extraction_pattern: ExtractionPattern params: Dict[str, Any] class ExtractorOrchestrator: """ Orchestrates automatic extractor generation from LLM workflow analysis. This class bridges Phase 2.7 (LLM analysis) and Phase 3 (pyNastran research) to create a complete end-to-end automation pipeline. """ def __init__(self, extractors_dir: Optional[Path] = None, knowledge_base_path: Optional[Path] = None, use_core_library: bool = True): """ Initialize the orchestrator. Args: extractors_dir: Directory to save study manifest (not extractor code!) knowledge_base_path: Path to pyNastran pattern knowledge base use_core_library: Use centralized library (True) or per-study generation (False, legacy) """ self.use_core_library = use_core_library if extractors_dir is None: extractors_dir = Path(__file__).parent / "result_extractors" / "generated" self.extractors_dir = Path(extractors_dir) self.extractors_dir.mkdir(parents=True, exist_ok=True) # Initialize Phase 3 research agent self.research_agent = PyNastranResearchAgent(knowledge_base_path) # Initialize centralized library (NEW ARCHITECTURE) if use_core_library: self.library = ExtractorLibrary() logger.info(f"Using centralized extractor library: {self.library.library_dir}") else: self.library = None logger.warning("Using legacy per-study extractor generation (not recommended)") # Registry of generated extractors for this session self.extractors: Dict[str, GeneratedExtractor] = {} self.extractor_signatures: List[str] = [] # Track which library extractors were used logger.info(f"ExtractorOrchestrator initialized") def process_llm_workflow(self, llm_output: Dict[str, Any]) -> List[GeneratedExtractor]: """ Process Phase 2.7 LLM workflow output and generate all required extractors. Args: llm_output: Dict with structure: { "engineering_features": [ { "action": "extract_1d_element_forces", "domain": "result_extraction", "description": "Extract element forces from CBAR in Z direction", "params": { "element_types": ["CBAR"], "result_type": "element_force", "direction": "Z" } } ], "inline_calculations": [...], "post_processing_hooks": [...], "optimization": {...} } Returns: List of GeneratedExtractor objects """ engineering_features = llm_output.get('engineering_features', []) generated_extractors = [] for feature in engineering_features: domain = feature.get('domain', '') # Only process result extraction features if domain == 'result_extraction': logger.info(f"Processing extraction feature: {feature.get('action')}") try: extractor = self.generate_extractor_from_feature(feature) generated_extractors.append(extractor) except Exception as e: logger.error(f"Failed to generate extractor for {feature.get('action')}: {e}") # Continue with other features # NEW ARCHITECTURE: Create study manifest (not copy code) if self.use_core_library and self.library and self.extractor_signatures: create_study_manifest(self.extractor_signatures, self.extractors_dir) logger.info("Study manifest created - extractors referenced from core library") logger.info(f"Generated {len(generated_extractors)} extractors") return generated_extractors def generate_extractor_from_feature(self, feature: Dict[str, Any]) -> GeneratedExtractor: """ Generate a single extractor from an engineering feature. Args: feature: Engineering feature dict from Phase 2.7 LLM Returns: GeneratedExtractor object """ action = feature.get('action', '') description = feature.get('description', '') params = feature.get('params', {}) # Prepare request for Phase 3 research agent research_request = { 'action': action, 'domain': 'result_extraction', 'description': description, 'params': params } # Use Phase 3 research agent to find/generate extraction pattern logger.info(f"Researching extraction pattern for: {action}") pattern = self.research_agent.research_extraction(research_request) # Generate complete extractor code logger.info(f"Generating extractor code using pattern: {pattern.name}") extractor_code = self.research_agent.generate_extractor_code(research_request) # NEW ARCHITECTURE: Use centralized library if self.use_core_library and self.library: # Add to/retrieve from core library (deduplication happens here) file_path = self.library.get_or_create(feature, extractor_code) # Track signature for study manifest signature = self.library._compute_signature(feature) self.extractor_signatures.append(signature) logger.info(f"Extractor available in core library: {file_path}") else: # LEGACY: Save to per-study directory filename = self._action_to_filename(action) file_path = self.extractors_dir / filename logger.info(f"Saving extractor to study directory (legacy): {file_path}") with open(file_path, 'w') as f: f.write(extractor_code) # Extract function name from generated code function_name = self._extract_function_name(extractor_code) # Create GeneratedExtractor object extractor = GeneratedExtractor( name=action, file_path=file_path, function_name=function_name, extraction_pattern=pattern, params=params ) # Register in session self.extractors[action] = extractor logger.info(f"Successfully generated extractor: {action} → {function_name}") return extractor def _action_to_filename(self, action: str) -> str: """Convert action name to Python filename.""" # e.g., "extract_1d_element_forces" → "extract_1d_element_forces.py" return f"{action}.py" def _extract_function_name(self, code: str) -> str: """Extract the main function name from generated code.""" # Look for "def function_name(" pattern import re match = re.search(r'def\s+(\w+)\s*\(', code) if match: return match.group(1) return "extract" # fallback def load_extractor(self, extractor_name: str) -> Any: """ Dynamically load a generated extractor module. Args: extractor_name: Name of the extractor (action name) Returns: The extractor function (callable) """ if extractor_name not in self.extractors: raise ValueError(f"Extractor '{extractor_name}' not found in registry") extractor = self.extractors[extractor_name] # Dynamic import spec = importlib.util.spec_from_file_location(extractor_name, extractor.file_path) if spec is None or spec.loader is None: raise ImportError(f"Could not load extractor from {extractor.file_path}") module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Get the function if not hasattr(module, extractor.function_name): raise AttributeError(f"Function '{extractor.function_name}' not found in {extractor_name}") return getattr(module, extractor.function_name) def execute_extractor(self, extractor_name: str, op2_file: Path, **kwargs) -> Dict[str, Any]: """ Load and execute an extractor. Args: extractor_name: Name of the extractor op2_file: Path to OP2 file **kwargs: Additional arguments for the extractor Returns: Extraction results dictionary """ logger.info(f"Executing extractor: {extractor_name}") # Load the extractor function extractor_func = self.load_extractor(extractor_name) # Get extractor params - filter to only relevant params for each pattern extractor = self.extractors[extractor_name] pattern_name = extractor.extraction_pattern.name # Pattern-specific parameter filtering if pattern_name == 'displacement': # Displacement extractor only takes op2_file and subcase params = {k: v for k, v in kwargs.items() if k in ['subcase']} elif pattern_name == 'cbar_force': # CBAR force takes direction, subcase params = {k: v for k, v in kwargs.items() if k in ['direction', 'subcase']} elif pattern_name == 'solid_stress': # Solid stress takes element_type, subcase params = {k: v for k, v in kwargs.items() if k in ['element_type', 'subcase']} else: # Generic - pass all kwargs params = kwargs.copy() # Execute try: result = extractor_func(op2_file, **params) logger.info(f"Extraction successful: {extractor_name}") return result except Exception as e: logger.error(f"Extraction failed: {extractor_name} - {e}") raise def get_summary(self) -> Dict[str, Any]: """Get summary of all generated extractors.""" return { 'total_extractors': len(self.extractors), 'extractors': [ { 'name': name, 'file': str(ext.file_path), 'function': ext.function_name, 'pattern': ext.extraction_pattern.name, 'params': ext.params } for name, ext in self.extractors.items() ] } def main(): """Test the extractor orchestrator with Phase 2.7 example.""" print("=" * 80) print("Phase 3.1: Extractor Orchestrator Test") print("=" * 80) print() # Phase 2.7 LLM output example (CBAR forces) llm_output = { "engineering_features": [ { "action": "extract_1d_element_forces", "domain": "result_extraction", "description": "Extract element forces from CBAR in Z direction from OP2", "params": { "element_types": ["CBAR"], "result_type": "element_force", "direction": "Z" } } ], "inline_calculations": [ { "action": "calculate_average", "params": {"input": "forces_z", "operation": "mean"} }, { "action": "find_minimum", "params": {"input": "forces_z", "operation": "min"} } ], "post_processing_hooks": [ { "action": "comparison", "params": { "inputs": ["min_force", "avg_force"], "operation": "ratio", "output_name": "min_to_avg_ratio" } } ] } print("Test Input: Phase 2.7 LLM Output") print(f" Engineering features: {len(llm_output['engineering_features'])}") print(f" Inline calculations: {len(llm_output['inline_calculations'])}") print(f" Post-processing hooks: {len(llm_output['post_processing_hooks'])}") print() # Initialize orchestrator orchestrator = ExtractorOrchestrator() # Process LLM workflow print("1. Processing LLM workflow...") extractors = orchestrator.process_llm_workflow(llm_output) print(f" Generated {len(extractors)} extractors:") for ext in extractors: print(f" - {ext.name} → {ext.function_name}() in {ext.file_path.name}") print() # Show summary print("2. Orchestrator summary:") summary = orchestrator.get_summary() print(f" Total extractors: {summary['total_extractors']}") for ext_info in summary['extractors']: print(f" {ext_info['name']}:") print(f" Pattern: {ext_info['pattern']}") print(f" File: {ext_info['file']}") print(f" Function: {ext_info['function']}") print() print("=" * 80) print("Phase 3.1 Test Complete!") print("=" * 80) print() print("Next step: Test extractor execution on real OP2 file") if __name__ == '__main__': main()