feat(config): AtomizerSpec v2.0 Pydantic models, validators, and tests

Config Layer:
- spec_models.py: Pydantic models for AtomizerSpec v2.0
- spec_validator.py: Semantic validation with detailed error reporting

Extractors:
- custom_extractor_loader.py: Runtime custom extractor loading
- spec_extractor_builder.py: Build extractors from spec definitions

Tools:
- migrate_to_spec_v2.py: CLI tool for batch migration

Tests:
- test_migrator.py: Migration tests
- test_spec_manager.py: SpecManager service tests
- test_spec_api.py: REST API tests
- test_mcp_tools.py: MCP tool tests
- test_e2e_unified_config.py: End-to-end config tests
This commit is contained in:
2026-01-20 13:12:03 -05:00
parent 27e78d3d56
commit 6c30224341
10 changed files with 4705 additions and 0 deletions

View File

@@ -0,0 +1,674 @@
"""
AtomizerSpec v2.0 Pydantic Models
These models match the JSON Schema at optimization_engine/schemas/atomizer_spec_v2.json
They provide validation and type safety for the unified configuration system.
"""
import re
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Union

from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
# ============================================================================
# Enums
# ============================================================================
# All enums below mix in `str`, so members serialize to and compare with the
# exact string literals shown (including mixed-case values such as
# "NX_Nastran" or "CMA-ES" -- do not normalize them).
class SpecCreatedBy(str, Enum):
    """Who/what created the spec."""
    CANVAS = "canvas"
    CLAUDE = "claude"
    API = "api"
    MIGRATION = "migration"
    MANUAL = "manual"


class SolverType(str, Enum):
    """Supported solver types."""
    NASTRAN = "nastran"
    NX_NASTRAN = "NX_Nastran"
    ABAQUS = "abaqus"


class SubcaseType(str, Enum):
    """Subcase analysis types."""
    STATIC = "static"
    MODAL = "modal"
    THERMAL = "thermal"
    BUCKLING = "buckling"


class DesignVariableType(str, Enum):
    """Design variable types."""
    CONTINUOUS = "continuous"
    INTEGER = "integer"
    CATEGORICAL = "categorical"


class ExtractorType(str, Enum):
    """Physics extractor types."""
    DISPLACEMENT = "displacement"
    FREQUENCY = "frequency"
    STRESS = "stress"
    MASS = "mass"
    MASS_EXPRESSION = "mass_expression"  # reads an NX expression; needs config.expression_name
    ZERNIKE_OPD = "zernike_opd"          # zernike_* types need radius settings in config
    ZERNIKE_CSV = "zernike_csv"
    TEMPERATURE = "temperature"
    CUSTOM_FUNCTION = "custom_function"  # requires Extractor.function to be set


class OptimizationDirection(str, Enum):
    """Optimization direction."""
    MINIMIZE = "minimize"
    MAXIMIZE = "maximize"


class ConstraintType(str, Enum):
    """Constraint types."""
    HARD = "hard"
    SOFT = "soft"


class ConstraintOperator(str, Enum):
    """Constraint comparison operators."""
    LE = "<="
    GE = ">="
    LT = "<"
    GT = ">"
    EQ = "=="


class PenaltyMethod(str, Enum):
    """Penalty methods for constraints."""
    LINEAR = "linear"
    QUADRATIC = "quadratic"
    EXPONENTIAL = "exponential"


class AlgorithmType(str, Enum):
    """Optimization algorithm types."""
    TPE = "TPE"
    CMA_ES = "CMA-ES"
    NSGA_II = "NSGA-II"  # multi-objective; SpecValidator requires >= 2 objectives
    RANDOM_SEARCH = "RandomSearch"
    SAT_V3 = "SAT_v3"
    GP_BO = "GP-BO"


class SurrogateType(str, Enum):
    """Surrogate model types."""
    MLP = "MLP"
    GNN = "GNN"
    ENSEMBLE = "ensemble"
# ============================================================================
# Position Model
# ============================================================================
class CanvasPosition(BaseModel):
    """Canvas position for a node (Canvas UI coordinates)."""
    x: float = 0
    y: float = 0


class SpecMeta(BaseModel):
    """Metadata about the spec.

    `version` and `study_name` are required; the remaining fields are
    optional provenance/documentation bookkeeping.
    """
    version: str = Field(
        ...,
        pattern=r"^2\.\d+$",
        description="Schema version (e.g., '2.0')"
    )
    created: Optional[datetime] = Field(
        default=None,
        description="When the spec was created"
    )
    modified: Optional[datetime] = Field(
        default=None,
        description="When the spec was last modified"
    )
    created_by: Optional[SpecCreatedBy] = Field(
        default=None,
        description="Who/what created the spec"
    )
    modified_by: Optional[str] = Field(
        default=None,
        description="Who/what last modified the spec"
    )
    study_name: str = Field(
        ...,
        min_length=3,
        max_length=100,
        pattern=r"^[a-z0-9_]+$",
        description="Unique study identifier (snake_case)"
    )
    description: Optional[str] = Field(
        default=None,
        max_length=1000,
        description="Human-readable description"
    )
    tags: Optional[List[str]] = Field(
        default=None,
        description="Tags for categorization"
    )
    engineering_context: Optional[str] = Field(
        default=None,
        description="Real-world engineering context"
    )
# ============================================================================
# Model Configuration Models
# ============================================================================
class NxPartConfig(BaseModel):
    """NX geometry part file configuration."""
    path: Optional[str] = Field(default=None, description="Path to .prt file")
    hash: Optional[str] = Field(default=None, description="File hash for change detection")
    idealized_part: Optional[str] = Field(default=None, description="Idealized part filename (_i.prt)")


