Dashboard: - Add Studio page with drag-drop model upload and Claude chat - Add intake system for study creation workflow - Improve session manager and context builder - Add intake API routes and frontend components Optimization Engine: - Add CLI module for command-line operations - Add intake module for study preprocessing - Add validation module with gate checks - Improve Zernike extractor documentation - Update spec models with better validation - Enhance solve_simulation robustness Documentation: - Add ATOMIZER_STUDIO.md planning doc - Add ATOMIZER_UX_SYSTEM.md for UX patterns - Update extractor library docs - Add study-readme-generator skill Tools: - Add test scripts for extraction validation - Add Zernike recentering test Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
455 lines
15 KiB
Python
455 lines
15 KiB
Python
"""
Specification Checker
=====================

Validates atomizer_spec.json (or optimization_config.json) for:
- Schema compliance
- Semantic correctness
- Anti-pattern detection
- Expression existence

This catches configuration errors BEFORE wasting time on failed trials.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IssueSeverity(str, Enum):
    """How serious a validation issue is.

    Members mix in ``str``, so each one compares equal to its lowercase
    value (for example ``IssueSeverity.ERROR == "error"``).
    """

    # Must fix before proceeding.
    ERROR = "error"
    # Should review, but can proceed.
    WARNING = "warning"
    # Informational note.
    INFO = "info"
|
|
|
|
|
|
@dataclass
class ValidationIssue:
    """One finding produced while validating a specification."""

    severity: IssueSeverity
    code: str
    message: str
    # JSON path to the issue, when one is known.
    path: Optional[str] = None
    suggestion: Optional[str] = None

    def __str__(self) -> str:
        """Render as ``[TAG] message``, followed by `` at <path>`` if a path is set.

        Raises:
            KeyError: If ``severity`` is not one of the known severities.
        """
        tags = {
            IssueSeverity.ERROR: "[ERROR]",
            IssueSeverity.WARNING: "[WARN]",
            IssueSeverity.INFO: "[INFO]",
        }
        tag = tags[self.severity]

        if self.path:
            return f"{tag} {self.message} at {self.path}"
        return f"{tag} {self.message}"
|
|
|
|
|
|
@dataclass
class CheckResult:
    """Result of running the spec checker.

    Attributes:
        valid: Overall verdict. Starts at whatever the caller passes in and
            is set to False whenever add_error() is called.
        issues: Every recorded issue, in the order it was added.
    """

    valid: bool
    issues: List[ValidationIssue] = field(default_factory=list)

    @property
    def errors(self) -> List[ValidationIssue]:
        """Recorded issues whose severity is ERROR, in insertion order."""
        return [i for i in self.issues if i.severity == IssueSeverity.ERROR]

    @property
    def warnings(self) -> List[ValidationIssue]:
        """Recorded issues whose severity is WARNING, in insertion order."""
        return [i for i in self.issues if i.severity == IssueSeverity.WARNING]

    def _record(
        self,
        severity: IssueSeverity,
        code: str,
        message: str,
        path: Optional[str],
        suggestion: Optional[str],
    ) -> None:
        """Append one issue; shared by the public add_* methods."""
        self.issues.append(
            ValidationIssue(
                severity=severity,
                code=code,
                message=message,
                path=path,
                suggestion=suggestion,
            )
        )

    def add_error(
        self,
        code: str,
        message: str,
        path: Optional[str] = None,
        suggestion: Optional[str] = None,
    ) -> None:
        """Record an ERROR issue and mark the result as invalid."""
        self._record(IssueSeverity.ERROR, code, message, path, suggestion)
        self.valid = False

    def add_warning(
        self,
        code: str,
        message: str,
        path: Optional[str] = None,
        suggestion: Optional[str] = None,
    ) -> None:
        """Record a WARNING issue; ``valid`` is left untouched."""
        self._record(IssueSeverity.WARNING, code, message, path, suggestion)

    def add_info(
        self,
        code: str,
        message: str,
        path: Optional[str] = None,
        suggestion: Optional[str] = None,
    ) -> None:
        """Record an INFO issue; ``valid`` is left untouched.

        ``suggestion`` is optional and defaults to None, matching the
        signatures of add_error() and add_warning().
        """
        self._record(IssueSeverity.INFO, code, message, path, suggestion)
|
|
|
|
|
|
class SpecChecker:
    """
    Validates study specification files.

    Checks:
    1. Required fields present
    2. Design variable bounds valid
    3. Expressions exist in model (if introspection available)
    4. Extractors available for objectives/constraints
    5. Anti-patterns (mass minimization without constraints, etc.)
    6. Referenced simulation file exists (only when a spec path is known)

    Findings are recorded on a CheckResult rather than raised.
    """

    # Known extractors
    KNOWN_EXTRACTORS = {
        "extract_mass_from_bdf",
        "extract_part_mass",
        "extract_displacement",
        "extract_solid_stress",
        "extract_principal_stress",
        "extract_frequency",
        "extract_strain_energy",
        "extract_temperature",
        "extract_zernike_from_op2",
    }

    # Accepted values for an objective's goal / direction (after lowercasing).
    _VALID_GOALS = ("minimize", "maximize")

    # Accepted values for a constraint's type (after lowercasing).
    _VALID_CONSTRAINT_TYPES = ("less_than", "greater_than", "equal_to")

    def __init__(
        self,
        spec_path: Optional[Path] = None,
        available_expressions: Optional[List[str]] = None,
    ):
        """
        Initialize the checker.

        Args:
            spec_path: Path to spec file (atomizer_spec.json or
                optimization_config.json). A plain string is converted to
                a Path.
            available_expressions: List of expression names from introspection
        """
        # Coerce so that the .exists() / .parent calls below work for str input.
        self.spec_path = Path(spec_path) if spec_path is not None else None
        self.available_expressions = available_expressions or []
        self.spec: Dict[str, Any] = {}

    @staticmethod
    def _goal_of(objective: Dict[str, Any]) -> str:
        """Return the objective's lowercased goal, read from "goal" or "direction".

        Non-string values are stringified instead of raising, so they are
        reported as an invalid goal by the caller.
        """
        return str(objective.get("goal", objective.get("direction", ""))).lower()

    def check(self, spec_data: Optional[Dict[str, Any]] = None) -> CheckResult:
        """
        Run all validation checks.

        Args:
            spec_data: Spec dict (or load from spec_path if not provided).
                An empty dict counts as provided and is validated as-is.

        Returns:
            CheckResult with all issues found
        """
        result = CheckResult(valid=True)

        # Load spec if not provided
        if spec_data is not None:
            self.spec = spec_data
        elif self.spec_path and self.spec_path.exists():
            try:
                with open(self.spec_path, encoding="utf-8") as f:
                    self.spec = json.load(f)
            except (json.JSONDecodeError, UnicodeDecodeError) as exc:
                # A malformed spec is a finding to report, not a crash.
                result.add_error(
                    "SPEC_INVALID_JSON",
                    f"Specification file is not valid JSON: {exc}",
                    suggestion="Fix the JSON syntax of the specification file",
                )
                return result
        else:
            result.add_error("SPEC_NOT_FOUND", "No specification file found")
            return result

        # Every check below reads the spec through dict access.
        if not isinstance(self.spec, dict):
            result.add_error(
                "SPEC_INVALID_FORMAT",
                "Specification must be a JSON object",
                suggestion="Wrap the specification in a top-level object",
            )
            return result

        # Run checks
        self._check_required_fields(result)
        self._check_design_variables(result)
        self._check_objectives(result)
        self._check_constraints(result)
        self._check_extractors(result)
        self._check_anti_patterns(result)
        self._check_files(result)

        return result

    def _check_required_fields(self, result: CheckResult) -> None:
        """Check that required fields are present."""

        # Check for design variables
        if not self.spec.get("design_variables"):
            result.add_error(
                "NO_DESIGN_VARIABLES",
                "No design variables defined",
                suggestion="Add at least one design variable to optimize",
            )

        # Check for objectives
        if not self.spec.get("objectives"):
            result.add_error(
                "NO_OBJECTIVES",
                "No objectives defined",
                suggestion="Define at least one objective (e.g., minimize mass)",
            )

        # Check for simulation settings ("or {}" tolerates an explicit null)
        sim = self.spec.get("simulation") or {}
        if not sim.get("sim_file"):
            result.add_warning(
                "NO_SIM_FILE", "No simulation file specified", path="simulation.sim_file"
            )

    def _check_design_variables(self, result: CheckResult) -> None:
        """Check design variable definitions.

        For each design variable: validates the bounds format, that both
        bounds are numeric and ordered, warns on very wide or very narrow
        positive ranges, and (when expression names are available) checks
        that the referenced expression exists. A variable whose bounds are
        malformed or non-numeric is reported once and then skipped.
        """

        dvs = self.spec.get("design_variables") or []

        for i, dv in enumerate(dvs):
            param = dv.get("parameter", dv.get("expression_name", dv.get("name", f"dv_{i}")))
            bounds = dv.get("bounds", [])
            path = f"design_variables[{i}]"

            # Handle both formats: [min, max] or {"min": x, "max": y}
            if isinstance(bounds, dict):
                min_val = bounds.get("min")
                max_val = bounds.get("max")
            elif isinstance(bounds, (list, tuple)) and len(bounds) == 2:
                min_val, max_val = bounds
            else:
                result.add_error(
                    "INVALID_BOUNDS",
                    f"Design variable '{param}' has invalid bounds format",
                    path=path,
                    suggestion="Bounds must be [min, max] or {min: x, max: y}",
                )
                continue

            # Convert to float if strings
            try:
                min_val = float(min_val)
                max_val = float(max_val)
            except (TypeError, ValueError):
                result.add_error(
                    "INVALID_BOUNDS_TYPE",
                    f"Design variable '{param}' bounds must be numeric",
                    path=path,
                )
                continue

            if min_val >= max_val:
                # Check bounds order
                result.add_error(
                    "BOUNDS_INVERTED",
                    f"Design variable '{param}': min ({min_val}) >= max ({max_val})",
                    path=path,
                    suggestion="Ensure min < max",
                )
            elif min_val > 0:
                # The ratio is only meaningful for ordered, positive bounds
                # (max > min > 0 here), so inverted bounds never reach it.
                ratio = max_val / min_val
                if ratio > 100:
                    result.add_warning(
                        "BOUNDS_TOO_WIDE",
                        f"Design variable '{param}' has very wide bounds (ratio: {ratio:.1f}x)",
                        path=path,
                        suggestion="Consider narrowing bounds for faster convergence",
                    )
                elif ratio < 1.1:
                    result.add_warning(
                        "BOUNDS_TOO_NARROW",
                        f"Design variable '{param}' has very narrow bounds (ratio: {ratio:.2f}x)",
                        path=path,
                        suggestion="Consider widening bounds to explore more design space",
                    )

            # Check expression exists (if introspection available)
            if self.available_expressions and param not in self.available_expressions:
                shown = self.available_expressions[:5]
                # Only hint at more names when the list was actually truncated.
                more = "..." if len(self.available_expressions) > len(shown) else ""
                result.add_error(
                    "EXPRESSION_NOT_FOUND",
                    f"Expression '{param}' not found in model",
                    path=path,
                    suggestion=f"Available expressions: {', '.join(shown)}{more}",
                )

    def _check_objectives(self, result: CheckResult) -> None:
        """Check objective definitions (valid goal, extractor specified)."""

        objectives = self.spec.get("objectives") or []

        for i, obj in enumerate(objectives):
            name = obj.get("name", f"objective_{i}")
            # Handle both formats: "goal" or "direction"
            goal = self._goal_of(obj)
            path = f"objectives[{i}]"

            # Check goal is valid
            if goal not in self._VALID_GOALS:
                result.add_error(
                    "INVALID_GOAL",
                    f"Objective '{name}' has invalid goal: '{goal}'",
                    path=path,
                    suggestion="Use 'minimize' or 'maximize'",
                )

            # Check extraction is defined
            extraction = obj.get("extraction") or {}
            if not extraction.get("action"):
                result.add_warning(
                    "NO_EXTRACTOR",
                    f"Objective '{name}' has no extractor specified",
                    path=path,
                )

    def _check_constraints(self, result: CheckResult) -> None:
        """Check constraint definitions (known type, threshold present)."""

        constraints = self.spec.get("constraints") or []

        for i, const in enumerate(constraints):
            name = const.get("name", f"constraint_{i}")
            # str() keeps a null / non-string type from raising on .lower()
            const_type = str(const.get("type", "")).lower()
            path = f"constraints[{i}]"

            # Check type is valid
            if const_type not in self._VALID_CONSTRAINT_TYPES:
                result.add_warning(
                    "INVALID_CONSTRAINT_TYPE",
                    f"Constraint '{name}' has unusual type: '{const_type}'",
                    path=path,
                    suggestion="Use 'less_than', 'greater_than' or 'equal_to'",
                )

            # Check threshold is defined
            if const.get("threshold") is None:
                result.add_error(
                    "NO_THRESHOLD",
                    f"Constraint '{name}' has no threshold defined",
                    path=path,
                )

    def _check_extractors(self, result: CheckResult) -> None:
        """Check that referenced extractors exist.

        Objectives and constraints are checked with the same rule: an
        extraction action that is set but not in KNOWN_EXTRACTORS yields a
        warning (custom extractors are allowed, hence not an error).
        """

        for section in ("objectives", "constraints"):
            for i, entry in enumerate(self.spec.get(section) or []):
                extraction = entry.get("extraction") or {}
                action = extraction.get("action", "")

                if action and action not in self.KNOWN_EXTRACTORS:
                    result.add_warning(
                        "UNKNOWN_EXTRACTOR",
                        f"Extractor '{action}' is not in the standard library",
                        path=f"{section}[{i}].extraction.action",
                        suggestion="Ensure custom extractor is available",
                    )

    def _check_anti_patterns(self, result: CheckResult) -> None:
        """Check for common optimization anti-patterns."""

        objectives = self.spec.get("objectives") or []
        constraints = self.spec.get("constraints") or []

        # Anti-pattern: Mass minimization without stress/displacement constraints.
        # The goal is read the same way as in _check_objectives ("goal" or
        # "direction", case-insensitive) so both spec formats are covered.
        has_mass_objective = any(
            "mass" in str(obj.get("name", "")).lower() and self._goal_of(obj) == "minimize"
            for obj in objectives
        )

        structural_keywords = ("stress", "displacement", "deflection")
        has_structural_constraint = any(
            any(kw in str(const.get("name", "")).lower() for kw in structural_keywords)
            for const in constraints
        )

        if has_mass_objective and not has_structural_constraint:
            result.add_warning(
                "MASS_NO_CONSTRAINT",
                "Mass minimization without structural constraints",
                suggestion="Add stress or displacement constraints to prevent over-optimization",
            )

        # Anti-pattern: Too many design variables for trial count
        n_dvs = len(self.spec.get("design_variables") or [])
        n_trials = (self.spec.get("optimization_settings") or {}).get("n_trials", 100)
        # Skip the ratio when n_trials is not a real number (null, string, bool).
        trials_is_number = isinstance(n_trials, (int, float)) and not isinstance(n_trials, bool)

        if n_dvs > 0 and trials_is_number and n_trials / n_dvs < 10:
            result.add_warning(
                "LOW_TRIALS_PER_DV",
                f"Only {n_trials / n_dvs:.1f} trials per design variable",
                suggestion=f"Consider increasing trials to at least {n_dvs * 20} for better coverage",
            )

        # Anti-pattern: Too many objectives
        n_objectives = len(objectives)
        if n_objectives > 3:
            result.add_warning(
                "TOO_MANY_OBJECTIVES",
                f"{n_objectives} objectives may lead to sparse Pareto front",
                suggestion="Consider consolidating or using weighted objectives",
            )

    def _check_files(self, result: CheckResult) -> None:
        """Check that referenced files exist.

        Does nothing when no spec path is known, since candidate locations
        are derived from the spec file's location.
        """

        if not self.spec_path:
            return

        study_dir = self.spec_path.parent.parent  # Assuming spec is in 1_setup/

        sim = self.spec.get("simulation") or {}
        sim_file = sim.get("sim_file")
        if not sim_file:
            return

        # Check multiple possible locations
        possible_paths = [
            study_dir / "1_model" / sim_file,
            study_dir / "1_setup" / "model" / sim_file,
            study_dir / sim_file,
        ]

        if not any(p.exists() for p in possible_paths):
            result.add_error(
                "SIM_FILE_NOT_FOUND",
                f"Simulation file not found: {sim_file}",
                path="simulation.sim_file",
                suggestion="Ensure model files are copied to study directory",
            )
|
|
|
|
|
|
def validate_spec(spec_path: Path, expressions: Optional[List[str]] = None) -> CheckResult:
    """
    Convenience function to validate a spec file.

    Args:
        spec_path: Path to spec file. A plain string is also accepted and
            is converted to a Path before use.
        expressions: List of available expressions (from introspection)

    Returns:
        CheckResult with validation issues
    """
    # Coerce up front: the checker calls Path methods on the spec path.
    checker = SpecChecker(Path(spec_path), expressions)
    return checker.check()
|