""" Specification Checker ===================== Validates atomizer_spec.json (or optimization_config.json) for: - Schema compliance - Semantic correctness - Anti-pattern detection - Expression existence This catches configuration errors BEFORE wasting time on failed trials. """ from __future__ import annotations import json import logging from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import List, Dict, Any, Optional logger = logging.getLogger(__name__) class IssueSeverity(str, Enum): """Severity level for validation issues.""" ERROR = "error" # Must fix before proceeding WARNING = "warning" # Should review, but can proceed INFO = "info" # Informational note @dataclass class ValidationIssue: """A single validation issue.""" severity: IssueSeverity code: str message: str path: Optional[str] = None # JSON path to the issue suggestion: Optional[str] = None def __str__(self) -> str: prefix = { IssueSeverity.ERROR: "[ERROR]", IssueSeverity.WARNING: "[WARN]", IssueSeverity.INFO: "[INFO]", }[self.severity] location = f" at {self.path}" if self.path else "" return f"{prefix} {self.message}{location}" @dataclass class CheckResult: """Result of running the spec checker.""" valid: bool issues: List[ValidationIssue] = field(default_factory=list) @property def errors(self) -> List[ValidationIssue]: return [i for i in self.issues if i.severity == IssueSeverity.ERROR] @property def warnings(self) -> List[ValidationIssue]: return [i for i in self.issues if i.severity == IssueSeverity.WARNING] def add_error(self, code: str, message: str, path: str = None, suggestion: str = None): self.issues.append( ValidationIssue( severity=IssueSeverity.ERROR, code=code, message=message, path=path, suggestion=suggestion, ) ) self.valid = False def add_warning(self, code: str, message: str, path: str = None, suggestion: str = None): self.issues.append( ValidationIssue( severity=IssueSeverity.WARNING, code=code, message=message, path=path, suggestion=suggestion, ) ) def add_info(self, code: str, message: str, path: str = None): self.issues.append( ValidationIssue( severity=IssueSeverity.INFO, code=code, message=message, path=path, ) ) class SpecChecker: """ Validates study specification files. Checks: 1. Required fields present 2. Design variable bounds valid 3. Expressions exist in model (if introspection available) 4. Extractors available for objectives/constraints 5. Anti-patterns (mass minimization without constraints, etc.) """ # Known extractors KNOWN_EXTRACTORS = { "extract_mass_from_bdf", "extract_part_mass", "extract_displacement", "extract_solid_stress", "extract_principal_stress", "extract_frequency", "extract_strain_energy", "extract_temperature", "extract_zernike_from_op2", } def __init__( self, spec_path: Optional[Path] = None, available_expressions: Optional[List[str]] = None, ): """ Initialize the checker. Args: spec_path: Path to spec file (atomizer_spec.json or optimization_config.json) available_expressions: List of expression names from introspection """ self.spec_path = spec_path self.available_expressions = available_expressions or [] self.spec: Dict[str, Any] = {} def check(self, spec_data: Optional[Dict[str, Any]] = None) -> CheckResult: """ Run all validation checks. Args: spec_data: Spec dict (or load from spec_path if not provided) Returns: CheckResult with all issues found """ result = CheckResult(valid=True) # Load spec if not provided if spec_data: self.spec = spec_data elif self.spec_path and self.spec_path.exists(): with open(self.spec_path) as f: self.spec = json.load(f) else: result.add_error("SPEC_NOT_FOUND", "No specification file found") return result # Run checks self._check_required_fields(result) self._check_design_variables(result) self._check_objectives(result) self._check_constraints(result) self._check_extractors(result) self._check_anti_patterns(result) self._check_files(result) return result def _check_required_fields(self, result: CheckResult) -> None: """Check that required fields are present.""" # Check for design variables dvs = self.spec.get("design_variables", []) if not dvs: result.add_error( "NO_DESIGN_VARIABLES", "No design variables defined", suggestion="Add at least one design variable to optimize", ) # Check for objectives objectives = self.spec.get("objectives", []) if not objectives: result.add_error( "NO_OBJECTIVES", "No objectives defined", suggestion="Define at least one objective (e.g., minimize mass)", ) # Check for simulation settings sim = self.spec.get("simulation", {}) if not sim.get("sim_file"): result.add_warning( "NO_SIM_FILE", "No simulation file specified", path="simulation.sim_file" ) def _check_design_variables(self, result: CheckResult) -> None: """Check design variable definitions.""" dvs = self.spec.get("design_variables", []) for i, dv in enumerate(dvs): param = dv.get("parameter", dv.get("expression_name", dv.get("name", f"dv_{i}"))) bounds = dv.get("bounds", []) path = f"design_variables[{i}]" # Handle both formats: [min, max] or {"min": x, "max": y} if isinstance(bounds, dict): min_val = bounds.get("min") max_val = bounds.get("max") elif isinstance(bounds, (list, tuple)) and len(bounds) == 2: min_val, max_val = bounds else: result.add_error( "INVALID_BOUNDS", f"Design variable '{param}' has invalid bounds format", path=path, suggestion="Bounds must be [min, max] or {min: x, max: y}", ) continue # Convert to float if strings try: min_val = float(min_val) max_val = float(max_val) except (TypeError, ValueError): result.add_error( "INVALID_BOUNDS_TYPE", f"Design variable '{param}' bounds must be numeric", path=path, ) continue # Check bounds order if min_val >= max_val: result.add_error( "BOUNDS_INVERTED", f"Design variable '{param}': min ({min_val}) >= max ({max_val})", path=path, suggestion="Ensure min < max", ) # Check for very wide bounds if max_val > 0 and min_val > 0: ratio = max_val / min_val if ratio > 100: result.add_warning( "BOUNDS_TOO_WIDE", f"Design variable '{param}' has very wide bounds (ratio: {ratio:.1f}x)", path=path, suggestion="Consider narrowing bounds for faster convergence", ) # Check for very narrow bounds if max_val > 0 and min_val > 0: ratio = max_val / min_val if ratio < 1.1: result.add_warning( "BOUNDS_TOO_NARROW", f"Design variable '{param}' has very narrow bounds (ratio: {ratio:.2f}x)", path=path, suggestion="Consider widening bounds to explore more design space", ) # Check expression exists (if introspection available) if self.available_expressions and param not in self.available_expressions: result.add_error( "EXPRESSION_NOT_FOUND", f"Expression '{param}' not found in model", path=path, suggestion=f"Available expressions: {', '.join(self.available_expressions[:5])}...", ) def _check_objectives(self, result: CheckResult) -> None: """Check objective definitions.""" objectives = self.spec.get("objectives", []) for i, obj in enumerate(objectives): name = obj.get("name", f"objective_{i}") # Handle both formats: "goal" or "direction" goal = obj.get("goal", obj.get("direction", "")).lower() path = f"objectives[{i}]" # Check goal is valid if goal not in ("minimize", "maximize"): result.add_error( "INVALID_GOAL", f"Objective '{name}' has invalid goal: '{goal}'", path=path, suggestion="Use 'minimize' or 'maximize'", ) # Check extraction is defined extraction = obj.get("extraction", {}) if not extraction.get("action"): result.add_warning( "NO_EXTRACTOR", f"Objective '{name}' has no extractor specified", path=path, ) def _check_constraints(self, result: CheckResult) -> None: """Check constraint definitions.""" constraints = self.spec.get("constraints", []) for i, const in enumerate(constraints): name = const.get("name", f"constraint_{i}") const_type = const.get("type", "").lower() threshold = const.get("threshold") path = f"constraints[{i}]" # Check type is valid if const_type not in ("less_than", "greater_than", "equal_to"): result.add_warning( "INVALID_CONSTRAINT_TYPE", f"Constraint '{name}' has unusual type: '{const_type}'", path=path, suggestion="Use 'less_than' or 'greater_than'", ) # Check threshold is defined if threshold is None: result.add_error( "NO_THRESHOLD", f"Constraint '{name}' has no threshold defined", path=path, ) def _check_extractors(self, result: CheckResult) -> None: """Check that referenced extractors exist.""" # Check objective extractors for obj in self.spec.get("objectives", []): extraction = obj.get("extraction", {}) action = extraction.get("action", "") if action and action not in self.KNOWN_EXTRACTORS: result.add_warning( "UNKNOWN_EXTRACTOR", f"Extractor '{action}' is not in the standard library", suggestion="Ensure custom extractor is available", ) # Check constraint extractors for const in self.spec.get("constraints", []): extraction = const.get("extraction", {}) action = extraction.get("action", "") if action and action not in self.KNOWN_EXTRACTORS: result.add_warning( "UNKNOWN_EXTRACTOR", f"Extractor '{action}' is not in the standard library", ) def _check_anti_patterns(self, result: CheckResult) -> None: """Check for common optimization anti-patterns.""" objectives = self.spec.get("objectives", []) constraints = self.spec.get("constraints", []) # Anti-pattern: Mass minimization without stress/displacement constraints has_mass_objective = any( "mass" in obj.get("name", "").lower() and obj.get("goal") == "minimize" for obj in objectives ) has_structural_constraint = any( any( kw in const.get("name", "").lower() for kw in ["stress", "displacement", "deflection"] ) for const in constraints ) if has_mass_objective and not has_structural_constraint: result.add_warning( "MASS_NO_CONSTRAINT", "Mass minimization without structural constraints", suggestion="Add stress or displacement constraints to prevent over-optimization", ) # Anti-pattern: Too many design variables for trial count n_dvs = len(self.spec.get("design_variables", [])) n_trials = self.spec.get("optimization_settings", {}).get("n_trials", 100) if n_dvs > 0 and n_trials / n_dvs < 10: result.add_warning( "LOW_TRIALS_PER_DV", f"Only {n_trials / n_dvs:.1f} trials per design variable", suggestion=f"Consider increasing trials to at least {n_dvs * 20} for better coverage", ) # Anti-pattern: Too many objectives n_objectives = len(objectives) if n_objectives > 3: result.add_warning( "TOO_MANY_OBJECTIVES", f"{n_objectives} objectives may lead to sparse Pareto front", suggestion="Consider consolidating or using weighted objectives", ) def _check_files(self, result: CheckResult) -> None: """Check that referenced files exist.""" if not self.spec_path: return study_dir = self.spec_path.parent.parent # Assuming spec is in 1_setup/ sim = self.spec.get("simulation", {}) sim_file = sim.get("sim_file") if sim_file: # Check multiple possible locations possible_paths = [ study_dir / "1_model" / sim_file, study_dir / "1_setup" / "model" / sim_file, study_dir / sim_file, ] found = any(p.exists() for p in possible_paths) if not found: result.add_error( "SIM_FILE_NOT_FOUND", f"Simulation file not found: {sim_file}", path="simulation.sim_file", suggestion="Ensure model files are copied to study directory", ) def validate_spec(spec_path: Path, expressions: List[str] = None) -> CheckResult: """ Convenience function to validate a spec file. Args: spec_path: Path to spec file expressions: List of available expressions (from introspection) Returns: CheckResult with validation issues """ checker = SpecChecker(spec_path, expressions) return checker.check()