class FemConfig(BaseModel):
    """FEM mesh file configuration."""
    path: Optional[str] = Field(default=None, description="Path to .fem file")
    element_count: Optional[int] = Field(default=None, description="Number of elements")
    node_count: Optional[int] = Field(default=None, description="Number of nodes")


class Subcase(BaseModel):
    """Simulation subcase definition."""
    id: int  # numeric subcase identifier
    name: Optional[str] = None
    type: Optional[SubcaseType] = None


class SimConfig(BaseModel):
    """Simulation file configuration. The .sim path and solver are mandatory."""
    path: str = Field(..., description="Path to .sim file")
    solver: SolverType = Field(..., description="Solver type")
    solution_type: Optional[str] = Field(
        default=None,
        pattern=r"^SOL\d+$",
        description="Solution type (e.g., SOL101)"
    )
    subcases: Optional[List[Subcase]] = Field(default=None, description="Defined subcases")


class NxSettings(BaseModel):
    """NX runtime settings."""
    nx_install_path: Optional[str] = None
    simulation_timeout_s: Optional[int] = Field(default=None, ge=60, le=7200)  # 1 min .. 2 h
    auto_start_nx: Optional[bool] = None


class ModelConfig(BaseModel):
    """NX model files and configuration. Only the `sim` section is required."""
    nx_part: Optional[NxPartConfig] = None
    fem: Optional[FemConfig] = None
    sim: SimConfig
    nx_settings: Optional[NxSettings] = None
# ============================================================================
# Design Variable Models
# ============================================================================
class DesignVariableBounds(BaseModel):
    """Inclusive lower/upper bounds for a design variable (min < max)."""
    min: float  # lower bound
    max: float  # upper bound; must be strictly greater than min

    @model_validator(mode='after')
    def validate_bounds(self) -> 'DesignVariableBounds':
        """Reject inverted or degenerate (zero-width) ranges."""
        if self.max <= self.min:
            raise ValueError(f"min ({self.min}) must be less than max ({self.max})")
        return self
class DesignVariable(BaseModel):
    """A design variable to optimize.

    `expression_name` must match an expression in the NX model; `bounds`
    define the search range and `baseline` the current/initial value
    (SpecValidator checks that baseline lies within bounds).
    """
    id: str = Field(
        ...,
        pattern=r"^dv_\d{3}$",
        description="Unique identifier (pattern: dv_XXX)"
    )
    name: str = Field(..., description="Human-readable name")
    expression_name: str = Field(
        ...,
        pattern=r"^[a-zA-Z_][a-zA-Z0-9_]*$",
        description="NX expression name (must match model)"
    )
    type: DesignVariableType = Field(..., description="Variable type")
    bounds: DesignVariableBounds = Field(..., description="Value bounds")
    baseline: Optional[float] = Field(default=None, description="Current/initial value")
    units: Optional[str] = Field(default=None, description="Physical units (mm, deg, etc.)")
    step: Optional[float] = Field(default=None, description="Step size for integer/discrete")
    enabled: bool = Field(default=True, description="Whether to include in optimization")
    description: Optional[str] = None
    canvas_position: Optional[CanvasPosition] = None
# ============================================================================
# Extractor Models
# ============================================================================
class ExtractorConfig(BaseModel):
    """Type-specific extractor configuration.

    Holds the union of settings used by the built-in extractor types;
    each extractor reads only the subset relevant to its `type`. Extra
    fields are accepted and preserved for flexibility.
    """
    # Pydantic v2 configuration style; the inner `class Config` is deprecated in v2
    # (the file already uses v2 APIs such as model_validator).
    model_config = ConfigDict(extra="allow")

    inner_radius_mm: Optional[float] = None  # required for zernike_* types (checked by SpecValidator)
    outer_radius_mm: Optional[float] = None  # required for zernike_* types (checked by SpecValidator)
    n_modes: Optional[int] = None
    filter_low_orders: Optional[int] = None
    displacement_unit: Optional[str] = None
    reference_subcase: Optional[int] = None
    expression_name: Optional[str] = None  # required for mass_expression type
    mode_number: Optional[int] = None
    element_type: Optional[str] = None
    result_type: Optional[str] = None
    metric: Optional[str] = None
class CustomFunction(BaseModel):
    """Custom function definition for custom_function extractors."""
    name: Optional[str] = Field(default=None, description="Function name")
    module: Optional[str] = Field(default=None, description="Python module path")
    signature: Optional[str] = Field(default=None, description="Function signature")
    # Syntax-checked by SpecValidator via compile() -- never executed there.
    source_code: Optional[str] = Field(default=None, description="Python source code")


class ExtractorOutput(BaseModel):
    """Output definition for an extractor; `name` is the key objectives/constraints reference."""
    name: str = Field(..., description="Output name (used by objectives/constraints)")
    metric: Optional[str] = Field(default=None, description="Specific metric (max, total, rms, etc.)")
    subcase: Optional[int] = Field(default=None, description="Subcase ID for this output")
    units: Optional[str] = None
