# File viewer metadata: 455 lines, 15 KiB, Python

"""
Specification Checker
=====================
Validates atomizer_spec.json (or optimization_config.json) for:
- Schema compliance
- Semantic correctness
- Anti-pattern detection
- Expression existence
This catches configuration errors BEFORE wasting time on failed trials.
"""
from __future__ import annotations

import json
import logging
import math
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import List, Dict, Any, Optional
logger = logging.getLogger(__name__)
class IssueSeverity(str, Enum):
    """Severity level for validation issues.

    Members also subclass ``str``, so each one compares equal to its plain
    string value (e.g. ``IssueSeverity.ERROR == "error"``).
    """

    ERROR = "error"  # Must fix before proceeding
    WARNING = "warning"  # Should review, but can proceed
    INFO = "info"  # Informational note
@dataclass
class ValidationIssue:
    """A single validation issue.

    ``str(issue)`` renders ``"[LEVEL] message"`` followed by ``" at <path>"``
    when a path is set. The code and suggestion are carried as data only and
    are not part of the rendered string.
    """

    severity: IssueSeverity
    code: str
    message: str
    path: Optional[str] = None  # JSON path to the issue
    suggestion: Optional[str] = None

    def __str__(self) -> str:
        """Return the one-line, human-readable form of the issue."""
        prefix = {
            IssueSeverity.ERROR: "[ERROR]",
            IssueSeverity.WARNING: "[WARN]",
            IssueSeverity.INFO: "[INFO]",
        }[self.severity]
        location = f" at {self.path}" if self.path else ""
        return f"{prefix} {self.message}{location}"
@dataclass
class CheckResult:
    """Result of running the spec checker.

    ``valid`` starts at whatever the caller passes and is set to ``False`` by
    the first call to :meth:`add_error`; warnings and infos never change it.
    """

    valid: bool
    issues: List[ValidationIssue] = field(default_factory=list)

    @property
    def errors(self) -> List[ValidationIssue]:
        """Issues with ERROR severity, in insertion order."""
        return [issue for issue in self.issues if issue.severity == IssueSeverity.ERROR]

    @property
    def warnings(self) -> List[ValidationIssue]:
        """Issues with WARNING severity, in insertion order."""
        return [issue for issue in self.issues if issue.severity == IssueSeverity.WARNING]

    def _record(
        self,
        severity: IssueSeverity,
        code: str,
        message: str,
        path: Optional[str] = None,
        suggestion: Optional[str] = None,
    ) -> None:
        """Append one issue; shared by the public ``add_*`` methods."""
        self.issues.append(
            ValidationIssue(
                severity=severity,
                code=code,
                message=message,
                path=path,
                suggestion=suggestion,
            )
        )

    def add_error(
        self,
        code: str,
        message: str,
        path: Optional[str] = None,
        suggestion: Optional[str] = None,
    ) -> None:
        """Record an error and mark the result as invalid."""
        self._record(IssueSeverity.ERROR, code, message, path, suggestion)
        self.valid = False

    def add_warning(
        self,
        code: str,
        message: str,
        path: Optional[str] = None,
        suggestion: Optional[str] = None,
    ) -> None:
        """Record a warning; ``valid`` is left untouched."""
        self._record(IssueSeverity.WARNING, code, message, path, suggestion)

    def add_info(self, code: str, message: str, path: Optional[str] = None) -> None:
        """Record an informational note; ``valid`` is left untouched."""
        self._record(IssueSeverity.INFO, code, message, path)
