""" AtomizerSpec v2.0 Validator Provides comprehensive validation including: - JSON Schema validation - Pydantic model validation - Semantic validation (bounds, references, dependencies) - Extractor-specific validation """ import json from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union from pydantic import ValidationError as PydanticValidationError try: import jsonschema HAS_JSONSCHEMA = True except ImportError: HAS_JSONSCHEMA = False from .spec_models import ( AtomizerSpec, ValidationReport, ValidationError, ValidationWarning, ValidationSummary, ExtractorType, AlgorithmType, ConstraintType, ) class SpecValidationError(Exception): """Raised when spec validation fails.""" def __init__(self, message: str, errors: List[ValidationError] = None): super().__init__(message) self.errors = errors or [] class SpecValidator: """ Validates AtomizerSpec v2.0 configurations. Provides three levels of validation: 1. JSON Schema validation (structural) 2. Pydantic model validation (type safety) 3. Semantic validation (business logic) """ # Path to JSON Schema file SCHEMA_PATH = Path(__file__).parent.parent / "schemas" / "atomizer_spec_v2.json" def __init__(self): """Initialize validator with schema.""" self._schema: Optional[Dict] = None @property def schema(self) -> Dict: """Lazy load the JSON Schema.""" if self._schema is None: if self.SCHEMA_PATH.exists(): with open(self.SCHEMA_PATH) as f: self._schema = json.load(f) else: self._schema = {} return self._schema def validate( self, spec_data: Union[Dict[str, Any], AtomizerSpec], strict: bool = True ) -> ValidationReport: """ Validate a spec and return a detailed report. Args: spec_data: Either a dict or AtomizerSpec instance strict: If True, raise exception on errors; if False, return report only Returns: ValidationReport with errors, warnings, and summary Raises: SpecValidationError: If strict=True and validation fails """ errors: List[ValidationError] = [] warnings: List[ValidationWarning] = [] # Convert to dict if needed if isinstance(spec_data, AtomizerSpec): data = spec_data.model_dump(mode='json') else: data = spec_data # Phase 1: JSON Schema validation schema_errors = self._validate_json_schema(data) errors.extend(schema_errors) # Phase 2: Pydantic model validation (only if schema passes) if not schema_errors: pydantic_errors = self._validate_pydantic(data) errors.extend(pydantic_errors) # Phase 3: Semantic validation (only if pydantic passes) if not errors: spec = AtomizerSpec.model_validate(data) semantic_errors, semantic_warnings = self._validate_semantic(spec) errors.extend(semantic_errors) warnings.extend(semantic_warnings) # Build summary summary = self._build_summary(data) # Build report report = ValidationReport( valid=len(errors) == 0, errors=errors, warnings=warnings, summary=summary ) # Raise if strict mode and errors found if strict and not report.valid: error_messages = "; ".join(e.message for e in report.errors[:3]) raise SpecValidationError( f"Spec validation failed: {error_messages}", errors=report.errors ) return report def validate_partial( self, path: str, value: Any, current_spec: AtomizerSpec ) -> Tuple[bool, List[str]]: """ Validate a partial update before applying. Args: path: JSONPath to the field being updated value: New value current_spec: Current full spec Returns: Tuple of (is_valid, list of error messages) """ errors = [] # Parse path parts = self._parse_path(path) if not parts: return False, ["Invalid path format"] # Get target type from path root = parts[0] # Validate based on root section if root == "design_variables": errors.extend(self._validate_dv_update(parts, value, current_spec)) elif root == "extractors": errors.extend(self._validate_extractor_update(parts, value, current_spec)) elif root == "objectives": errors.extend(self._validate_objective_update(parts, value, current_spec)) elif root == "constraints": errors.extend(self._validate_constraint_update(parts, value, current_spec)) elif root == "optimization": errors.extend(self._validate_optimization_update(parts, value)) elif root == "meta": errors.extend(self._validate_meta_update(parts, value)) return len(errors) == 0, errors def _validate_json_schema(self, data: Dict) -> List[ValidationError]: """Validate against JSON Schema.""" errors = [] if not HAS_JSONSCHEMA or not self.schema: return errors # Skip if jsonschema not available try: jsonschema.validate(instance=data, schema=self.schema) except jsonschema.ValidationError as e: errors.append(ValidationError( type="schema", path=list(e.absolute_path), message=e.message )) except jsonschema.SchemaError as e: errors.append(ValidationError( type="schema", path=[], message=f"Invalid schema: {e.message}" )) return errors def _validate_pydantic(self, data: Dict) -> List[ValidationError]: """Validate using Pydantic models.""" errors = [] try: AtomizerSpec.model_validate(data) except PydanticValidationError as e: for err in e.errors(): errors.append(ValidationError( type="schema", path=[str(p) for p in err.get("loc", [])], message=err.get("msg", "Validation error") )) return errors def _validate_semantic( self, spec: AtomizerSpec ) -> Tuple[List[ValidationError], List[ValidationWarning]]: """ Perform semantic validation. Checks business logic and constraints that can't be expressed in schema. """ errors: List[ValidationError] = [] warnings: List[ValidationWarning] = [] # Validate design variable bounds errors.extend(self._validate_dv_bounds(spec)) # Validate extractor configurations errors.extend(self._validate_extractor_configs(spec)) warnings.extend(self._warn_extractor_configs(spec)) # Validate reference integrity (done in Pydantic, but double-check) errors.extend(self._validate_references(spec)) # Validate optimization settings errors.extend(self._validate_optimization_settings(spec)) warnings.extend(self._warn_optimization_settings(spec)) # Validate canvas edges warnings.extend(self._validate_canvas_edges(spec)) # Check for duplicate IDs errors.extend(self._validate_unique_ids(spec)) # Validate custom function syntax errors.extend(self._validate_custom_functions(spec)) return errors, warnings def _validate_dv_bounds(self, spec: AtomizerSpec) -> List[ValidationError]: """Validate design variable bounds.""" errors = [] for i, dv in enumerate(spec.design_variables): # Check baseline within bounds if dv.baseline is not None: if dv.baseline < dv.bounds.min or dv.baseline > dv.bounds.max: errors.append(ValidationError( type="semantic", path=["design_variables", str(i), "baseline"], message=f"Baseline {dv.baseline} outside bounds [{dv.bounds.min}, {dv.bounds.max}]" )) # Check step size for integer type if dv.type.value == "integer": range_size = dv.bounds.max - dv.bounds.min if range_size < 1: errors.append(ValidationError( type="semantic", path=["design_variables", str(i), "bounds"], message="Integer variable must have range >= 1" )) return errors def _validate_extractor_configs(self, spec: AtomizerSpec) -> List[ValidationError]: """Validate extractor-specific configurations.""" errors = [] for i, ext in enumerate(spec.extractors): # Zernike extractors need specific config if ext.type in [ExtractorType.ZERNIKE_OPD, ExtractorType.ZERNIKE_CSV]: if not ext.config: errors.append(ValidationError( type="semantic", path=["extractors", str(i), "config"], message=f"Zernike extractor requires config with radius settings" )) elif ext.config: if ext.config.inner_radius_mm is None: errors.append(ValidationError( type="semantic", path=["extractors", str(i), "config", "inner_radius_mm"], message="Zernike extractor requires inner_radius_mm" )) if ext.config.outer_radius_mm is None: errors.append(ValidationError( type="semantic", path=["extractors", str(i), "config", "outer_radius_mm"], message="Zernike extractor requires outer_radius_mm" )) # Mass expression extractor needs expression_name if ext.type == ExtractorType.MASS_EXPRESSION: if not ext.config or not ext.config.expression_name: errors.append(ValidationError( type="semantic", path=["extractors", str(i), "config", "expression_name"], message="Mass expression extractor requires expression_name in config" )) return errors def _warn_extractor_configs(self, spec: AtomizerSpec) -> List[ValidationWarning]: """Generate warnings for extractor configurations.""" warnings = [] for i, ext in enumerate(spec.extractors): # Zernike mode count warning if ext.type in [ExtractorType.ZERNIKE_OPD, ExtractorType.ZERNIKE_CSV]: if ext.config and ext.config.n_modes: if ext.config.n_modes > 66: warnings.append(ValidationWarning( type="performance", path=["extractors", str(i), "config", "n_modes"], message=f"n_modes={ext.config.n_modes} is high; consider <=66 for performance" )) return warnings def _validate_references(self, spec: AtomizerSpec) -> List[ValidationError]: """Validate reference integrity.""" errors = [] # Collect all valid IDs dv_ids = {dv.id for dv in spec.design_variables} ext_ids = {ext.id for ext in spec.extractors} ext_outputs: Dict[str, set] = {} for ext in spec.extractors: ext_outputs[ext.id] = {o.name for o in ext.outputs} # Validate canvas edges if spec.canvas and spec.canvas.edges: all_ids = dv_ids | ext_ids all_ids.add("model") all_ids.add("solver") all_ids.add("optimization") all_ids.update(obj.id for obj in spec.objectives) if spec.constraints: all_ids.update(con.id for con in spec.constraints) for i, edge in enumerate(spec.canvas.edges): if edge.source not in all_ids: errors.append(ValidationError( type="reference", path=["canvas", "edges", str(i), "source"], message=f"Edge source '{edge.source}' not found" )) if edge.target not in all_ids: errors.append(ValidationError( type="reference", path=["canvas", "edges", str(i), "target"], message=f"Edge target '{edge.target}' not found" )) return errors def _validate_optimization_settings(self, spec: AtomizerSpec) -> List[ValidationError]: """Validate optimization settings.""" errors = [] algo_type = spec.optimization.algorithm.type # NSGA-II requires multiple objectives if algo_type == AlgorithmType.NSGA_II and len(spec.objectives) < 2: errors.append(ValidationError( type="semantic", path=["optimization", "algorithm", "type"], message="NSGA-II requires at least 2 objectives" )) return errors def _warn_optimization_settings(self, spec: AtomizerSpec) -> List[ValidationWarning]: """Generate warnings for optimization settings.""" warnings = [] budget = spec.optimization.budget # Warn about small trial budgets if budget.max_trials and budget.max_trials < 20: warnings.append(ValidationWarning( type="recommendation", path=["optimization", "budget", "max_trials"], message=f"max_trials={budget.max_trials} is low; recommend >= 20 for convergence" )) # Warn about large design space with small budget num_dvs = len(spec.get_enabled_design_variables()) if budget.max_trials and num_dvs > 5 and budget.max_trials < num_dvs * 10: warnings.append(ValidationWarning( type="recommendation", path=["optimization", "budget", "max_trials"], message=f"{num_dvs} DVs suggest at least {num_dvs * 10} trials" )) return warnings def _validate_canvas_edges(self, spec: AtomizerSpec) -> List[ValidationWarning]: """Validate canvas edge structure.""" warnings = [] if not spec.canvas or not spec.canvas.edges: warnings.append(ValidationWarning( type="completeness", path=["canvas", "edges"], message="No canvas edges defined; canvas may not render correctly" )) return warnings def _validate_unique_ids(self, spec: AtomizerSpec) -> List[ValidationError]: """Validate that all IDs are unique.""" errors = [] seen_ids: Dict[str, str] = {} # Check all ID-bearing elements for i, dv in enumerate(spec.design_variables): if dv.id in seen_ids: errors.append(ValidationError( type="semantic", path=["design_variables", str(i), "id"], message=f"Duplicate ID '{dv.id}' (also in {seen_ids[dv.id]})" )) seen_ids[dv.id] = f"design_variables[{i}]" for i, ext in enumerate(spec.extractors): if ext.id in seen_ids: errors.append(ValidationError( type="semantic", path=["extractors", str(i), "id"], message=f"Duplicate ID '{ext.id}' (also in {seen_ids[ext.id]})" )) seen_ids[ext.id] = f"extractors[{i}]" for i, obj in enumerate(spec.objectives): if obj.id in seen_ids: errors.append(ValidationError( type="semantic", path=["objectives", str(i), "id"], message=f"Duplicate ID '{obj.id}' (also in {seen_ids[obj.id]})" )) seen_ids[obj.id] = f"objectives[{i}]" if spec.constraints: for i, con in enumerate(spec.constraints): if con.id in seen_ids: errors.append(ValidationError( type="semantic", path=["constraints", str(i), "id"], message=f"Duplicate ID '{con.id}' (also in {seen_ids[con.id]})" )) seen_ids[con.id] = f"constraints[{i}]" return errors def _validate_custom_functions(self, spec: AtomizerSpec) -> List[ValidationError]: """Validate custom function Python syntax.""" errors = [] for i, ext in enumerate(spec.extractors): if ext.type == ExtractorType.CUSTOM_FUNCTION and ext.function: if ext.function.source_code: try: compile(ext.function.source_code, f"", "exec") except SyntaxError as e: errors.append(ValidationError( type="semantic", path=["extractors", str(i), "function", "source_code"], message=f"Python syntax error: {e.msg} at line {e.lineno}" )) return errors def _build_summary(self, data: Dict) -> ValidationSummary: """Build validation summary.""" extractors = data.get("extractors", []) custom_count = sum( 1 for e in extractors if e.get("type") == "custom_function" or not e.get("builtin", True) ) return ValidationSummary( design_variables=len(data.get("design_variables", [])), extractors=len(extractors), objectives=len(data.get("objectives", [])), constraints=len(data.get("constraints", []) or []), custom_functions=custom_count ) def _parse_path(self, path: str) -> List[str]: """Parse a JSONPath-style path into parts.""" import re # Handle both dot notation and bracket notation # e.g., "design_variables[0].bounds.max" or "objectives.0.weight" parts = [] for part in re.split(r'\.|\[|\]', path): if part: parts.append(part) return parts def _validate_dv_update( self, parts: List[str], value: Any, spec: AtomizerSpec ) -> List[str]: """Validate a design variable update.""" errors = [] if len(parts) >= 2: try: idx = int(parts[1]) if idx >= len(spec.design_variables): errors.append(f"Design variable index {idx} out of range") except ValueError: errors.append(f"Invalid design variable index: {parts[1]}") return errors def _validate_extractor_update( self, parts: List[str], value: Any, spec: AtomizerSpec ) -> List[str]: """Validate an extractor update.""" errors = [] if len(parts) >= 2: try: idx = int(parts[1]) if idx >= len(spec.extractors): errors.append(f"Extractor index {idx} out of range") except ValueError: errors.append(f"Invalid extractor index: {parts[1]}") return errors def _validate_objective_update( self, parts: List[str], value: Any, spec: AtomizerSpec ) -> List[str]: """Validate an objective update.""" errors = [] if len(parts) >= 2: try: idx = int(parts[1]) if idx >= len(spec.objectives): errors.append(f"Objective index {idx} out of range") except ValueError: errors.append(f"Invalid objective index: {parts[1]}") # Validate weight if len(parts) >= 3 and parts[2] == "weight": if not isinstance(value, (int, float)) or value < 0: errors.append("Weight must be a non-negative number") return errors def _validate_constraint_update( self, parts: List[str], value: Any, spec: AtomizerSpec ) -> List[str]: """Validate a constraint update.""" errors = [] if not spec.constraints: errors.append("No constraints defined") return errors if len(parts) >= 2: try: idx = int(parts[1]) if idx >= len(spec.constraints): errors.append(f"Constraint index {idx} out of range") except ValueError: errors.append(f"Invalid constraint index: {parts[1]}") return errors def _validate_optimization_update( self, parts: List[str], value: Any ) -> List[str]: """Validate an optimization update.""" errors = [] if len(parts) >= 2: if parts[1] == "algorithm" and len(parts) >= 3: if parts[2] == "type": valid_types = [t.value for t in AlgorithmType] if value not in valid_types: errors.append(f"Invalid algorithm type. Valid: {valid_types}") return errors def _validate_meta_update( self, parts: List[str], value: Any ) -> List[str]: """Validate a meta update.""" errors = [] if len(parts) >= 2: if parts[1] == "study_name": import re if not re.match(r"^[a-z0-9_]+$", str(value)): errors.append("study_name must be snake_case (lowercase, numbers, underscores)") return errors # Module-level convenience function def validate_spec( spec_data: Union[Dict[str, Any], AtomizerSpec], strict: bool = True ) -> ValidationReport: """ Validate an AtomizerSpec. Args: spec_data: Spec data (dict or AtomizerSpec) strict: Raise exception on errors Returns: ValidationReport Raises: SpecValidationError: If strict=True and validation fails """ validator = SpecValidator() return validator.validate(spec_data, strict=strict)