class Extractor(BaseModel):
    """Physics extractor that computes outputs from FEA.

    Objectives and constraints reference an extractor by (id, output
    name); `function` is only meaningful for the custom_function type.
    """
    id: str = Field(
        ...,
        pattern=r"^ext_\d{3}$",
        description="Unique identifier (pattern: ext_XXX)"
    )
    name: str = Field(..., description="Human-readable name")
    type: ExtractorType = Field(..., description="Extractor type")
    builtin: bool = Field(default=True, description="Whether this is a built-in extractor")
    config: Optional[ExtractorConfig] = Field(default=None, description="Type-specific configuration")
    function: Optional[CustomFunction] = Field(
        default=None,
        description="Custom function definition (for custom_function type)"
    )
    outputs: List[ExtractorOutput] = Field(..., min_length=1, description="Output values")
    canvas_position: Optional[CanvasPosition] = None

    @model_validator(mode='after')
    def validate_custom_function(self) -> 'Extractor':
        # A custom_function extractor is unusable without its definition.
        if self.type == ExtractorType.CUSTOM_FUNCTION and self.function is None:
            raise ValueError("custom_function extractor requires function definition")
        return self
# ============================================================================
# Objective Models
# ============================================================================
class ObjectiveSource(BaseModel):
    """Source reference for objective value (extractor id + output name)."""
    extractor_id: str = Field(..., description="Reference to extractor")
    output_name: str = Field(..., description="Which output from the extractor")


class Objective(BaseModel):
    """Optimization objective.

    Referential integrity of `source` is enforced by
    AtomizerSpec.validate_references.
    """
    id: str = Field(
        ...,
        pattern=r"^obj_\d{3}$",
        description="Unique identifier (pattern: obj_XXX)"
    )
    name: str = Field(..., description="Human-readable name")
    direction: OptimizationDirection = Field(..., description="Optimization direction")
    weight: float = Field(default=1.0, ge=0, description="Weight for weighted sum")
    source: ObjectiveSource = Field(..., description="Where the value comes from")
    target: Optional[float] = Field(default=None, description="Target value (for goal programming)")
    units: Optional[str] = None
    description: Optional[str] = None
    canvas_position: Optional[CanvasPosition] = None
# ============================================================================
# Constraint Models
# ============================================================================
class ConstraintSource(BaseModel):
    """Source reference for constraint value (extractor id + output name)."""
    extractor_id: str
    output_name: str


class PenaltyConfig(BaseModel):
    """Penalty method configuration for (soft) constraints."""
    method: Optional[PenaltyMethod] = None
    weight: Optional[float] = None
    margin: Optional[float] = Field(default=None, description="Soft margin before penalty kicks in")


class Constraint(BaseModel):
    """Hard or soft constraint comparing a sourced value against `threshold`.

    Referential integrity of `source` is enforced by
    AtomizerSpec.validate_references.
    """
    id: str = Field(
        ...,
        pattern=r"^con_\d{3}$",
        description="Unique identifier (pattern: con_XXX)"
    )
    name: str
    type: ConstraintType = Field(..., description="Constraint type")
    operator: ConstraintOperator = Field(..., description="Comparison operator")
    threshold: float = Field(..., description="Constraint threshold value")
    source: ConstraintSource = Field(..., description="Where the value comes from")
    penalty_config: Optional[PenaltyConfig] = None
    description: Optional[str] = None
    canvas_position: Optional[CanvasPosition] = None
# ============================================================================
# Optimization Models
# ============================================================================
class AlgorithmConfig(BaseModel):
    """Algorithm-specific settings.

    Declared fields are a superset across the supported algorithms; any
    additional algorithm-specific keys are accepted and preserved.
    """
    # Pydantic v2 configuration style; the inner `class Config` is deprecated in v2
    # (the file already uses v2 APIs such as model_validator).
    model_config = ConfigDict(extra="allow")

    population_size: Optional[int] = None
    n_generations: Optional[int] = None
    mutation_prob: Optional[float] = None
    crossover_prob: Optional[float] = None
    seed: Optional[int] = None  # RNG seed for reproducibility
    n_startup_trials: Optional[int] = None
    sigma0: Optional[float] = None
class Algorithm(BaseModel):
    """Optimization algorithm choice plus its settings."""
    type: AlgorithmType
    config: Optional[AlgorithmConfig] = None


class OptimizationBudget(BaseModel):
    """Computational budget for optimization."""
    max_trials: Optional[int] = Field(default=None, ge=1, le=10000)
    max_time_hours: Optional[float] = None
    convergence_patience: Optional[int] = Field(
        default=None,
        description="Stop if no improvement for N trials"
    )


class SurrogateConfig(BaseModel):
    """Neural surrogate model configuration."""
    n_models: Optional[int] = None
    architecture: Optional[List[int]] = None  # presumably layer sizes -- confirm against surrogate code
    train_every_n_trials: Optional[int] = None
    min_training_samples: Optional[int] = None
    acquisition_candidates: Optional[int] = None
    fea_validations_per_round: Optional[int] = None


class Surrogate(BaseModel):
    """Surrogate model settings."""
    enabled: Optional[bool] = None
    type: Optional[SurrogateType] = None
    config: Optional[SurrogateConfig] = None


class OptimizationConfig(BaseModel):
    """Optimization configuration; `algorithm` and `budget` are required."""
    algorithm: Algorithm
    budget: OptimizationBudget
    surrogate: Optional[Surrogate] = None
    canvas_position: Optional[CanvasPosition] = None
# ============================================================================
# Workflow Models
# ============================================================================
class WorkflowStage(BaseModel):
    """A stage in a multi-stage optimization workflow."""
    id: str
    name: str
    algorithm: Optional[str] = None  # free-form string here, not constrained to AlgorithmType
    trials: Optional[int] = None
    purpose: Optional[str] = None
class WorkflowTransition(BaseModel):
    """Transition between workflow stages.

    Serialized with the JSON key "from" (a Python keyword), hence the
    `from_` attribute with an alias; construction by either name works.
    """
    # Pydantic v2 configuration style; the inner `class Config` is deprecated in v2
    # (the file already uses v2 APIs such as model_validator).
    model_config = ConfigDict(populate_by_name=True)

    from_: str = Field(..., alias="from")
    to: str
    condition: Optional[str] = None