class SpecChecker:
    """
    Validates study specification files.

    Checks:
    1. Required fields present
    2. Design variable bounds valid
    3. Expressions exist in model (if introspection available)
    4. Extractors available for objectives/constraints
    5. Anti-patterns (mass minimization without constraints, etc.)

    Malformed input (wrong JSON type for a section, non-object entries,
    unreadable file) is reported as an issue instead of raising, so one bad
    entry does not hide the remaining findings.
    """

    # Extractor names that do not trigger an UNKNOWN_EXTRACTOR warning.
    KNOWN_EXTRACTORS = {
        "extract_mass_from_bdf",
        "extract_part_mass",
        "extract_displacement",
        "extract_solid_stress",
        "extract_principal_stress",
        "extract_frequency",
        "extract_strain_energy",
        "extract_temperature",
        "extract_zernike_from_op2",
    }

    # Top-level keys that must hold a list of JSON objects.
    _LIST_SECTIONS = ("design_variables", "objectives", "constraints")

    # Substrings of a constraint name that mark it as a structural constraint.
    _STRUCTURAL_KEYWORDS = ("stress", "displacement", "deflection")

    def __init__(
        self,
        spec_path: Optional[Path] = None,
        available_expressions: Optional[List[str]] = None,
    ):
        """
        Initialize the checker.

        Args:
            spec_path: Path to spec file (atomizer_spec.json or
                optimization_config.json). A plain string is accepted and
                converted to ``Path``; an empty value means "no path".
            available_expressions: List of expression names from introspection
        """
        self.spec_path: Optional[Path] = Path(spec_path) if spec_path else None
        self.available_expressions = available_expressions or []
        self.spec: Dict[str, Any] = {}

    def check(self, spec_data: Optional[Dict[str, Any]] = None) -> CheckResult:
        """
        Run all validation checks.

        Args:
            spec_data: Spec dict (or load from spec_path if not provided).
                Only ``None`` means "not provided"; an empty dict is validated
                as an (empty) spec.

        Returns:
            CheckResult with all issues found
        """
        result = CheckResult(valid=True)

        if spec_data is not None:
            spec = spec_data
        elif self.spec_path and self.spec_path.is_file():
            try:
                # utf-8-sig reads plain UTF-8 and also tolerates a leading BOM.
                with open(self.spec_path, encoding="utf-8-sig") as f:
                    spec = json.load(f)
            except (OSError, ValueError) as exc:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
                result.add_error(
                    "SPEC_UNREADABLE",
                    f"Could not read specification file: {exc}",
                )
                return result
        else:
            result.add_error("SPEC_NOT_FOUND", "No specification file found")
            return result

        if not isinstance(spec, dict):
            result.add_error(
                "SPEC_INVALID",
                "Specification must be a JSON object",
            )
            return result
        self.spec = spec

        # Run checks
        self._check_required_fields(result)
        self._check_design_variables(result)
        self._check_objectives(result)
        self._check_constraints(result)
        self._check_extractors(result)
        self._check_anti_patterns(result)
        self._check_files(result)
        return result

    # ------------------------------------------------------------------
    # Defensive accessors: the spec is untrusted, so never assume its shape.
    # ------------------------------------------------------------------

    def _section(self, key: str) -> List[Any]:
        """Return the list stored under ``key``, or ``[]`` if absent/not a list."""
        value = self.spec.get(key)
        return list(value) if isinstance(value, (list, tuple)) else []

    @staticmethod
    def _mapping(parent: Dict[str, Any], key: str) -> Dict[str, Any]:
        """Return ``parent[key]`` when it is a dict, otherwise an empty dict."""
        value = parent.get(key)
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _lowered(value: Any) -> str:
        """Return ``value`` lower-cased when it is a string, otherwise ``""``."""
        return value.lower() if isinstance(value, str) else ""

    @staticmethod
    def _objective_goal(obj: Dict[str, Any]) -> str:
        """Return the objective's goal, lower-cased.

        Reads "goal" first and falls back to "direction"; a missing or
        non-string value yields ``""``.
        """
        raw = obj.get("goal") or obj.get("direction") or ""
        return raw.lower() if isinstance(raw, str) else ""

    # ------------------------------------------------------------------
    # Individual checks
    # ------------------------------------------------------------------

    def _check_required_fields(self, result: CheckResult) -> None:
        """Check that required fields are present and sections are well-formed."""
        # Structural problems are reported here once; the per-section checks
        # below silently skip entries that are not objects.
        for section in self._LIST_SECTIONS:
            raw = self.spec.get(section)
            if raw is not None and not isinstance(raw, (list, tuple)):
                result.add_error(
                    "INVALID_SECTION",
                    f"'{section}' must be a list",
                    path=section,
                )
                continue
            for i, entry in enumerate(self._section(section)):
                if not isinstance(entry, dict):
                    result.add_error(
                        "INVALID_ENTRY",
                        f"Entry in '{section}' must be an object",
                        path=f"{section}[{i}]",
                    )

        # Check for design variables
        if not self._section("design_variables"):
            result.add_error(
                "NO_DESIGN_VARIABLES",
                "No design variables defined",
                suggestion="Add at least one design variable to optimize",
            )

        # Check for objectives
        if not self._section("objectives"):
            result.add_error(
                "NO_OBJECTIVES",
                "No objectives defined",
                suggestion="Define at least one objective (e.g., minimize mass)",
            )

        # Check for simulation settings
        if not self._mapping(self.spec, "simulation").get("sim_file"):
            result.add_warning(
                "NO_SIM_FILE", "No simulation file specified", path="simulation.sim_file"
            )

    def _check_design_variables(self, result: CheckResult) -> None:
        """Check design variable definitions (bounds and expression names)."""
        for i, dv in enumerate(self._section("design_variables")):
            if not isinstance(dv, dict):
                continue  # already reported as INVALID_ENTRY
            param = dv.get("parameter", dv.get("expression_name", dv.get("name", f"dv_{i}")))
            path = f"design_variables[{i}]"

            self._check_bounds(result, param, dv.get("bounds", []), path)

            # Check expression exists (if introspection available). This runs
            # even when the bounds are invalid so both problems are reported.
            if self.available_expressions and param not in self.available_expressions:
                preview = ", ".join(str(name) for name in self.available_expressions[:5])
                if len(self.available_expressions) > 5:
                    preview += "..."
                result.add_error(
                    "EXPRESSION_NOT_FOUND",
                    f"Expression '{param}' not found in model",
                    path=path,
                    suggestion=f"Available expressions: {preview}",
                )

    def _check_bounds(self, result: CheckResult, param: Any, bounds: Any, path: str) -> None:
        """Validate one design variable's bounds and record any issues."""
        # Handle both formats: [min, max] or {"min": x, "max": y}
        if isinstance(bounds, dict):
            min_val, max_val = bounds.get("min"), bounds.get("max")
        elif isinstance(bounds, (list, tuple)) and len(bounds) == 2:
            min_val, max_val = bounds
        else:
            result.add_error(
                "INVALID_BOUNDS",
                f"Design variable '{param}' has invalid bounds format",
                path=path,
                suggestion="Bounds must be [min, max] or {min: x, max: y}",
            )
            return

        # Convert to float if strings
        try:
            min_val = float(min_val)
            max_val = float(max_val)
        except (TypeError, ValueError):
            result.add_error(
                "INVALID_BOUNDS_TYPE",
                f"Design variable '{param}' bounds must be numeric",
                path=path,
            )
            return

        # NaN/inf convert to float without error but make every comparison
        # below meaningless, so reject them explicitly.
        if not (math.isfinite(min_val) and math.isfinite(max_val)):
            result.add_error(
                "BOUNDS_NOT_FINITE",
                f"Design variable '{param}' bounds must be finite numbers",
                path=path,
            )
            return

        # Check bounds order
        if min_val >= max_val:
            result.add_error(
                "BOUNDS_INVERTED",
                f"Design variable '{param}': min ({min_val}) >= max ({max_val})",
                path=path,
                suggestion="Ensure min < max",
            )
            # Width heuristics would only add a contradictory "too narrow"
            # warning on top of the real problem.
            return

        # Width heuristics only apply to strictly positive ranges
        # (min < max is established, so min > 0 implies max > 0).
        if min_val > 0:
            ratio = max_val / min_val
            if ratio > 100:
                result.add_warning(
                    "BOUNDS_TOO_WIDE",
                    f"Design variable '{param}' has very wide bounds (ratio: {ratio:.1f}x)",
                    path=path,
                    suggestion="Consider narrowing bounds for faster convergence",
                )
            elif ratio < 1.1:
                result.add_warning(
                    "BOUNDS_TOO_NARROW",
                    f"Design variable '{param}' has very narrow bounds (ratio: {ratio:.2f}x)",
                    path=path,
                    suggestion="Consider widening bounds to explore more design space",
                )

    def _check_objectives(self, result: CheckResult) -> None:
        """Check objective definitions."""
        for i, obj in enumerate(self._section("objectives")):
            if not isinstance(obj, dict):
                continue  # already reported as INVALID_ENTRY
            name = obj.get("name", f"objective_{i}")
            # Handle both formats: "goal" or "direction"
            goal = self._objective_goal(obj)
            path = f"objectives[{i}]"

            # Check goal is valid
            if goal not in ("minimize", "maximize"):
                result.add_error(
                    "INVALID_GOAL",
                    f"Objective '{name}' has invalid goal: '{goal}'",
                    path=path,
                    suggestion="Use 'minimize' or 'maximize'",
                )

            # Check extraction is defined
            if not self._mapping(obj, "extraction").get("action"):
                result.add_warning(
                    "NO_EXTRACTOR",
                    f"Objective '{name}' has no extractor specified",
                    path=path,
                )

    def _check_constraints(self, result: CheckResult) -> None:
        """Check constraint definitions."""
        for i, const in enumerate(self._section("constraints")):
            if not isinstance(const, dict):
                continue  # already reported as INVALID_ENTRY
            name = const.get("name", f"constraint_{i}")
            const_type = self._lowered(const.get("type", ""))
            path = f"constraints[{i}]"

            # Check type is valid
            if const_type not in ("less_than", "greater_than", "equal_to"):
                result.add_warning(
                    "INVALID_CONSTRAINT_TYPE",
                    f"Constraint '{name}' has unusual type: '{const_type}'",
                    path=path,
                    suggestion="Use 'less_than', 'greater_than' or 'equal_to'",
                )

            # Check threshold is defined
            if const.get("threshold") is None:
                result.add_error(
                    "NO_THRESHOLD",
                    f"Constraint '{name}' has no threshold defined",
                    path=path,
                )

    def _check_extractors(self, result: CheckResult) -> None:
        """Check that extractors referenced by objectives/constraints are known."""
        for section in ("objectives", "constraints"):
            for i, entry in enumerate(self._section(section)):
                if not isinstance(entry, dict):
                    continue  # already reported as INVALID_ENTRY
                action = self._mapping(entry, "extraction").get("action", "")
                if not action:
                    continue
                # A non-string action can never match a known name (and may
                # not even be hashable), so treat it as unknown.
                if not isinstance(action, str) or action not in self.KNOWN_EXTRACTORS:
                    result.add_warning(
                        "UNKNOWN_EXTRACTOR",
                        f"Extractor '{action}' is not in the standard library",
                        path=f"{section}[{i}].extraction.action",
                        suggestion="Ensure custom extractor is available",
                    )

    def _check_anti_patterns(self, result: CheckResult) -> None:
        """Check for common optimization anti-patterns."""
        objectives = [o for o in self._section("objectives") if isinstance(o, dict)]
        constraints = [c for c in self._section("constraints") if isinstance(c, dict)]

        # Anti-pattern: Mass minimization without stress/displacement constraints.
        # The goal is resolved the same way as in _check_objectives so specs
        # using "direction" or mixed case are covered too.
        has_mass_objective = any(
            "mass" in self._lowered(obj.get("name", ""))
            and self._objective_goal(obj) == "minimize"
            for obj in objectives
        )
        has_structural_constraint = any(
            any(kw in self._lowered(const.get("name", "")) for kw in self._STRUCTURAL_KEYWORDS)
            for const in constraints
        )
        if has_mass_objective and not has_structural_constraint:
            result.add_warning(
                "MASS_NO_CONSTRAINT",
                "Mass minimization without structural constraints",
                suggestion="Add stress or displacement constraints to prevent over-optimization",
            )

        # Anti-pattern: Too many design variables for trial count
        n_dvs = len(self._section("design_variables"))
        n_trials = self._mapping(self.spec, "optimization_settings").get("n_trials", 100)
        trials_numeric = isinstance(n_trials, (int, float)) and not isinstance(n_trials, bool)
        if not trials_numeric:
            result.add_warning(
                "INVALID_N_TRIALS",
                f"n_trials must be a number, got {n_trials!r}",
                path="optimization_settings.n_trials",
            )
        elif n_dvs > 0 and n_trials / n_dvs < 10:
            result.add_warning(
                "LOW_TRIALS_PER_DV",
                f"Only {n_trials / n_dvs:.1f} trials per design variable",
                suggestion=f"Consider increasing trials to at least {n_dvs * 20} for better coverage",
            )

        # Anti-pattern: Too many objectives
        n_objectives = len(self._section("objectives"))
        if n_objectives > 3:
            result.add_warning(
                "TOO_MANY_OBJECTIVES",
                f"{n_objectives} objectives may lead to sparse Pareto front",
                suggestion="Consider consolidating or using weighted objectives",
            )

    def _check_files(self, result: CheckResult) -> None:
        """Check that referenced files exist (only when a spec path is known)."""
        if not self.spec_path:
            return
        sim_file = self._mapping(self.spec, "simulation").get("sim_file")
        if not sim_file:
            return

        study_dir = self.spec_path.parent.parent  # Assuming spec is in 1_setup/
        # Check multiple possible locations
        possible_paths = (
            study_dir / "1_model" / sim_file,
            study_dir / "1_setup" / "model" / sim_file,
            study_dir / sim_file,
        )
        if not any(candidate.exists() for candidate in possible_paths):
            result.add_error(
                "SIM_FILE_NOT_FOUND",
                f"Simulation file not found: {sim_file}",
                path="simulation.sim_file",
                suggestion="Ensure model files are copied to study directory",
            )
def validate_spec(spec_path: Path, expressions: Optional[List[str]] = None) -> CheckResult:
    """
    Convenience function to validate a spec file.

    Args:
        spec_path: Path to spec file. A plain string is accepted and converted
            to ``Path``; an empty value is treated as "no path".
        expressions: List of available expressions (from introspection)

    Returns:
        CheckResult with validation issues
    """
    # Normalise here so a str path works regardless of how SpecChecker
    # stores it.
    normalized_path = Path(spec_path) if spec_path else None
    checker = SpecChecker(normalized_path, expressions)
    return checker.check()