"""
Study Insights - Base Classes and Infrastructure

Study Insights provide physics-focused visualizations for optimization results.
Unlike Analysis (optimizer-centric), Insights show the engineering reality of
specific designs.

Architecture:
- StudyInsight: Abstract base class for all insight types
- InsightRegistry: Central registry for available insight types
- InsightSpec: Config-defined insight specification from optimization_config.json
- InsightReport: Aggregates multiple insights into a study report (HTML,
  printable to PDF from the browser)
- Each insight can generate standalone HTML or Plotly data for the dashboard

Config Schema (in optimization_config.json):
    "insights": [
        {
            "type": "zernike_wfe",
            "name": "WFE at 40 vs 20 deg",
            "enabled": true,
            "linked_objective": "rel_filtered_rms_40_vs_20",
            "config": {
                "target_subcase": "3",
                "reference_subcase": "2"
            },
            "include_in_report": true
        },
        {
            "type": "design_space",
            "name": "Parameter Exploration",
            "enabled": true,
            "linked_objective": null,
            "config": {},
            "include_in_report": true
        }
    ]

Usage:
    from optimization_engine.insights import (
        get_configured_insights,
        get_insight,
        list_insights,
    )

    # Get specific insight
    insight = get_insight('zernike_wfe', study_path)
    if insight.can_generate():
        html_path = insight.generate_html(trial_id=47)
        plotly_data = insight.get_plotly_data(trial_id=47)

    # Get insights from config
    specs = get_configured_insights(study_path)
    for spec in specs:
        insight = get_insight(spec.type, study_path)
        result = insight.generate(spec.to_config())
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
import html
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, Union

# Script include used by the generated report to render embedded figures.
# NOTE(review): the report markup (including this include) is a minimal
# reconstruction; confirm the Plotly.js version and styling you want to ship.
_PLOTLY_JS_URL = "https://cdn.plot.ly/plotly-2.35.2.min.js"

# First-match-wins rules mapping objective-name keywords to a recommended
# insight: (keywords, insight type, display prefix, reason template).
# An insight type of None means "matched, but nothing specific to recommend".
_OBJECTIVE_RULES = (
    (
        ('rms', 'wfe', 'zernike', 'filtered'),
        'zernike_wfe',
        'WFE',
        "Visualizes Zernike WFE for objective '{name}'",
    ),
    # Mass objectives have no dedicated insight; design_space covers them.
    (('mass',), None, '', ''),
    (
        ('stress', 'von_mises', 'strain'),
        'stress_field',
        'Stress',
        "Visualizes stress distribution for objective '{name}'",
    ),
    (
        ('freq', 'modal', 'eigen', 'vibration'),
        'modal',
        'Modal',
        "Shows mode shapes for objective '{name}'",
    ),
    (
        ('temp', 'thermal', 'heat'),
        'thermal',
        'Thermal',
        "Shows temperature distribution for objective '{name}'",
    ),
)


def _read_study_config(study_path: Path) -> Dict[str, Any]:
    """Return the parsed 1_setup/optimization_config.json, or {} if absent.

    The file is read as UTF-8 regardless of the platform locale. A file that
    exists but is not valid JSON raises json.JSONDecodeError.
    """
    config_path = Path(study_path) / "1_setup" / "optimization_config.json"
    if not config_path.exists():
        return {}
    with open(config_path, encoding="utf-8") as f:
        return json.load(f)


def _json_for_script(value: Any) -> str:
    """Serialize *value* as JSON that is safe inside an inline <script>.

    '<' can only occur inside JSON strings, so replacing it with its unicode
    escape keeps the JSON valid while making it impossible for the payload to
    close the surrounding script element (e.g. via "</script>").
    """
    return json.dumps(value).replace('<', '\\u003c')


def _plot_embed(plot_id: str, figure: Dict[str, Any]) -> str:
    """Return a container div plus the script that draws *figure* into it."""
    dom_id = html.escape(plot_id, quote=True)
    js_id = _json_for_script(plot_id)
    js_figure = _json_for_script(figure)
    return (
        f'<div id="{dom_id}" class="insight-plot"></div>\n'
        '<script>\n'
        '(function () {\n'
        f'  var fig = {js_figure};\n'
        f'  Plotly.newPlot({js_id}, fig.data || [], fig.layout || {{}});\n'
        '})();\n'
        '</script>\n'
    )


@dataclass
class InsightSpec:
    """
    Insight specification from optimization_config.json.

    Defines what insights the user wants for their study, optionally
    linking them to specific objectives.
    """
    type: str  # Insight type ID (e.g., 'zernike_wfe')
    name: str  # User-defined display name
    enabled: bool = True  # Whether to generate this insight
    linked_objective: Optional[str] = None  # Objective name this visualizes (or None)
    config: Dict[str, Any] = field(default_factory=dict)  # Type-specific config
    include_in_report: bool = True  # Include in study report

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'InsightSpec':
        """Create InsightSpec from a config dictionary.

        Args:
            data: One entry of the "insights" list. 'type' is required.

        Returns:
            The populated spec. A missing or null 'name' falls back to the
            type ID; a missing or null 'config' becomes an empty dict.

        Raises:
            KeyError: If 'type' is missing.
        """
        insight_type = data['type']
        return cls(
            type=insight_type,
            name=data.get('name') or insight_type,
            enabled=data.get('enabled', True),
            linked_objective=data.get('linked_objective'),
            # Copy so the spec never aliases the caller's dictionary.
            config=dict(data.get('config') or {}),
            include_in_report=data.get('include_in_report', True),
        )

    def to_config(self) -> 'InsightConfig':
        """Convert spec to an InsightConfig, passing 'config' through as extra."""
        return InsightConfig(extra=dict(self.config))


@dataclass
class InsightConfig:
    """Configuration for an insight instance."""
    trial_id: Optional[int] = None  # Specific trial to visualize (None = best)
    colorscale: str = 'Turbo'
    output_dir: Optional[Path] = None  # Where to save HTML (None = study/3_insights/)

    # Visual settings
    amplification: float = 1.0  # Deformation scale factor
    lighting: bool = True  # 3D lighting effects

    # Type-specific config (passed through)
    extra: Dict[str, Any] = field(default_factory=dict)


@dataclass
class InsightResult:
    """Result from generating an insight."""
    success: bool
    insight_type: str = ""  # Which insight generated this
    insight_name: str = ""  # Display name
    html_path: Optional[Path] = None
    plotly_figure: Optional[Dict[str, Any]] = None  # Plotly figure as dict
    summary: Optional[Dict[str, Any]] = None  # Key metrics
    error: Optional[str] = None
    linked_objective: Optional[str] = None  # Objective this relates to (if any)
    generated_at: Optional[str] = None  # ISO timestamp


class StudyInsight(ABC):
    """
    Abstract base class for study-specific physics visualizations.

    Each insight type provides:
    - Detection: Can this insight be generated for this study?
    - HTML generation: Standalone interactive report
    - Plotly data: For embedding in dashboard
    - Summary: Key metrics extracted

    Subclasses must implement:
    - insight_type: Unique identifier (e.g., 'zernike_wfe')
    - name: Human-readable name
    - description: What this insight shows
    - category: Physics domain (see INSIGHT_CATEGORIES)
    - applicable_to: List of study types this applies to
    - can_generate(): Check if study has required data
    - _generate(): Core generation logic
    """

    # Physics categories for grouping
    INSIGHT_CATEGORIES = {
        'optical': 'Optical',
        'structural_static': 'Structural (Static)',
        'structural_dynamic': 'Structural (Dynamic)',
        'structural_modal': 'Structural (Modal)',
        'thermal': 'Thermal',
        'kinematic': 'Kinematic',
        'design_exploration': 'Design Exploration',
        'other': 'Other',
    }

    # Class-level metadata (override in subclasses)
    insight_type: str = "base"
    name: str = "Base Insight"
    description: str = "Abstract base insight"
    category: str = "other"  # Physics domain (key from INSIGHT_CATEGORIES)
    applicable_to: List[str] = []  # e.g., ['mirror', 'structural', 'all']

    # Required files/data patterns
    required_files: List[str] = []  # e.g., ['*.op2', '*.bdf']

    def __init__(self, study_path: Path):
        """
        Initialize insight for a specific study.

        Args:
            study_path: Path to study directory (studies/{name}/)
        """
        self.study_path = Path(study_path)
        self.setup_path = self.study_path / "1_setup"
        self.results_path = self.study_path / "2_results"
        self.insights_path = self.study_path / "3_insights"

        # Load study config if available
        self.config = self._load_study_config()

    def _load_study_config(self) -> Dict[str, Any]:
        """Load optimization_config.json if it exists, else return {}."""
        config_path = self.setup_path / "optimization_config.json"
        if config_path.exists():
            with open(config_path, encoding="utf-8") as f:
                return json.load(f)
        return {}

    @abstractmethod
    def can_generate(self) -> bool:
        """
        Check if this insight can be generated for the study.

        Returns:
            True if all required data is available
        """

    @abstractmethod
    def _generate(self, config: InsightConfig) -> InsightResult:
        """
        Core generation logic. Implemented by subclasses.

        Args:
            config: Insight configuration

        Returns:
            InsightResult with HTML path and/or Plotly data
        """

    def generate(self, config: Optional[InsightConfig] = None) -> InsightResult:
        """
        Generate the insight visualization.

        The output directory is created before anything else, and
        config.output_dir is filled in on the passed object when it is None.
        Exceptions raised by _generate() are reported through the returned
        result rather than propagated.

        Args:
            config: Optional configuration (uses defaults if None)

        Returns:
            InsightResult with generated content; on failure, success is
            False and error describes the problem.
        """
        if config is None:
            config = InsightConfig()

        # Ensure output directory exists
        if config.output_dir is None:
            config.output_dir = self.insights_path
        config.output_dir.mkdir(parents=True, exist_ok=True)

        # Check prerequisites
        if not self.can_generate():
            return self._failure(
                f"Cannot generate {self.name}: required data not found"
            )

        try:
            result = self._generate(config)
            # Identify the producer when the subclass did not fill it in.
            if not result.insight_type:
                result.insight_type = self.insight_type
            if not result.insight_name:
                result.insight_name = self.name
            return result
        except Exception as e:
            return self._failure(f"Error generating {self.name}: {str(e)}")

    def _failure(self, message: str) -> InsightResult:
        """Build a failed InsightResult tagged with this insight's identity."""
        return InsightResult(
            success=False,
            insight_type=self.insight_type,
            insight_name=self.name,
            error=message,
        )

    def generate_html(
        self,
        trial_id: Optional[int] = None,
        **kwargs
    ) -> Optional[Path]:
        """
        Convenience method to generate standalone HTML.

        Args:
            trial_id: Specific trial to visualize (None = best)
            **kwargs: Additional config options (passed as InsightConfig.extra)

        Returns:
            Path to generated HTML file, or None on failure
        """
        result = self.generate(InsightConfig(trial_id=trial_id, extra=kwargs))
        return result.html_path if result.success else None

    def get_plotly_data(
        self,
        trial_id: Optional[int] = None,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """
        Get Plotly figure data for dashboard embedding.

        Args:
            trial_id: Specific trial to visualize (None = best)
            **kwargs: Additional config options (passed as InsightConfig.extra)

        Returns:
            Plotly figure as dictionary, or None on failure
        """
        result = self.generate(InsightConfig(trial_id=trial_id, extra=kwargs))
        return result.plotly_figure if result.success else None

    def get_summary(self, trial_id: Optional[int] = None) -> Optional[Dict[str, Any]]:
        """
        Get the key metrics summary.

        This runs the full generate() path and returns only its summary.

        Args:
            trial_id: Specific trial (None = best)

        Returns:
            Dictionary of key metrics, or None on failure
        """
        result = self.generate(InsightConfig(trial_id=trial_id))
        return result.summary if result.success else None


class InsightRegistry:
    """
    Central registry for available insight types.

    Usage:
        registry = InsightRegistry()
        registry.register(ZernikeWFEInsight)

        # Get insight for a study
        insight = registry.get('zernike_wfe', study_path)

        # List available insights for a study
        available = registry.list_available(study_path)
    """

    _instance: Optional['InsightRegistry'] = None
    _insights: Dict[str, Type[StudyInsight]] = {}

    def __new__(cls):
        """Return the shared instance, creating it (and its table) once."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._insights = {}
        return cls._instance

    @staticmethod
    def _describe(insight_type: str, insight_class: Type[StudyInsight]) -> Dict[str, Any]:
        """Build the metadata dictionary reported for one insight class."""
        category = getattr(insight_class, 'category', 'other')
        return {
            'type': insight_type,
            'name': insight_class.name,
            'description': insight_class.description,
            'category': category,
            'category_label': StudyInsight.INSIGHT_CATEGORIES.get(category, 'Other'),
            'applicable_to': getattr(insight_class, 'applicable_to', []),
        }

    def register(self, insight_class: Type[StudyInsight]) -> None:
        """
        Register an insight type under its insight_type ID.

        A later registration with the same ID replaces the earlier one.

        Args:
            insight_class: StudyInsight subclass to register
        """
        self._insights[insight_class.insight_type] = insight_class

    def get(self, insight_type: str, study_path: Path) -> Optional[StudyInsight]:
        """
        Get an insight instance for a study.

        Args:
            insight_type: Registered insight type ID
            study_path: Path to study directory

        Returns:
            Configured insight instance, or None if not found
        """
        insight_class = self._insights.get(insight_type)
        if insight_class is None:
            return None
        return insight_class(study_path)

    def list_all(self) -> List[Dict[str, Any]]:
        """
        List all registered insight types.

        Returns:
            List of insight metadata dictionaries
        """
        return [
            self._describe(insight_class.insight_type, insight_class)
            for insight_class in self._insights.values()
        ]

    def list_available(self, study_path: Path, fast_mode: bool = True) -> List[Dict[str, Any]]:
        """
        List insights that can be generated for a specific study.

        Args:
            study_path: Path to study directory
            fast_mode: If True, use quick file existence checks instead of
                full can_generate()

        Returns:
            List of available insight metadata
        """
        study_path = Path(study_path)

        if fast_mode:
            # Probe the file system once, then filter every insight by the
            # patterns it declares in required_files.
            has_op2 = self._quick_has_op2(study_path)
            has_db = (study_path / "2_results" / "study.db").exists()

            available = []
            for insight_type, insight_class in self._insights.items():
                required = getattr(insight_class, 'required_files', [])
                if '*.op2' in required and not has_op2:
                    continue
                if 'study.db' in required and not has_db:
                    continue
                available.append(self._describe(insight_type, insight_class))
            return available

        # Slow mode: full can_generate() check (for accuracy)
        available = []
        for insight_type, insight_class in self._insights.items():
            try:
                if insight_class(study_path).can_generate():
                    available.append(self._describe(insight_type, insight_class))
            except Exception:
                # Deliberate best-effort: an insight that fails to initialize
                # or to probe the study is simply reported as unavailable.
                continue
        return available

    def _quick_has_op2(self, study_path: Path) -> bool:
        """Quick check if study has OP2 files without recursive search."""
        # Check common locations only (non-recursive)
        check_dirs = [
            study_path / "3_results" / "best_design_archive",
            study_path / "2_iterations",
            study_path / "1_setup" / "model",
        ]

        for check_dir in check_dirs:
            if not check_dir.exists():
                continue
            try:
                for item in check_dir.iterdir():
                    if item.suffix.lower() == '.op2':
                        return True
                    # Check one level down (e.g. per-iteration folders)
                    if item.is_dir():
                        for subitem in item.iterdir():
                            if subitem.suffix.lower() == '.op2':
                                return True
            except OSError:
                # Unreadable location (PermissionError included): move on to
                # the next candidate directory.
                continue

        return False


# Global registry instance
_registry = InsightRegistry()


def register_insight(insight_class: Type[StudyInsight]) -> Type[StudyInsight]:
    """
    Decorator to register an insight class.

    Usage:
        @register_insight
        class MyInsight(StudyInsight):
            insight_type = 'my_insight'
            ...
    """
    _registry.register(insight_class)
    return insight_class


def get_insight(insight_type: str, study_path: Path) -> Optional[StudyInsight]:
    """Get an insight instance by type, or None if the type is unknown."""
    return _registry.get(insight_type, study_path)


def list_insights() -> List[Dict[str, Any]]:
    """List all registered insight types."""
    return _registry.list_all()


def list_available_insights(study_path: Path) -> List[Dict[str, Any]]:
    """List insights available for a specific study (fast file checks)."""
    return _registry.list_available(study_path)


def get_configured_insights(study_path: Path) -> List[InsightSpec]:
    """
    Get insight specifications from study's optimization_config.json.

    Returns:
        List of InsightSpec objects defined in the config, or empty list
    """
    config = _read_study_config(study_path)
    return [InsightSpec.from_dict(spec) for spec in config.get('insights', [])]


def recommend_insights_for_study(study_path: Path) -> List[Dict[str, Any]]:
    """
    Recommend insights based on study type and objectives.

    Matches each objective name in optimization_config.json against keyword
    groups (first matching group wins) and suggests the corresponding insight.
    When the study has at least one objective, design_space is always
    appended as the final recommendation.

    Returns:
        List of recommendation dicts with 'type', 'name', 'reason',
        'linked_objective' and 'config'
    """
    config = _read_study_config(study_path)
    objectives = config.get('objectives', [])
    recommendations = []

    for obj in objectives:
        obj_name = obj.get('name') or ''
        lowered = obj_name.lower()

        for keywords, insight_type, label, reason in _OBJECTIVE_RULES:
            if not any(kw in lowered for kw in keywords):
                continue
            if insight_type is not None:
                rec_config = {}
                if insight_type == 'zernike_wfe':
                    # Carry the subcases over from the objective's extractor
                    extractor = obj.get('extractor_config') or {}
                    rec_config = {
                        'target_subcase': extractor.get('target_subcase', ''),
                        'reference_subcase': extractor.get('reference_subcase', ''),
                    }
                display = obj.get('description') or obj_name
                recommendations.append({
                    'type': insight_type,
                    'name': f"{label}: {display[:50]}",
                    'reason': reason.format(name=obj_name),
                    'linked_objective': obj_name,
                    'config': rec_config,
                })
            break  # first matching rule wins

    # Always recommend design_space for any optimization
    if objectives:
        recommendations.append({
            'type': 'design_space',
            'name': 'Design Space Exploration',
            'reason': 'Shows parameter-objective relationships across all trials',
            'linked_objective': None,
            'config': {},
        })

    return recommendations


class InsightReport:
    """
    Aggregates multiple insights into a comprehensive study report.

    Generates:
    - Full HTML report with all insights embedded
    - PDF export capability (via browser print)
    - Summary JSON for Results page

    Usage:
        report = InsightReport(study_path)
        report.add_insight(result1)
        report.add_insight(result2)
        report.generate_report_html()  # Creates 3_insights/STUDY_INSIGHTS_REPORT.html
    """

    def __init__(self, study_path: Path):
        """
        Initialize the report for a study.

        Args:
            study_path: Path to study directory
        """
        self.study_path = Path(study_path)
        self.insights_path = self.study_path / "3_insights"
        self.results_path = self.study_path / "3_results"
        self.results: List[InsightResult] = []

        # Load study config for metadata
        self.config = _read_study_config(self.study_path)

    def add_insight(self, result: InsightResult) -> None:
        """Add an insight result to the report; failed results are ignored."""
        if result.success:
            self.results.append(result)

    def generate_all(self, specs: Optional[List[InsightSpec]] = None) -> List[InsightResult]:
        """
        Generate all configured or specified insights.

        Disabled specs are skipped. Successful results are also added to the
        report.

        Args:
            specs: List of InsightSpecs to generate (or None to use config)

        Returns:
            List of InsightResult objects (successes and failures)
        """
        if specs is None:
            specs = get_configured_insights(self.study_path)

        results = []
        for spec in specs:
            if not spec.enabled:
                continue

            insight = get_insight(spec.type, self.study_path)
            if insight is None:
                results.append(InsightResult(
                    success=False,
                    insight_type=spec.type,
                    insight_name=spec.name,
                    error=f"Unknown insight type: {spec.type}",
                ))
                continue

            if not insight.can_generate():
                results.append(InsightResult(
                    success=False,
                    insight_type=spec.type,
                    insight_name=spec.name,
                    error=f"Cannot generate {spec.name}: required data not found",
                ))
                continue

            result = insight.generate(spec.to_config())
            # The spec, not the insight class, names the result in the report.
            result.insight_type = spec.type
            result.insight_name = spec.name
            result.linked_objective = spec.linked_objective
            result.generated_at = datetime.now().isoformat()

            results.append(result)
            self.add_insight(result)

        return results

    def generate_report_html(self, include_appendix: bool = True) -> Path:
        """
        Generate comprehensive HTML report with all insights.

        Also writes insights_summary.json next to the report.

        Args:
            include_appendix: Whether to include full-size insight appendix

        Returns:
            Path to generated HTML report
        """
        self.insights_path.mkdir(parents=True, exist_ok=True)

        study_name = self.config.get('study_name', self.study_path.name)
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")

        parts = [
            self._report_header(study_name, timestamp),
            self._executive_summary(),
        ]

        # Objective-linked insights first, then the rest
        linked = [r for r in self.results if r.linked_objective]
        if linked:
            parts.append('<h2>Objective Insights</h2>')
            parts.extend(self._insight_section(r, compact=True) for r in linked)

        standalone = [r for r in self.results if not r.linked_objective]
        if standalone:
            parts.append('<h2>General Insights</h2>')
            parts.extend(self._insight_section(r, compact=True) for r in standalone)

        # Appendix with full-size figures
        if include_appendix and self.results:
            parts.append(self._appendix_section())

        parts.append(self._report_footer())

        report_path = self.insights_path / "STUDY_INSIGHTS_REPORT.html"
        report_path.write_text('\n'.join(parts), encoding='utf-8')

        # Also write summary JSON for Results page
        self._write_summary_json()

        return report_path

    def _report_header(self, study_name: str, timestamp: str) -> str:
        """Generate the document head and the report title block."""
        safe_name = html.escape(str(study_name))
        safe_time = html.escape(str(timestamp))
        return f'''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Study Insights Report - {safe_name}</title>
<script src="{_PLOTLY_JS_URL}"></script>
</head>
<body>
<h1>Study Insights Report</h1>
<p class="report-meta">{safe_name} | Generated: {safe_time}</p>
'''

    def _executive_summary(self) -> str:
        """Generate the executive summary section (three headline counts)."""
        n_objectives = len(self.config.get('objectives', []))
        n_insights = len(self.results)
        linked = len([r for r in self.results if r.linked_objective])

        return f'''<div class="summary">
<div class="stat"><strong>{n_insights}</strong> Total Insights</div>
<div class="stat"><strong>{linked}</strong> Objective-Linked</div>
<div class="stat"><strong>{n_objectives}</strong> Study Objectives</div>
</div>
'''

    def _insight_section(self, result: InsightResult, compact: bool = False) -> str:
        """Generate the HTML section for a single insight.

        Args:
            result: The insight result to render.
            compact: The plot is embedded only when this is True and the
                result carries a Plotly figure.

        Returns:
            HTML fragment with the title, up to six summary rows and the plot.
        """
        objective_tag = ""
        if result.linked_objective:
            objective_tag = (
                '<span class="objective-tag">'
                f'{html.escape(str(result.linked_objective))}</span>'
            )

        # Summary table (first six metrics only)
        summary_html = ""
        if result.summary:
            rows = []
            for key, value in list(result.summary.items())[:6]:
                if isinstance(value, float):
                    value = f"{value:.4g}"
                rows.append(
                    f"<tr><td>{html.escape(str(key))}</td>"
                    f"<td>{html.escape(str(value))}</td></tr>"
                )
            summary_html = f"<table>{''.join(rows)}</table>\n"

        # Compact plot or nothing
        plot_html = ""
        if result.plotly_figure and compact:
            # id(result) keeps container IDs distinct when the same insight
            # type appears more than once in the report.
            plot_id = f"plot_{result.insight_type}_{id(result)}"
            plot_html = _plot_embed(plot_id, result.plotly_figure)

        return f'''<div class="insight">
<h3>{html.escape(str(result.insight_name))}{objective_tag}</h3>
{summary_html} {plot_html}
</div>
'''

    def _appendix_section(self) -> str:
        """Generate the appendix with one full-size figure per result.

        Results without a Plotly figure are skipped; appendix numbers follow
        the position in self.results, so numbering may have gaps.
        """
        parts = ['<h2>Appendix: Full-Size Visualizations</h2>\n']
        for i, result in enumerate(self.results):
            if not result.plotly_figure:
                continue
            parts.append(
                '<div class="appendix-item">\n'
                f'<h3>Appendix {i+1}: {html.escape(str(result.insight_name))}</h3>\n'
                f'{_plot_embed(f"appendix_plot_{i}", result.plotly_figure)}'
                '</div>\n'
            )
        return ''.join(parts)

    def _report_footer(self) -> str:
        """Generate the closing tags of the document."""
        return '''</body>
</html>
'''

    def _write_summary_json(self) -> None:
        """Write insights_summary.json for Results page integration."""
        summary = {
            'generated_at': datetime.now().isoformat(),
            'study_name': self.config.get('study_name', self.study_path.name),
            'insights': [
                {
                    'type': result.insight_type,
                    'name': result.insight_name,
                    'success': result.success,
                    'linked_objective': result.linked_objective,
                    'html_path': str(result.html_path) if result.html_path else None,
                    'summary': result.summary,
                }
                for result in self.results
            ],
        }

        summary_path = self.insights_path / "insights_summary.json"
        with open(summary_path, 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2)