class Workflow(BaseModel):
    """Multi-stage optimization workflow: stages plus transitions between them."""
    stages: Optional[List[WorkflowStage]] = None
    transitions: Optional[List[WorkflowTransition]] = None
# ============================================================================
# Reporting Models
# ============================================================================
class InsightConfig(BaseModel):
    """Insight-specific configuration; unknown keys are accepted and preserved."""
    # Pydantic v2 configuration style; the inner `class Config` is deprecated in v2
    # (the file already uses v2 APIs such as model_validator).
    model_config = ConfigDict(extra="allow")

    include_html: Optional[bool] = None
    show_pareto_evolution: Optional[bool] = None
class Insight(BaseModel):
    """Reporting insight definition."""
    type: Optional[str] = None
    for_trials: Optional[str] = None  # trial selector; exact semantics defined by the reporting engine (not visible here)
    config: Optional[InsightConfig] = None


class ReportingConfig(BaseModel):
    """Reporting configuration."""
    auto_report: Optional[bool] = None
    report_triggers: Optional[List[str]] = None
    insights: Optional[List[Insight]] = None
# ============================================================================
# Canvas Models
# ============================================================================
class CanvasViewport(BaseModel):
    """Canvas viewport settings (pan offset and zoom level)."""
    x: float = 0
    y: float = 0
    zoom: float = 1.0


class CanvasEdge(BaseModel):
    """Connection between canvas nodes; source/target hold node IDs."""
    source: str
    target: str
    # camelCase kept to match the Canvas edge payload -- do not rename.
    sourceHandle: Optional[str] = None
    targetHandle: Optional[str] = None


class CanvasGroup(BaseModel):
    """Grouping of canvas nodes."""
    id: str
    name: str
    node_ids: List[str]


class CanvasConfig(BaseModel):
    """Canvas UI state (persisted for reconstruction)."""
    layout_version: Optional[str] = None
    viewport: Optional[CanvasViewport] = None
    edges: Optional[List[CanvasEdge]] = None
    groups: Optional[List[CanvasGroup]] = None
# ============================================================================
# Main AtomizerSpec Model
# ============================================================================
class AtomizerSpec(BaseModel):
    """
    AtomizerSpec v2.0 - The unified configuration schema for Atomizer optimization studies.

    This is the single source of truth used by:
    - Canvas UI (rendering and editing)
    - Backend API (validation and storage)
    - Claude Assistant (reading and modifying)
    - Optimization Engine (execution)

    Cross-references (objective/constraint -> extractor output) are
    validated at construction time by `validate_references`.
    """
    meta: SpecMeta = Field(..., description="Metadata about the spec")
    model: ModelConfig = Field(..., description="NX model files and configuration")
    design_variables: List[DesignVariable] = Field(
        ...,
        min_length=1,
        max_length=50,
        description="Design variables to optimize"
    )
    extractors: List[Extractor] = Field(
        ...,
        min_length=1,
        description="Physics extractors"
    )
    objectives: List[Objective] = Field(
        ...,
        min_length=1,
        max_length=5,
        description="Optimization objectives"
    )
    constraints: Optional[List[Constraint]] = Field(
        default=None,
        description="Hard and soft constraints"
    )
    optimization: OptimizationConfig = Field(..., description="Algorithm configuration")
    workflow: Optional[Workflow] = Field(default=None, description="Multi-stage workflow")
    reporting: Optional[ReportingConfig] = Field(default=None, description="Reporting config")
    canvas: Optional[CanvasConfig] = Field(default=None, description="Canvas UI state")

    @model_validator(mode='after')
    def validate_references(self) -> 'AtomizerSpec':
        """Validate that all references are valid.

        Every objective/constraint source must point at an existing
        extractor id and one of that extractor's declared output names.

        Raises:
            ValueError: on the first dangling reference found.
        """
        # Collect valid extractor IDs and their outputs
        extractor_outputs: Dict[str, set] = {}
        for ext in self.extractors:
            extractor_outputs[ext.id] = {o.name for o in ext.outputs}
        # Validate objective sources
        for obj in self.objectives:
            if obj.source.extractor_id not in extractor_outputs:
                raise ValueError(
                    f"Objective '{obj.name}' references unknown extractor: {obj.source.extractor_id}"
                )
            if obj.source.output_name not in extractor_outputs[obj.source.extractor_id]:
                raise ValueError(
                    f"Objective '{obj.name}' references unknown output: {obj.source.output_name}"
                )
        # Validate constraint sources (constraints section is optional)
        if self.constraints:
            for con in self.constraints:
                if con.source.extractor_id not in extractor_outputs:
                    raise ValueError(
                        f"Constraint '{con.name}' references unknown extractor: {con.source.extractor_id}"
                    )
                if con.source.output_name not in extractor_outputs[con.source.extractor_id]:
                    raise ValueError(
                        f"Constraint '{con.name}' references unknown output: {con.source.output_name}"
                    )
        return self

    def get_enabled_design_variables(self) -> List[DesignVariable]:
        """Return only enabled design variables."""
        return [dv for dv in self.design_variables if dv.enabled]

    def get_extractor_by_id(self, extractor_id: str) -> Optional[Extractor]:
        """Find an extractor by ID; returns None when absent."""
        for ext in self.extractors:
            if ext.id == extractor_id:
                return ext
        return None

    def get_objective_by_id(self, objective_id: str) -> Optional[Objective]:
        """Find an objective by ID; returns None when absent."""
        for obj in self.objectives:
            if obj.id == objective_id:
                return obj
        return None

    def get_constraint_by_id(self, constraint_id: str) -> Optional[Constraint]:
        """Find a constraint by ID; None when absent or no constraints defined."""
        if not self.constraints:
            return None
        for con in self.constraints:
            if con.id == constraint_id:
                return con
        return None

    def has_custom_extractors(self) -> bool:
        """Check if spec has any custom function extractors."""
        return any(ext.type == ExtractorType.CUSTOM_FUNCTION for ext in self.extractors)

    def is_multi_objective(self) -> bool:
        """Check if this is a multi-objective optimization."""
        return len(self.objectives) > 1
# ============================================================================
# Validation Response Models
# ============================================================================
class ValidationError(BaseModel):
    """A validation error (blocks the spec).

    NOTE: intentionally shadows the name of pydantic's ValidationError;
    spec_validator imports pydantic's as PydanticValidationError.
    """
    type: str  # 'schema', 'semantic', 'reference'
    path: List[str]  # JSON-path-like segments to the offending field
    message: str


class ValidationWarning(BaseModel):
    """A validation warning (advisory; does not block the spec)."""
    type: str
    path: List[str]
    message: str


class ValidationSummary(BaseModel):
    """Summary of spec contents (element counts)."""
    design_variables: int
    extractors: int
    objectives: int
    constraints: int
    custom_functions: int


class ValidationReport(BaseModel):
    """Full validation report; `valid` is True iff `errors` is empty."""
    valid: bool
    errors: List[ValidationError]
    warnings: List[ValidationWarning]
    summary: ValidationSummary

View File

@@ -0,0 +1,654 @@
"""
AtomizerSpec v2.0 Validator
Provides comprehensive validation including:
- JSON Schema validation
- Pydantic model validation
- Semantic validation (bounds, references, dependencies)
- Extractor-specific validation
"""
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from pydantic import ValidationError as PydanticValidationError
try:
import jsonschema
HAS_JSONSCHEMA = True
except ImportError:
HAS_JSONSCHEMA = False
from .spec_models import (
AtomizerSpec,
ValidationReport,
ValidationError,
ValidationWarning,
ValidationSummary,
ExtractorType,
AlgorithmType,
ConstraintType,
)
class SpecValidationError(Exception):
    """Raised when spec validation fails (strict mode of SpecValidator.validate)."""
    def __init__(self, message: str, errors: Optional[List[ValidationError]] = None):
        """Store a human-readable message plus the structured error list."""
        super().__init__(message)
        self.errors = errors or []  # structured errors; empty list when none given
class SpecValidator:
    """
    Validates AtomizerSpec v2.0 configurations.

    Provides three levels of validation:
    1. JSON Schema validation (structural)
    2. Pydantic model validation (type safety)
    3. Semantic validation (business logic)
    """
    # Path to JSON Schema file (../schemas/atomizer_spec_v2.json relative to this module)
    SCHEMA_PATH = Path(__file__).parent.parent / "schemas" / "atomizer_spec_v2.json"

    def __init__(self):
        """Initialize validator with schema."""
        # Cache for the lazily-loaded JSON Schema (see the `schema` property).
        self._schema: Optional[Dict] = None
@property
def schema(self) -> Dict:
    """Lazily load and cache the JSON Schema ({} when the file is absent)."""
    if self._schema is None:
        if self.SCHEMA_PATH.exists():
            self._schema = json.loads(self.SCHEMA_PATH.read_text())
        else:
            # Missing schema file: cache an empty schema so structural
            # validation is skipped rather than crashing.
            self._schema = {}
    return self._schema
def validate(
    self,
    spec_data: Union[Dict[str, Any], AtomizerSpec],
    strict: bool = True
) -> ValidationReport:
    """
    Validate a spec and return a detailed report.

    Runs three phases, each gated on the previous one passing:
    structural (JSON Schema), then type (Pydantic), then semantic.

    Args:
        spec_data: Either a dict or AtomizerSpec instance
        strict: If True, raise exception on errors; if False, return report only

    Returns:
        ValidationReport with errors, warnings, and summary

    Raises:
        SpecValidationError: If strict=True and validation fails
    """
    # Normalize input to a plain dict.
    if isinstance(spec_data, AtomizerSpec):
        data = spec_data.model_dump(mode='json')
    else:
        data = spec_data

    all_errors: List[ValidationError] = list(self._validate_json_schema(data))
    all_warnings: List[ValidationWarning] = []

    # Phase 2: only run Pydantic validation when the structural check passed.
    if not all_errors:
        all_errors.extend(self._validate_pydantic(data))

    # Phase 3: semantic checks require a spec that parses cleanly.
    if not all_errors:
        spec = AtomizerSpec.model_validate(data)
        sem_errors, sem_warnings = self._validate_semantic(spec)
        all_errors.extend(sem_errors)
        all_warnings.extend(sem_warnings)

    report = ValidationReport(
        valid=not all_errors,
        errors=all_errors,
        warnings=all_warnings,
        summary=self._build_summary(data),
    )

    # In strict mode, surface the first few errors as an exception.
    if strict and not report.valid:
        error_messages = "; ".join(e.message for e in report.errors[:3])
        raise SpecValidationError(
            f"Spec validation failed: {error_messages}",
            errors=report.errors
        )
    return report
def validate_partial(
    self,
    path: str,
    value: Any,
    current_spec: AtomizerSpec
) -> Tuple[bool, List[str]]:
    """
    Validate a partial update before applying.

    Args:
        path: JSONPath to the field being updated
        value: New value
        current_spec: Current full spec

    Returns:
        Tuple of (is_valid, list of error messages)
    """
    parts = self._parse_path(path)
    if not parts:
        return False, ["Invalid path format"]

    # Dispatch on the top-level section of the path; sections without a
    # dedicated checker are accepted as-is.
    spec_checkers = {
        "design_variables": self._validate_dv_update,
        "extractors": self._validate_extractor_update,
        "objectives": self._validate_objective_update,
        "constraints": self._validate_constraint_update,
    }
    errors: List[str] = []
    root = parts[0]
    if root in spec_checkers:
        errors.extend(spec_checkers[root](parts, value, current_spec))
    elif root == "optimization":
        errors.extend(self._validate_optimization_update(parts, value))
    elif root == "meta":
        errors.extend(self._validate_meta_update(parts, value))
    return not errors, errors
def _validate_json_schema(self, data: Dict) -> List[ValidationError]:
    """Validate against the JSON Schema (structural phase).

    Collects ALL structural violations via ``iter_errors`` instead of
    stopping at the first one (``jsonschema.validate`` reports only one),
    so the report is actually "detailed". Path segments are coerced to
    str because array indices arrive as ints while ValidationError.path
    is List[str] -- consistent with _validate_pydantic.

    Returns an empty list when jsonschema is not installed or the schema
    file is missing (structural validation is best-effort).
    """
    errors: List[ValidationError] = []
    if not HAS_JSONSCHEMA or not self.schema:
        return errors  # Skip if jsonschema not available
    try:
        validator_cls = jsonschema.validators.validator_for(self.schema)
        validator_cls.check_schema(self.schema)  # raises SchemaError if the schema itself is bad
        for err in validator_cls(self.schema).iter_errors(data):
            errors.append(ValidationError(
                type="schema",
                path=[str(p) for p in err.absolute_path],
                message=err.message
            ))
    except jsonschema.SchemaError as e:
        errors.append(ValidationError(
            type="schema",
            path=[],
            message=f"Invalid schema: {e.message}"
        ))
    return errors
def _validate_pydantic(self, data: Dict) -> List[ValidationError]:
    """Validate with the Pydantic models; one entry per reported field error."""
    try:
        AtomizerSpec.model_validate(data)
    except PydanticValidationError as exc:
        return [
            ValidationError(
                type="schema",
                path=[str(p) for p in err.get("loc", [])],
                message=err.get("msg", "Validation error"),
            )
            for err in exc.errors()
        ]
    return []
def _validate_semantic(
    self,
    spec: AtomizerSpec
) -> Tuple[List[ValidationError], List[ValidationWarning]]:
    """
    Perform semantic validation.

    Aggregates the business-logic checks that cannot be expressed in the
    schema. Errors block the spec; warnings are advisory only.
    """
    error_checks = (
        self._validate_dv_bounds,              # baseline within bounds, integer ranges
        self._validate_extractor_configs,      # type-specific required config
        self._validate_references,             # canvas edge ID integrity
        self._validate_optimization_settings,  # algorithm/objective compatibility
        self._validate_unique_ids,             # duplicate IDs across sections
        self._validate_custom_functions,       # Python syntax of custom code
    )
    warning_checks = (
        self._warn_extractor_configs,
        self._warn_optimization_settings,
        self._validate_canvas_edges,
    )
    errors: List[ValidationError] = []
    warnings: List[ValidationWarning] = []
    for check in error_checks:
        errors.extend(check(spec))
    for check in warning_checks:
        warnings.extend(check(spec))
    return errors, warnings
def _validate_dv_bounds(self, spec: AtomizerSpec) -> List[ValidationError]:
    """Validate design variable bounds: baseline in range, integer range width."""
    errors: List[ValidationError] = []
    for idx, dv in enumerate(spec.design_variables):
        lo, hi = dv.bounds.min, dv.bounds.max
        # The starting point must lie inside the optimization range.
        baseline = dv.baseline
        if baseline is not None and (baseline < lo or baseline > hi):
            errors.append(ValidationError(
                type="semantic",
                path=["design_variables", str(idx), "baseline"],
                message=f"Baseline {baseline} outside bounds [{lo}, {hi}]"
            ))
        # An integer variable needs room for at least two distinct values.
        # (String compare avoids importing DesignVariableType here.)
        if dv.type.value == "integer" and (hi - lo) < 1:
            errors.append(ValidationError(
                type="semantic",
                path=["design_variables", str(idx), "bounds"],
                message="Integer variable must have range >= 1"
            ))
    return errors
def _validate_extractor_configs(self, spec: AtomizerSpec) -> List[ValidationError]:
    """Validate extractor-specific configurations.

    - Zernike extractors must define inner/outer radii in config.
    - Mass-expression extractors must name the NX expression to read.

    Fixes vs. previous version: the redundant `elif ext.config:` after
    `if not ext.config:` is now a plain `else`, and a placeholder-free
    f-string lost its stray `f` prefix. Behavior is unchanged.
    """
    errors: List[ValidationError] = []
    zernike_types = (ExtractorType.ZERNIKE_OPD, ExtractorType.ZERNIKE_CSV)
    for i, ext in enumerate(spec.extractors):
        if ext.type in zernike_types:
            if not ext.config:
                errors.append(ValidationError(
                    type="semantic",
                    path=["extractors", str(i), "config"],
                    message="Zernike extractor requires config with radius settings"
                ))
            else:  # config present: both radii are mandatory
                if ext.config.inner_radius_mm is None:
                    errors.append(ValidationError(
                        type="semantic",
                        path=["extractors", str(i), "config", "inner_radius_mm"],
                        message="Zernike extractor requires inner_radius_mm"
                    ))
                if ext.config.outer_radius_mm is None:
                    errors.append(ValidationError(
                        type="semantic",
                        path=["extractors", str(i), "config", "outer_radius_mm"],
                        message="Zernike extractor requires outer_radius_mm"
                    ))
        # Mass expression extractor needs expression_name
        if ext.type == ExtractorType.MASS_EXPRESSION:
            if not ext.config or not ext.config.expression_name:
                errors.append(ValidationError(
                    type="semantic",
                    path=["extractors", str(i), "config", "expression_name"],
                    message="Mass expression extractor requires expression_name in config"
                ))
    return errors
def _warn_extractor_configs(self, spec: AtomizerSpec) -> List[ValidationWarning]:
    """Generate advisory warnings for extractor configurations."""
    warnings: List[ValidationWarning] = []
    zernike_types = (ExtractorType.ZERNIKE_OPD, ExtractorType.ZERNIKE_CSV)
    for i, ext in enumerate(spec.extractors):
        n_modes = ext.config.n_modes if ext.config else None
        # Very high Zernike mode counts are flagged as a performance concern.
        if ext.type in zernike_types and n_modes and n_modes > 66:
            warnings.append(ValidationWarning(
                type="performance",
                path=["extractors", str(i), "config", "n_modes"],
                message=f"n_modes={n_modes} is high; consider <=66 for performance"
            ))
    return warnings
def _validate_references(self, spec: AtomizerSpec) -> List[ValidationError]:
    """Validate reference integrity of canvas edges.

    Objective/constraint source references are already enforced by the
    AtomizerSpec model validator; this pass only checks that canvas
    edges point at known node IDs. (The previous version also built an
    extractor-output map that was never used -- dead code, removed.)
    """
    errors: List[ValidationError] = []
    if not spec.canvas or not spec.canvas.edges:
        return errors
    # All node IDs an edge may legitimately reference: DVs, extractors,
    # objectives, constraints, plus the fixed structural nodes.
    all_ids = {dv.id for dv in spec.design_variables}
    all_ids.update(ext.id for ext in spec.extractors)
    all_ids.update(obj.id for obj in spec.objectives)
    if spec.constraints:
        all_ids.update(con.id for con in spec.constraints)
    all_ids.update({"model", "solver", "optimization"})
    for i, edge in enumerate(spec.canvas.edges):
        if edge.source not in all_ids:
            errors.append(ValidationError(
                type="reference",
                path=["canvas", "edges", str(i), "source"],
                message=f"Edge source '{edge.source}' not found"
            ))
        if edge.target not in all_ids:
            errors.append(ValidationError(
                type="reference",
                path=["canvas", "edges", str(i), "target"],
                message=f"Edge target '{edge.target}' not found"
            ))
    return errors
def _validate_optimization_settings(self, spec: AtomizerSpec) -> List[ValidationError]:
    """Validate the optimization settings against the rest of the spec."""
    errors: List[ValidationError] = []
    # A multi-objective algorithm needs more than one objective to optimize.
    is_nsga = spec.optimization.algorithm.type == AlgorithmType.NSGA_II
    if is_nsga and len(spec.objectives) < 2:
        errors.append(ValidationError(
            type="semantic",
            path=["optimization", "algorithm", "type"],
            message="NSGA-II requires at least 2 objectives"
        ))
    return errors
def _warn_optimization_settings(self, spec: AtomizerSpec) -> List[ValidationWarning]:
    """Generate advisory warnings about the optimization budget."""
    warnings: List[ValidationWarning] = []
    max_trials = spec.optimization.budget.max_trials
    if not max_trials:
        return warnings  # no explicit trial budget: nothing to warn about
    # Very small budgets rarely converge.
    if max_trials < 20:
        warnings.append(ValidationWarning(
            type="recommendation",
            path=["optimization", "budget", "max_trials"],
            message=f"max_trials={max_trials} is low; recommend >= 20 for convergence"
        ))
    # Rule of thumb applied here: ~10 trials per enabled design variable.
    num_dvs = len(spec.get_enabled_design_variables())
    if num_dvs > 5 and max_trials < num_dvs * 10:
        warnings.append(ValidationWarning(
            type="recommendation",
            path=["optimization", "budget", "max_trials"],
            message=f"{num_dvs} DVs suggest at least {num_dvs * 10} trials"
        ))
    return warnings
def _validate_canvas_edges(self, spec: AtomizerSpec) -> List[ValidationWarning]:
    """Warn when the spec carries no canvas edge information at all."""
    if spec.canvas and spec.canvas.edges:
        return []
    return [ValidationWarning(
        type="completeness",
        path=["canvas", "edges"],
        message="No canvas edges defined; canvas may not render correctly"
    )]
def _validate_unique_ids(self, spec: AtomizerSpec) -> List[ValidationError]:
    """Validate that IDs are unique across all ID-bearing spec sections.

    Design variables, extractors, objectives, and constraints share a single
    ID namespace: a duplicate anywhere in the spec is reported together with
    a pointer to the element that previously used the ID.

    Returns:
        One ValidationError per duplicate occurrence (empty when all unique).
    """
    errors: List[ValidationError] = []
    seen_ids: Dict[str, str] = {}
    # Data-driven walk replaces four copy-pasted loops; order matters so that
    # "also in" references point at earlier sections first. Constraints are
    # optional and may be None.
    sections = [
        ("design_variables", spec.design_variables),
        ("extractors", spec.extractors),
        ("objectives", spec.objectives),
        ("constraints", spec.constraints or []),
    ]
    for section, items in sections:
        for i, item in enumerate(items):
            if item.id in seen_ids:
                errors.append(ValidationError(
                    type="semantic",
                    path=[section, str(i), "id"],
                    message=f"Duplicate ID '{item.id}' (also in {seen_ids[item.id]})"
                ))
            # Record (or overwrite) unconditionally so a third duplicate points
            # at the most recent occurrence, matching the original behavior.
            seen_ids[item.id] = f"{section}[{i}]"
    return errors
def _validate_custom_functions(self, spec: AtomizerSpec) -> List[ValidationError]:
    """Compile-check the source code of every custom-function extractor."""
    problems: List[ValidationError] = []
    for idx, extractor in enumerate(spec.extractors):
        if extractor.type != ExtractorType.CUSTOM_FUNCTION:
            continue
        fn = extractor.function
        if not fn or not fn.source_code:
            continue
        try:
            # compile() surfaces syntax errors without executing the code.
            compile(fn.source_code, f"<custom:{extractor.name}>", "exec")
        except SyntaxError as exc:
            problems.append(ValidationError(
                type="semantic",
                path=["extractors", str(idx), "function", "source_code"],
                message=f"Python syntax error: {exc.msg} at line {exc.lineno}"
            ))
    return problems
def _build_summary(self, data: Dict) -> ValidationSummary:
    """Build element counts for the validation report from raw spec data."""
    extractor_list = data.get("extractors", [])
    # An extractor counts as custom when its type is an explicit
    # custom_function OR its builtin flag is falsy (flag defaults to True).
    custom_total = 0
    for entry in extractor_list:
        if entry.get("type") == "custom_function" or not entry.get("builtin", True):
            custom_total += 1
    return ValidationSummary(
        design_variables=len(data.get("design_variables", [])),
        extractors=len(extractor_list),
        objectives=len(data.get("objectives", [])),
        constraints=len(data.get("constraints", []) or []),
        custom_functions=custom_total
    )
def _parse_path(self, path: str) -> List[str]:
"""Parse a JSONPath-style path into parts."""
import re
# Handle both dot notation and bracket notation
# e.g., "design_variables[0].bounds.max" or "objectives.0.weight"
parts = []
for part in re.split(r'\.|\[|\]', path):
if part:
parts.append(part)
return parts
def _validate_dv_update(
self,
parts: List[str],
value: Any,
spec: AtomizerSpec
) -> List[str]:
"""Validate a design variable update."""
errors = []
if len(parts) >= 2:
try:
idx = int(parts[1])
if idx >= len(spec.design_variables):
errors.append(f"Design variable index {idx} out of range")
except ValueError:
errors.append(f"Invalid design variable index: {parts[1]}")
return errors
def _validate_extractor_update(
self,
parts: List[str],
value: Any,
spec: AtomizerSpec
) -> List[str]:
"""Validate an extractor update."""
errors = []
if len(parts) >= 2:
try:
idx = int(parts[1])
if idx >= len(spec.extractors):
errors.append(f"Extractor index {idx} out of range")
except ValueError:
errors.append(f"Invalid extractor index: {parts[1]}")
return errors
def _validate_objective_update(
self,
parts: List[str],
value: Any,
spec: AtomizerSpec
) -> List[str]:
"""Validate an objective update."""
errors = []
if len(parts) >= 2:
try:
idx = int(parts[1])
if idx >= len(spec.objectives):
errors.append(f"Objective index {idx} out of range")
except ValueError:
errors.append(f"Invalid objective index: {parts[1]}")
# Validate weight
if len(parts) >= 3 and parts[2] == "weight":
if not isinstance(value, (int, float)) or value < 0:
errors.append("Weight must be a non-negative number")
return errors
def _validate_constraint_update(
self,
parts: List[str],
value: Any,
spec: AtomizerSpec
) -> List[str]:
"""Validate a constraint update."""
errors = []
if not spec.constraints:
errors.append("No constraints defined")
return errors
if len(parts) >= 2:
try:
idx = int(parts[1])
if idx >= len(spec.constraints):
errors.append(f"Constraint index {idx} out of range")
except ValueError:
errors.append(f"Invalid constraint index: {parts[1]}")
return errors
def _validate_optimization_update(
self,
parts: List[str],
value: Any
) -> List[str]:
"""Validate an optimization update."""
errors = []
if len(parts) >= 2:
if parts[1] == "algorithm" and len(parts) >= 3:
if parts[2] == "type":
valid_types = [t.value for t in AlgorithmType]
if value not in valid_types:
errors.append(f"Invalid algorithm type. Valid: {valid_types}")
return errors
def _validate_meta_update(
self,
parts: List[str],
value: Any
) -> List[str]:
"""Validate a meta update."""
errors = []
if len(parts) >= 2:
if parts[1] == "study_name":
import re
if not re.match(r"^[a-z0-9_]+$", str(value)):
errors.append("study_name must be snake_case (lowercase, numbers, underscores)")
return errors
# Module-level convenience function
def validate_spec(
    spec_data: Union[Dict[str, Any], AtomizerSpec],
    strict: bool = True
) -> ValidationReport:
    """Validate an AtomizerSpec with a fresh SpecValidator.

    Args:
        spec_data: Spec data as a raw dict or an AtomizerSpec instance.
        strict: When True, raise on validation errors instead of returning.

    Returns:
        ValidationReport describing errors, warnings, and summary counts.

    Raises:
        SpecValidationError: If strict=True and validation fails.
    """
    return SpecValidator().validate(spec_data, strict=strict)