feat: Major update with validators, skills, dashboard, and docs reorganization

- Add validation framework (config, model, results, study validators)
- Add Claude Code skills (create-study, run-optimization, generate-report,
  troubleshoot, analyze-model)
- Add Atomizer Dashboard (React frontend + FastAPI backend)
- Reorganize docs into structured directories (00-09)
- Add neural surrogate modules and training infrastructure
- Add multi-objective optimization support

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-25 19:23:58 -05:00
parent 74a92803b7
commit e3bdb08a22
155 changed files with 52729 additions and 37 deletions

View File

@@ -0,0 +1,74 @@
"""
Atomizer Validators
==================
Validation modules for ensuring correct configurations, model setups,
and optimization results.
Available validators:
- config_validator: Validate optimization_config.json files
- model_validator: Validate NX model files and simulation setup
- results_validator: Validate optimization results in study.db
- study_validator: Complete study health check (combines all validators)
"""
from .config_validator import (
validate_config,
validate_config_file,
ValidationResult,
ConfigError,
ConfigWarning
)
from .model_validator import (
validate_model,
validate_model_files,
validate_study_model,
ModelValidationResult
)
from .results_validator import (
validate_results,
validate_study_results,
get_pareto_summary,
ResultsValidationResult,
ResultsError,
ResultsWarning
)
from .study_validator import (
validate_study,
list_studies,
quick_check,
get_study_health,
StudyValidationResult,
StudyStatus
)
__all__ = [
# Config validator
'validate_config',
'validate_config_file',
'ValidationResult',
'ConfigError',
'ConfigWarning',
# Model validator
'validate_model',
'validate_model_files',
'validate_study_model',
'ModelValidationResult',
# Results validator
'validate_results',
'validate_study_results',
'get_pareto_summary',
'ResultsValidationResult',
'ResultsError',
'ResultsWarning',
# Study validator
'validate_study',
'list_studies',
'quick_check',
'get_study_health',
'StudyValidationResult',
'StudyStatus'
]

View File

@@ -0,0 +1,591 @@
"""
Configuration Validator for Atomizer
====================================
Validates optimization_config.json files before running optimizations.
Catches common errors and provides helpful suggestions.
Usage:
from optimization_engine.validators import validate_config, validate_config_file
# Validate from file path
result = validate_config_file("studies/my_study/1_setup/optimization_config.json")
# Validate from dict
result = validate_config(config_dict)
if result.is_valid:
print("Config is valid!")
else:
for error in result.errors:
print(f"ERROR: {error}")
for warning in result.warnings:
print(f"WARNING: {warning}")
"""
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
@dataclass
class ConfigError:
    """A blocking configuration problem found during validation.

    Carries the offending field's name, a human-readable message, and an
    optional remediation hint that is appended to the string form.
    """
    field: str
    message: str
    suggestion: Optional[str] = None

    def __str__(self):
        text = f"[{self.field}] {self.message}"
        return f"{text} (Suggestion: {self.suggestion})" if self.suggestion else text
@dataclass
class ConfigWarning:
    """A non-blocking configuration issue found during validation.

    Same shape as ConfigError, but warnings never make a config invalid.
    """
    field: str
    message: str
    suggestion: Optional[str] = None

    def __str__(self):
        if self.suggestion:
            return f"[{self.field}] {self.message} (Suggestion: {self.suggestion})"
        return f"[{self.field}] {self.message}"
@dataclass
class ValidationResult:
    """Aggregate outcome of configuration validation.

    Collects blocking errors and non-blocking warnings; validity depends
    only on the error list. The parsed config (if any) is kept for callers.
    """
    errors: List[ConfigError] = field(default_factory=list)
    warnings: List[ConfigWarning] = field(default_factory=list)
    config: Optional[Dict[str, Any]] = None

    @property
    def is_valid(self) -> bool:
        """Config is valid if there are no errors (warnings are OK)."""
        return not self.errors

    def __str__(self):
        report = []
        # Render each non-empty issue bucket as a titled section.
        for title, issues in (("ERRORS", self.errors), ("WARNINGS", self.warnings)):
            if issues:
                report.append(f"{title} ({len(issues)}):")
                report.extend(f" - {issue}" for issue in issues)
        if self.is_valid and not self.warnings:
            report.append("Configuration is valid.")
        return "\n".join(report)
# Valid values for certain fields

# Optimization protocol identifiers accepted in optimization_settings.protocol.
VALID_PROTOCOLS = [
    'protocol_10_single_objective',
    'protocol_11_multi_objective',
    'protocol_12_hybrid_surrogate',
    'legacy'
]
# Sampler names accepted in optimization_settings.sampler (these match
# Optuna sampler class names — presumably passed through to Optuna; confirm).
VALID_SAMPLERS = [
    'TPESampler',
    'NSGAIISampler',
    'CmaEsSampler',
    'RandomSampler',
    'GridSampler'
]
# Allowed optimization directions for an objective's 'goal'.
VALID_GOALS = ['minimize', 'maximize']
# Allowed comparison kinds for a constraint's 'type'.
VALID_CONSTRAINT_TYPES = ['less_than', 'greater_than', 'equal_to', 'range']
# Allowed design-variable types.
VALID_VAR_TYPES = ['float', 'integer', 'categorical']
# Known result-extraction actions; unknown actions only produce a warning.
VALID_EXTRACTION_ACTIONS = [
    'extract_displacement',
    'extract_solid_stress',
    'extract_frequency',
    'extract_mass_from_expression',
    'extract_mass_from_bdf',
    'extract_mass',
    'extract_stress'
]
def validate_config_file(config_path: Union[str, Path]) -> ValidationResult:
    """
    Validate an optimization_config.json file.

    Args:
        config_path: Path to the configuration file

    Returns:
        ValidationResult with errors, warnings, and parsed config
    """
    path = Path(config_path)
    result = ValidationResult()
    # A missing file is a hard error — nothing further can be checked.
    if not path.exists():
        result.errors.append(ConfigError(
            field="file",
            message=f"Configuration file not found: {path}",
            suggestion="Create optimization_config.json using the create-study skill"
        ))
        return result
    # Parse the JSON; syntax problems are reported rather than raised.
    try:
        config = json.loads(path.read_text(encoding='utf-8'))
    except json.JSONDecodeError as exc:
        result.errors.append(ConfigError(
            field="file",
            message=f"Invalid JSON: {exc}",
            suggestion="Check for syntax errors (missing commas, quotes, brackets)"
        ))
        return result
    # Content checks are delegated to the dict-based validator.
    return validate_config(config, result)
def validate_config(config: Dict[str, Any],
                    result: Optional[ValidationResult] = None) -> ValidationResult:
    """
    Validate an optimization configuration dictionary.

    Args:
        config: Configuration dictionary
        result: Existing ValidationResult to append to (optional)

    Returns:
        ValidationResult with errors, warnings, and config
    """
    if result is None:
        result = ValidationResult()
    result.config = config
    # Presence of required/recommended top-level keys.
    _validate_required_fields(config, result)
    # Per-section checks, executed only for sections that are present.
    section_checks = (
        ('design_variables', _validate_design_variables),
        ('objectives', _validate_objectives),
        ('constraints', _validate_constraints),
        ('optimization_settings', _validate_optimization_settings),
        ('simulation', _validate_simulation_settings),
        ('surrogate_settings', _validate_surrogate_settings),
    )
    for section, check in section_checks:
        if section in config:
            check(config[section], result)
    # Checks that span multiple sections (sampler vs. objective count, etc.).
    _validate_cross_references(config, result)
    return result
def _validate_required_fields(config: Dict[str, Any], result: ValidationResult):
    """Check that required and recommended top-level fields exist.

    Missing required fields become errors; missing recommended fields only
    produce warnings.
    """
    # FIX: the loop variable was renamed from `field` to `name` — the original
    # shadowed dataclasses.field, which this module imports at top level.
    required = ['study_name', 'design_variables', 'objectives']
    for name in required:
        if name not in config:
            result.errors.append(ConfigError(
                field=name,
                message=f"Required field '{name}' is missing",
                suggestion=f"Add '{name}' to your configuration"
            ))
    # Recommended fields
    recommended = ['description', 'engineering_context', 'optimization_settings', 'simulation']
    for name in recommended:
        if name not in config:
            result.warnings.append(ConfigWarning(
                field=name,
                message=f"Recommended field '{name}' is missing",
                suggestion=f"Consider adding '{name}' for better documentation"
            ))
def _validate_design_variables(variables: List[Dict], result: ValidationResult):
    """Validate the design_variables section.

    Checks that the section is a non-empty list and that each entry has a
    unique parameter name, numeric [min, max] bounds, and a known type.
    Blocking problems become errors; suspicious-but-runnable configurations
    become warnings.
    """
    if not isinstance(variables, list):
        result.errors.append(ConfigError(
            field="design_variables",
            message="design_variables must be a list",
            suggestion="Use array format: [{parameter: ..., bounds: ...}, ...]"
        ))
        return
    if len(variables) == 0:
        result.errors.append(ConfigError(
            field="design_variables",
            message="At least one design variable is required",
            suggestion="Add design variables with parameter names and bounds"
        ))
        return
    param_names = set()
    for i, var in enumerate(variables):
        prefix = f"design_variables[{i}]"
        # Required fields
        if 'parameter' not in var:
            result.errors.append(ConfigError(
                field=prefix,
                message="'parameter' name is required",
                suggestion="Add 'parameter': 'your_nx_expression_name'"
            ))
        else:
            param = var['parameter']
            if param in param_names:
                result.errors.append(ConfigError(
                    field=prefix,
                    message=f"Duplicate parameter name: '{param}'",
                    suggestion="Each parameter name must be unique"
                ))
            param_names.add(param)
        if 'bounds' not in var:
            result.errors.append(ConfigError(
                field=prefix,
                message="'bounds' are required",
                suggestion="Add 'bounds': [min_value, max_value]"
            ))
        else:
            bounds = var['bounds']
            if not isinstance(bounds, list) or len(bounds) != 2:
                result.errors.append(ConfigError(
                    field=f"{prefix}.bounds",
                    message="Bounds must be [min, max] array",
                    suggestion="Use format: 'bounds': [1.0, 10.0]"
                ))
            # FIX (robustness): non-numeric bounds previously crashed the
            # validator on the >= comparison; report them as an error instead.
            elif not all(isinstance(b, (int, float)) and not isinstance(b, bool)
                         for b in bounds):
                result.errors.append(ConfigError(
                    field=f"{prefix}.bounds",
                    message="Bounds must be numeric values",
                    suggestion="Use format: 'bounds': [1.0, 10.0]"
                ))
            # FIX: the original tested `bounds[0] >= bounds[1]` first, which made
            # the equal-bounds warning below unreachable dead code. Use a strict
            # comparison so min > max is an error while min == max degrades to
            # the intended "constant variable" warning.
            elif bounds[0] > bounds[1]:
                result.errors.append(ConfigError(
                    field=f"{prefix}.bounds",
                    message=f"Min ({bounds[0]}) must be less than max ({bounds[1]})",
                    suggestion="Swap values or adjust range"
                ))
            elif bounds[0] == bounds[1]:
                result.warnings.append(ConfigWarning(
                    field=f"{prefix}.bounds",
                    message="Min equals max - variable will be constant",
                    suggestion="If intentional, consider removing this variable"
                ))
        # Type validation
        var_type = var.get('type', 'float')
        if var_type not in VALID_VAR_TYPES:
            result.warnings.append(ConfigWarning(
                field=f"{prefix}.type",
                message=f"Unknown type '{var_type}'",
                suggestion=f"Use one of: {', '.join(VALID_VAR_TYPES)}"
            ))
        # Integer bounds check
        if var_type == 'integer' and 'bounds' in var:
            bounds = var['bounds']
            if isinstance(bounds, list) and len(bounds) == 2:
                if not (isinstance(bounds[0], int) and isinstance(bounds[1], int)):
                    result.warnings.append(ConfigWarning(
                        field=f"{prefix}.bounds",
                        message="Integer variable bounds should be integers",
                        suggestion="Use whole numbers for integer bounds"
                    ))
def _validate_objectives(objectives: List[Dict], result: ValidationResult):
    """Validate the objectives section (list of {name, goal, extraction?})."""
    if not isinstance(objectives, list):
        result.errors.append(ConfigError(
            field="objectives",
            message="objectives must be a list",
            suggestion="Use array format: [{name: ..., goal: ...}, ...]"
        ))
        return
    if not objectives:
        result.errors.append(ConfigError(
            field="objectives",
            message="At least one objective is required",
            suggestion="Add an objective with name and goal (minimize/maximize)"
        ))
        return
    # Many objectives dilute trade-off clarity; flag but do not block.
    if len(objectives) > 3:
        result.warnings.append(ConfigWarning(
            field="objectives",
            message=f"{len(objectives)} objectives may make optimization difficult",
            suggestion="Consider reducing to 2-3 objectives for clearer trade-offs"
        ))
    seen_names = set()
    for idx, entry in enumerate(objectives):
        location = f"objectives[{idx}]"
        # Name: required and unique across objectives.
        if 'name' not in entry:
            result.errors.append(ConfigError(
                field=location,
                message="'name' is required",
                suggestion="Add 'name': 'mass' or similar"
            ))
        else:
            if entry['name'] in seen_names:
                result.errors.append(ConfigError(
                    field=location,
                    message=f"Duplicate objective name: '{entry['name']}'",
                    suggestion="Each objective name must be unique"
                ))
            seen_names.add(entry['name'])
        # Goal: required and restricted to minimize/maximize.
        if 'goal' not in entry:
            result.errors.append(ConfigError(
                field=location,
                message="'goal' is required",
                suggestion="Add 'goal': 'minimize' or 'goal': 'maximize'"
            ))
        elif entry['goal'] not in VALID_GOALS:
            result.errors.append(ConfigError(
                field=f"{location}.goal",
                message=f"Invalid goal '{entry['goal']}'",
                suggestion=f"Use one of: {', '.join(VALID_GOALS)}"
            ))
        # Optional extraction sub-config.
        if 'extraction' in entry:
            _validate_extraction(entry['extraction'], f"{location}.extraction", result)
def _validate_constraints(constraints: List[Dict], result: ValidationResult):
    """Validate the constraints section (list of {name, type, threshold})."""
    if not isinstance(constraints, list):
        result.errors.append(ConfigError(
            field="constraints",
            message="constraints must be a list",
            suggestion="Use array format: [{name: ..., type: ..., threshold: ...}, ...]"
        ))
        return
    seen_names = set()
    for idx, entry in enumerate(constraints):
        location = f"constraints[{idx}]"
        # Name: required; duplicates are tolerated with only a warning.
        if 'name' not in entry:
            result.errors.append(ConfigError(
                field=location,
                message="'name' is required",
                suggestion="Add 'name': 'max_stress' or similar"
            ))
        else:
            if entry['name'] in seen_names:
                result.warnings.append(ConfigWarning(
                    field=location,
                    message=f"Duplicate constraint name: '{entry['name']}'",
                    suggestion="Consider using unique names for clarity"
                ))
            seen_names.add(entry['name'])
        # Type: required and must be a known comparison kind.
        if 'type' not in entry:
            result.errors.append(ConfigError(
                field=location,
                message="'type' is required",
                suggestion="Add 'type': 'less_than' or 'type': 'greater_than'"
            ))
        elif entry['type'] not in VALID_CONSTRAINT_TYPES:
            result.errors.append(ConfigError(
                field=f"{location}.type",
                message=f"Invalid constraint type '{entry['type']}'",
                suggestion=f"Use one of: {', '.join(VALID_CONSTRAINT_TYPES)}"
            ))
        # Threshold: the limit value the constraint compares against.
        if 'threshold' not in entry:
            result.errors.append(ConfigError(
                field=location,
                message="'threshold' is required",
                suggestion="Add 'threshold': 200 (the limit value)"
            ))
        # Optional extraction sub-config.
        if 'extraction' in entry:
            _validate_extraction(entry['extraction'], f"{location}.extraction", result)
def _validate_extraction(extraction: Dict, prefix: str, result: ValidationResult):
    """Validate an extraction sub-config attached to an objective/constraint."""
    if not isinstance(extraction, dict):
        result.errors.append(ConfigError(
            field=prefix,
            message="extraction must be an object",
            suggestion="Use format: {action: '...', params: {...}}"
        ))
        return
    if 'action' not in extraction:
        result.errors.append(ConfigError(
            field=prefix,
            message="'action' is required in extraction",
            suggestion="Add 'action': 'extract_displacement' or similar"
        ))
        return
    # An unrecognized action is flagged but not blocking.
    if extraction['action'] not in VALID_EXTRACTION_ACTIONS:
        result.warnings.append(ConfigWarning(
            field=f"{prefix}.action",
            message=f"Unknown extraction action '{extraction['action']}'",
            suggestion=f"Standard actions: {', '.join(VALID_EXTRACTION_ACTIONS)}"
        ))
def _validate_optimization_settings(settings: Dict, result: ValidationResult):
    """Validate the optimization_settings section (protocol, n_trials, sampler)."""
    # Protocol: unknown values are allowed but flagged.
    if 'protocol' in settings:
        protocol = settings['protocol']
        if protocol not in VALID_PROTOCOLS:
            result.warnings.append(ConfigWarning(
                field="optimization_settings.protocol",
                message=f"Unknown protocol '{protocol}'",
                suggestion=f"Standard protocols: {', '.join(VALID_PROTOCOLS)}"
            ))
    # Number of trials: must be a positive integer.
    if 'n_trials' in settings:
        n_trials = settings['n_trials']
        # FIX: bool is a subclass of int, so `"n_trials": true` used to be
        # silently accepted (as 1 trial). Reject booleans explicitly.
        if isinstance(n_trials, bool) or not isinstance(n_trials, int) or n_trials < 1:
            result.errors.append(ConfigError(
                field="optimization_settings.n_trials",
                message="n_trials must be a positive integer",
                suggestion="Use a value like 30, 50, or 100"
            ))
        elif n_trials < 10:
            result.warnings.append(ConfigWarning(
                field="optimization_settings.n_trials",
                message=f"Only {n_trials} trials may not be enough for good optimization",
                suggestion="Consider at least 20-30 trials for meaningful results"
            ))
    # Sampler: unknown values are allowed but flagged.
    if 'sampler' in settings:
        sampler = settings['sampler']
        if sampler not in VALID_SAMPLERS:
            result.warnings.append(ConfigWarning(
                field="optimization_settings.sampler",
                message=f"Unknown sampler '{sampler}'",
                suggestion=f"Standard samplers: {', '.join(VALID_SAMPLERS)}"
            ))
def _validate_simulation_settings(simulation: Dict, result: ValidationResult):
    """Warn about missing (non-blocking) simulation file references."""
    for key in ('model_file', 'sim_file'):
        if key not in simulation:
            result.warnings.append(ConfigWarning(
                field=f"simulation.{key}",
                message=f"'{key}' not specified",
                suggestion="Add file name for better documentation"
            ))
def _validate_surrogate_settings(surrogate: Dict, result: ValidationResult):
    """Validate surrogate (NN) settings; checks apply only when enabled."""
    if not surrogate.get('enabled', False):
        return
    # Training settings: too few FEA samples gives the surrogate little data.
    if 'training' in surrogate:
        training = surrogate['training']
        if training.get('initial_fea_trials', 0) < 20:
            result.warnings.append(ConfigWarning(
                field="surrogate_settings.training.initial_fea_trials",
                message="Less than 20 initial FEA trials may not provide enough training data",
                suggestion="Recommend at least 20-30 initial trials"
            ))
    # Model settings: a loose MAPE threshold accepts inaccurate surrogates.
    if 'model' in surrogate:
        model = surrogate['model']
        if 'min_accuracy_mape' in model:
            mape = model['min_accuracy_mape']
            if mape > 20:
                result.warnings.append(ConfigWarning(
                    field="surrogate_settings.model.min_accuracy_mape",
                    message=f"MAPE threshold {mape}% is quite high",
                    suggestion="Consider 5-10% for better surrogate accuracy"
                ))
def _validate_cross_references(config: Dict, result: ValidationResult):
    """Check consistency between objective count, sampler choice, and protocol."""
    objective_count = len(config.get('objectives', []))
    settings = config.get('optimization_settings', {})
    sampler = settings.get('sampler', 'TPESampler')
    # Sampler vs. number of objectives.
    if sampler == 'TPESampler' and objective_count > 1:
        result.warnings.append(ConfigWarning(
            field="optimization_settings.sampler",
            message="TPESampler with multiple objectives will scalarize them",
            suggestion="Consider NSGAIISampler for true multi-objective optimization"
        ))
    if sampler == 'NSGAIISampler' and objective_count == 1:
        result.warnings.append(ConfigWarning(
            field="optimization_settings.sampler",
            message="NSGAIISampler is designed for multi-objective; single-objective may be slower",
            suggestion="Consider TPESampler or CmaEsSampler for single-objective"
        ))
    # Protocol consistency with the number of objectives.
    protocol = settings.get('protocol', '')
    if objective_count == 1 and 'multi_objective' in protocol:
        result.warnings.append(ConfigWarning(
            field="optimization_settings.protocol",
            message="Multi-objective protocol with single objective",
            suggestion="Use protocol_10_single_objective instead"
        ))
    if objective_count > 1 and 'single_objective' in protocol:
        result.warnings.append(ConfigWarning(
            field="optimization_settings.protocol",
            message="Single-objective protocol with multiple objectives",
            suggestion="Use protocol_11_multi_objective for multiple objectives"
        ))
# CLI interface for direct execution
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python config_validator.py <path_to_config.json>")
        sys.exit(1)
    # Validate the given file, print the full report, then a one-line verdict.
    outcome = validate_config_file(sys.argv[1])
    print(outcome)
    if not outcome.is_valid:
        print(f"\n✗ Configuration has {len(outcome.errors)} error(s)")
        sys.exit(1)
    print("\n✓ Configuration is valid!")
    sys.exit(0)

View File

@@ -0,0 +1,557 @@
"""
Model Validator for Atomizer
============================
Validates NX model files and simulation setup before running optimizations.
Checks file existence, structure, and configuration compatibility.
Usage:
from optimization_engine.validators import validate_model, validate_model_files
# Validate model directory
result = validate_model("studies/my_study/1_setup/model")
# Validate specific files
result = validate_model_files(
prt_file="Beam.prt",
sim_file="Beam_sim1.sim",
model_dir=Path("studies/my_study/1_setup/model")
)
if result.is_valid:
print("Model is ready!")
else:
for error in result.errors:
print(f"ERROR: {error}")
"""
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
@dataclass
class ModelError:
    """A blocking problem found while validating model files."""
    component: str                    # which model aspect the issue concerns
    message: str                      # human-readable description
    suggestion: Optional[str] = None  # optional remediation hint

    def __str__(self):
        text = f"[{self.component}] {self.message}"
        return f"{text} (Suggestion: {self.suggestion})" if self.suggestion else text
@dataclass
class ModelWarning:
    """A non-blocking issue found while validating model files."""
    component: str
    message: str
    suggestion: Optional[str] = None

    def __str__(self):
        if self.suggestion:
            return f"[{self.component}] {self.message} (Suggestion: {self.suggestion})"
        return f"[{self.component}] {self.message}"
@dataclass
class ModelValidationResult:
    """Outcome of model validation: issues found plus discovered files."""
    errors: List[ModelError] = field(default_factory=list)
    warnings: List[ModelWarning] = field(default_factory=list)
    # Discovered files
    prt_file: Optional[Path] = None
    sim_file: Optional[Path] = None
    fem_file: Optional[Path] = None
    # Model info
    model_name: Optional[str] = None
    model_dir: Optional[Path] = None
    file_sizes: Dict[str, int] = field(default_factory=dict)

    @property
    def is_valid(self) -> bool:
        """Model is valid if there are no errors."""
        return not self.errors

    @property
    def has_simulation(self) -> bool:
        """Check if simulation file exists."""
        return self.sim_file is not None

    @property
    def has_fem(self) -> bool:
        """Check if FEM mesh file exists."""
        return self.fem_file is not None

    def __str__(self):
        out = [
            f"Model: {self.model_name or 'Unknown'}",
            f"Directory: {self.model_dir or 'Unknown'}",
            "",
            "Files:",
        ]
        # Part and simulation are mandatory; FEM is optional (created on solve).
        if self.prt_file:
            out.append(f" [OK] Part file: {self.prt_file.name} ({_format_size(self.file_sizes.get('prt', 0))})")
        else:
            out.append(" [X] Part file: NOT FOUND")
        if self.sim_file:
            out.append(f" [OK] Simulation: {self.sim_file.name} ({_format_size(self.file_sizes.get('sim', 0))})")
        else:
            out.append(" [X] Simulation: NOT FOUND")
        if self.fem_file:
            out.append(f" [OK] FEM mesh: {self.fem_file.name} ({_format_size(self.file_sizes.get('fem', 0))})")
        else:
            out.append(" ? FEM mesh: Not found (will be created on first solve)")
        # Render each non-empty issue bucket as a titled section.
        for title, issues in (("ERRORS", self.errors), ("WARNINGS", self.warnings)):
            if issues:
                out.append("")
                out.append(f"{title} ({len(issues)}):")
                out.extend(f" - {issue}" for issue in issues)
        if self.is_valid:
            out.append("")
            out.append("[OK] Model validation passed!")
        return "\n".join(out)
def _format_size(size_bytes: int) -> str:
"""Format file size for display."""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
else:
return f"{size_bytes / (1024 * 1024):.1f} MB"
def validate_model(model_dir: Union[str, Path],
                   expected_model_name: Optional[str] = None) -> ModelValidationResult:
    """
    Validate an NX model directory.

    Discovers .prt/.sim/.fem files in the directory, disambiguates when
    several candidates exist, records file sizes, and runs per-file checks.

    Args:
        model_dir: Path to the model directory
        expected_model_name: Expected base name of the model (optional);
            used to pick among multiple .prt files when present

    Returns:
        ModelValidationResult with errors, warnings, and discovered files
    """
    model_dir = Path(model_dir)
    result = ModelValidationResult(model_dir=model_dir)
    # Check directory exists
    if not model_dir.exists():
        result.errors.append(ModelError(
            component="directory",
            message=f"Model directory not found: {model_dir}",
            suggestion="Create the directory and add NX model files"
        ))
        return result
    if not model_dir.is_dir():
        result.errors.append(ModelError(
            component="directory",
            message=f"Path is not a directory: {model_dir}",
            suggestion="Provide path to the model directory, not a file"
        ))
        return result
    # Find model files
    prt_files = list(model_dir.glob("*.prt"))
    sim_files = list(model_dir.glob("*.sim"))
    fem_files = list(model_dir.glob("*.fem"))
    # Check for part file: exactly one is ideal; with several, try to narrow
    # down via the "_i" internal-file convention, then the expected name.
    if len(prt_files) == 0:
        result.errors.append(ModelError(
            component="part",
            message="No .prt file found in model directory",
            suggestion="Add your NX part file to the model directory"
        ))
    elif len(prt_files) > 1:
        # Filter out internal files (often have _i suffix)
        main_prt_files = [f for f in prt_files if not f.stem.endswith('_i')]
        if len(main_prt_files) == 1:
            prt_files = main_prt_files
        elif expected_model_name:
            matching = [f for f in prt_files if f.stem == expected_model_name]
            if matching:
                prt_files = matching
            else:
                result.warnings.append(ModelWarning(
                    component="part",
                    message=f"Multiple .prt files found, none match expected name '{expected_model_name}'",
                    suggestion="Specify the correct model name in configuration"
                ))
        else:
            result.warnings.append(ModelWarning(
                component="part",
                message=f"Multiple .prt files found: {[f.name for f in prt_files]}",
                suggestion="Consider keeping only the main model file in the directory"
            ))
    if prt_files:
        # NOTE: if disambiguation above failed, the first glob hit is used.
        result.prt_file = prt_files[0]
        result.model_name = result.prt_file.stem
        result.file_sizes['prt'] = result.prt_file.stat().st_size
        # Validate part file
        _validate_prt_file(result.prt_file, result)
    # Check for simulation file; prefer "<model>_sim1.sim" when several exist.
    if len(sim_files) == 0:
        result.errors.append(ModelError(
            component="simulation",
            message="No .sim file found in model directory",
            suggestion="Create a simulation in NX and save it to this directory"
        ))
    elif len(sim_files) > 1:
        if result.model_name:
            # Try to find matching sim file
            expected_sim = f"{result.model_name}_sim1.sim"
            matching = [f for f in sim_files if f.name.lower() == expected_sim.lower()]
            if matching:
                sim_files = matching
            else:
                result.warnings.append(ModelWarning(
                    component="simulation",
                    message=f"Multiple .sim files found: {[f.name for f in sim_files]}",
                    suggestion=f"Expected: {expected_sim}"
                ))
        else:
            result.warnings.append(ModelWarning(
                component="simulation",
                message=f"Multiple .sim files found: {[f.name for f in sim_files]}",
                suggestion="Keep only one simulation file"
            ))
    if sim_files:
        result.sim_file = sim_files[0]
        result.file_sizes['sim'] = result.sim_file.stat().st_size
        # Validate simulation file
        _validate_sim_file(result.sim_file, result)
    # Check for FEM file: optional, so absence is only a warning.
    if len(fem_files) == 0:
        result.warnings.append(ModelWarning(
            component="fem",
            message="No .fem file found",
            suggestion="FEM mesh will be created automatically on first solve"
        ))
    else:
        if result.model_name:
            expected_fem = f"{result.model_name}_fem1.fem"
            matching = [f for f in fem_files if f.name.lower() == expected_fem.lower()]
            if matching:
                fem_files = matching
        result.fem_file = fem_files[0]
        result.file_sizes['fem'] = result.fem_file.stat().st_size
    # Cross-validate files (naming conventions, co-location)
    _validate_file_relationships(result)
    return result
def validate_model_files(prt_file: Union[str, Path],
                         sim_file: Union[str, Path],
                         model_dir: Optional[Union[str, Path]] = None) -> ModelValidationResult:
    """
    Validate specific model files.

    Args:
        prt_file: Name or path to the part file
        sim_file: Name or path to the simulation file
        model_dir: Base directory (optional, will be inferred if full paths given)

    Returns:
        ModelValidationResult
    """
    part_path = Path(prt_file)
    simulation_path = Path(sim_file)
    if model_dir:
        # Resolve relative names against the supplied base directory.
        base_dir = Path(model_dir)
        if not part_path.is_absolute():
            part_path = base_dir / part_path
        if not simulation_path.is_absolute():
            simulation_path = base_dir / simulation_path
    else:
        # No base given: infer it from the part path when possible.
        base_dir = part_path.parent if part_path.is_absolute() else Path.cwd()
    result = ModelValidationResult(model_dir=base_dir)
    # Part file: missing is a hard error; otherwise record info and inspect it.
    if part_path.exists():
        result.prt_file = part_path
        result.model_name = part_path.stem
        result.file_sizes['prt'] = part_path.stat().st_size
        _validate_prt_file(part_path, result)
    else:
        result.errors.append(ModelError(
            component="part",
            message=f"Part file not found: {part_path}",
            suggestion="Check the file path and name"
        ))
    # Simulation file: same treatment.
    if simulation_path.exists():
        result.sim_file = simulation_path
        result.file_sizes['sim'] = simulation_path.stat().st_size
        _validate_sim_file(simulation_path, result)
    else:
        result.errors.append(ModelError(
            component="simulation",
            message=f"Simulation file not found: {simulation_path}",
            suggestion="Check the file path and name"
        ))
    # FEM mesh: prefer the conventional "<model>_fem1.fem" name, otherwise
    # fall back to any .fem file in the base directory.
    if result.model_name:
        conventional = base_dir / f"{result.model_name}_fem1.fem"
        if conventional.exists():
            result.fem_file = conventional
            result.file_sizes['fem'] = conventional.stat().st_size
        else:
            candidates = list(base_dir.glob("*.fem")) if base_dir.exists() else []
            if candidates:
                result.fem_file = candidates[0]
                result.file_sizes['fem'] = result.fem_file.stat().st_size
    _validate_file_relationships(result)
    return result
def _validate_prt_file(prt_path: Path, result: ModelValidationResult):
    """Sanity-check a part file: non-empty, plausibly sized, and readable."""
    size = prt_path.stat().st_size
    if not size:
        result.errors.append(ModelError(
            component="part",
            message="Part file is empty",
            suggestion="Re-save the part file in NX"
        ))
        return
    if size < 1024:
        result.warnings.append(ModelWarning(
            component="part",
            message=f"Part file is very small ({_format_size(size)})",
            suggestion="Verify the file contains valid geometry"
        ))
    # Basic readability probe: real NX headers are more complex, so this only
    # catches truncated or unreadable files, not corrupt-but-long ones.
    try:
        with open(prt_path, 'rb') as handle:
            header = handle.read(8)
        if len(header) < 8:
            result.warnings.append(ModelWarning(
                component="part",
                message="Part file appears incomplete",
                suggestion="Re-save the file in NX"
            ))
    except PermissionError:
        result.errors.append(ModelError(
            component="part",
            message="Cannot read part file - permission denied",
            suggestion="Close NX if the file is open, or check file permissions"
        ))
    except Exception as exc:
        result.warnings.append(ModelWarning(
            component="part",
            message=f"Could not verify part file: {exc}",
            suggestion="Ensure file is a valid NX part"
        ))
def _validate_sim_file(sim_path: Path, result: ModelValidationResult):
    """Sanity-check a simulation file: must be non-empty, should exceed 512 B."""
    size = sim_path.stat().st_size
    if not size:
        result.errors.append(ModelError(
            component="simulation",
            message="Simulation file is empty",
            suggestion="Re-save the simulation in NX"
        ))
    elif size < 512:
        result.warnings.append(ModelWarning(
            component="simulation",
            message=f"Simulation file is very small ({_format_size(size)})",
            suggestion="Verify simulation setup in NX"
        ))
def _validate_file_relationships(result: ModelValidationResult):
    """Cross-check naming conventions and co-location of discovered files."""
    if result.prt_file is None or result.sim_file is None:
        return
    base = result.prt_file.stem
    # Simulation naming convention: "<part>_sim1" (any "<part>*" prefix tolerated).
    sim_stem = result.sim_file.stem
    wanted_sim = f"{base}_sim1"
    if sim_stem != wanted_sim and not sim_stem.startswith(base):
        result.warnings.append(ModelWarning(
            component="naming",
            message=f"Simulation name '{sim_stem}' doesn't match part name '{base}'",
            suggestion=f"Expected simulation name: {wanted_sim}.sim"
        ))
    # FEM naming convention: "<part>_fem1" (any "<part>*" prefix tolerated).
    if result.fem_file:
        fem_stem = result.fem_file.stem
        wanted_fem = f"{base}_fem1"
        if fem_stem != wanted_fem and not fem_stem.startswith(base):
            result.warnings.append(ModelWarning(
                component="naming",
                message=f"FEM name '{fem_stem}' doesn't match part name '{base}'",
                suggestion=f"Expected FEM name: {wanted_fem}.fem"
            ))
    # All model files should live side by side.
    if result.prt_file.parent != result.sim_file.parent:
        result.warnings.append(ModelWarning(
            component="directory",
            message="Part and simulation files are in different directories",
            suggestion="Keep all model files in the same directory"
        ))
def validate_study_model(study_name: str,
                         studies_dir: str = "studies",
                         config: Optional[Dict[str, Any]] = None) -> ModelValidationResult:
    """
    Validate model for a complete study.

    Args:
        study_name: Name of the study folder (e.g., "uav_arm_optimization")
        studies_dir: Base directory for studies (default: "studies")
        config: Optional optimization_config.json contents (loaded dict, not path)

    Returns:
        ModelValidationResult
    """
    setup_dir = Path(studies_dir) / study_name / "1_setup"
    model_dir = setup_dir / "model"
    # Fall back to the study's own config file when none was supplied;
    # an unreadable or malformed config is treated as absent.
    if config is None:
        config_path = setup_dir / "optimization_config.json"
        if config_path.exists():
            import json
            try:
                with open(config_path, 'r') as f:
                    config = json.load(f)
            except (json.JSONDecodeError, IOError):
                config = None
    # A configured model_file name helps disambiguate multiple .prt files.
    expected_model_name = None
    if isinstance(config, dict) and 'simulation' in config:
        sim_section = config['simulation']
        if 'model_file' in sim_section:
            expected_model_name = Path(sim_section['model_file']).stem
    result = validate_model(model_dir, expected_model_name)
    # Study-specific cross-check: declared file names vs. discovered files.
    if isinstance(config, dict):
        _validate_config_model_match(config, result)
    return result
def _validate_config_model_match(config: Dict[str, Any], result: ModelValidationResult):
    """Warn when file names declared in the config disagree with discovered files."""
    sim_config = config.get('simulation', {})

    # (config key, discovered file) pairs checked in a fixed order:
    # model file first, then simulation file.
    comparisons = (
        ('model_file', result.prt_file),
        ('sim_file', result.sim_file),
    )
    for key, found_file in comparisons:
        if key in sim_config and found_file:
            declared = Path(sim_config[key]).name
            actual = found_file.name
            # Case-insensitive compare: NX file names are not case-sensitive
            # on the platforms this runs on.
            if declared.lower() != actual.lower():
                result.warnings.append(ModelWarning(
                    component="config",
                    message=f"Config specifies '{declared}' but found '{actual}'",
                    suggestion="Update config to match actual file name"
                ))
# CLI interface for direct execution
if __name__ == "__main__":
    import sys
    if len(sys.argv) < 2:
        print("Usage: python model_validator.py <path_to_model_directory>")
        print("       python model_validator.py <path_to_study_directory>")
        sys.exit(1)
    path = Path(sys.argv[1])
    # Check if it's a study directory or model directory
    if (path / "1_setup" / "model").exists():
        # It's a study directory. validate_study_model() expects a study
        # *name* plus its parent directory — passing the raw Path would get
        # re-rooted under the default "studies/" folder and never resolve.
        result = validate_study_model(path.name, str(path.parent))
    elif path.is_dir():
        # It's a model directory
        result = validate_model(path)
    else:
        print(f"ERROR: Path not found or not a directory: {path}")
        sys.exit(1)
    print(result)
    if result.is_valid:
        print("\n✓ Model validation passed!")
        sys.exit(0)
    else:
        print(f"\n✗ Model has {len(result.errors)} error(s)")
        sys.exit(1)

View File

@@ -0,0 +1,565 @@
"""
Results Validator for Atomizer Optimization Studies
Validates optimization results stored in study.db and provides
analysis of trial quality, constraint satisfaction, and data integrity.
Usage:
from optimization_engine.validators.results_validator import validate_results
result = validate_results("studies/my_study/2_results/study.db")
if result.is_valid:
print("Results are valid")
else:
for error in result.errors:
print(f"ERROR: {error}")
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import json
@dataclass
class ResultsError:
    """A blocking problem discovered while validating optimization results."""
    code: str                           # machine-readable code, e.g. "DB_NOT_FOUND"
    message: str                        # human-readable description
    trial_number: Optional[int] = None  # offending trial, when trial-specific

    def __str__(self) -> str:
        prefix = f"[{self.code}]"
        if self.trial_number is None:
            return f"{prefix} {self.message}"
        return f"{prefix} Trial #{self.trial_number}: {self.message}"
@dataclass
class ResultsWarning:
    """A non-fatal issue discovered while validating optimization results."""
    code: str                           # machine-readable code, e.g. "LOW_FEASIBILITY"
    message: str                        # human-readable description
    trial_number: Optional[int] = None  # offending trial, when trial-specific

    def __str__(self) -> str:
        trial_part = "" if self.trial_number is None else f"Trial #{self.trial_number}: "
        return f"[{self.code}] {trial_part}{self.message}"
@dataclass
class ResultsInfo:
    """Information about the optimization results."""
    # All fields are populated by validate_results() and default to "empty".
    study_name: str = ""           # Optuna study name read from storage
    n_trials: int = 0              # total trials, any state
    n_completed: int = 0           # trials in COMPLETE state
    n_failed: int = 0              # trials in FAIL state
    n_pruned: int = 0              # trials in PRUNED state
    n_pareto: int = 0              # Pareto-optimal trials (multi-objective only)
    feasibility_rate: float = 0.0  # percent of completed trials flagged feasible
    is_multi_objective: bool = False  # True when the study has >1 optimization direction
    objective_names: List[str] = field(default_factory=list)    # from config, when cross-validated
    best_values: Dict[str, float] = field(default_factory=dict)  # best value(s); single-objective only
    parameter_names: List[str] = field(default_factory=list)    # params of first completed trial
@dataclass
class ResultsValidationResult:
    """Complete validation result for optimization results.

    str() renders a multi-section plain-text report (summary, best values,
    errors, warnings) suitable for printing to a terminal.
    """
    is_valid: bool                  # True when no errors were recorded
    errors: List[ResultsError]      # blocking problems
    warnings: List[ResultsWarning]  # non-blocking observations
    info: ResultsInfo               # aggregate study statistics
    def __str__(self) -> str:
        lines = []
        # Status
        status = "[OK] Results validation passed!" if self.is_valid else "[X] Results validation failed!"
        lines.append(status)
        lines.append("")
        # Info
        lines.append("RESULTS SUMMARY")
        lines.append("-" * 40)
        lines.append(f"  Study: {self.info.study_name}")
        lines.append(f"  Total trials: {self.info.n_trials}")
        lines.append(f"  Completed: {self.info.n_completed}")
        lines.append(f"  Failed: {self.info.n_failed}")
        if self.info.n_pruned > 0:
            lines.append(f"  Pruned: {self.info.n_pruned}")
        lines.append(f"  Multi-objective: {'Yes' if self.info.is_multi_objective else 'No'}")
        if self.info.is_multi_objective and self.info.n_pareto > 0:
            lines.append(f"  Pareto-optimal: {self.info.n_pareto}")
        # Rate of 0 means either all-infeasible or never computed; hidden either way.
        if self.info.feasibility_rate > 0:
            lines.append(f"  Feasibility rate: {self.info.feasibility_rate:.1f}%")
        lines.append("")
        # Best values (only populated for single-objective studies)
        if self.info.best_values:
            lines.append("BEST VALUES")
            lines.append("-" * 40)
            for name, value in self.info.best_values.items():
                lines.append(f"  {name}: {value:.4f}")
            lines.append("")
        # Errors
        if self.errors:
            lines.append("ERRORS")
            lines.append("-" * 40)
            for error in self.errors:
                lines.append(f"  {error}")
            lines.append("")
        # Warnings
        if self.warnings:
            lines.append("WARNINGS")
            lines.append("-" * 40)
            for warning in self.warnings:
                lines.append(f"  {warning}")
            lines.append("")
        return "\n".join(lines)
def validate_results(
    db_path: str,
    config_path: Optional[str] = None,
    min_trials: int = 1
) -> ResultsValidationResult:
    """
    Validate optimization results stored in study.db.

    Opens the Optuna SQLite storage, loads the first study it contains, then
    checks trial counts, objective data quality, feasibility (from the
    'feasible' user attribute), and — when config_path is given — consistency
    with the study's optimization_config.json.

    Args:
        db_path: Path to study.db file
        config_path: Optional path to optimization_config.json for cross-validation
        min_trials: Minimum number of completed trials required

    Returns:
        ResultsValidationResult with errors, warnings, and info
    """
    errors: List[ResultsError] = []
    warnings: List[ResultsWarning] = []
    info = ResultsInfo()
    # Re-bind the str argument as a Path for .exists() and message formatting.
    db_path = Path(db_path)
    # Check database exists — without it nothing else can run.
    if not db_path.exists():
        errors.append(ResultsError(
            code="DB_NOT_FOUND",
            message=f"Database not found: {db_path}"
        ))
        return ResultsValidationResult(
            is_valid=False,
            errors=errors,
            warnings=warnings,
            info=info
        )
    # Try to load with Optuna. Imported lazily so this module can still be
    # imported when optuna is not installed.
    try:
        import optuna
        storage_url = f"sqlite:///{db_path}"
        # Get all studies in the database
        storage = optuna.storages.RDBStorage(url=storage_url)
        study_summaries = storage.get_all_studies()
        if not study_summaries:
            errors.append(ResultsError(
                code="NO_STUDIES",
                message="Database contains no optimization studies"
            ))
            return ResultsValidationResult(
                is_valid=False,
                errors=errors,
                warnings=warnings,
                info=info
            )
        # Use the first (usually only) study
        study_summary = study_summaries[0]
        info.study_name = study_summary.study_name
        # Load the full study
        study = optuna.load_study(
            study_name=info.study_name,
            storage=storage_url
        )
        # Basic per-state counts
        info.n_trials = len(study.trials)
        info.n_completed = len([t for t in study.trials
                                if t.state == optuna.trial.TrialState.COMPLETE])
        info.n_failed = len([t for t in study.trials
                             if t.state == optuna.trial.TrialState.FAIL])
        info.n_pruned = len([t for t in study.trials
                             if t.state == optuna.trial.TrialState.PRUNED])
        # Check minimum trials
        if info.n_completed < min_trials:
            errors.append(ResultsError(
                code="INSUFFICIENT_TRIALS",
                message=f"Only {info.n_completed} completed trials (minimum: {min_trials})"
            ))
        # Check for multi-objective
        info.is_multi_objective = len(study.directions) > 1
        # Get parameter names from first completed trial
        for trial in study.trials:
            if trial.state == optuna.trial.TrialState.COMPLETE:
                info.parameter_names = list(trial.params.keys())
                break
        # Analyze Pareto front for multi-objective
        if info.is_multi_objective:
            try:
                pareto_trials = study.best_trials
                info.n_pareto = len(pareto_trials)
                if info.n_pareto == 0 and info.n_completed > 0:
                    warnings.append(ResultsWarning(
                        code="NO_PARETO",
                        message="No Pareto-optimal solutions found despite completed trials"
                    ))
            except Exception as e:
                warnings.append(ResultsWarning(
                    code="PARETO_ERROR",
                    message=f"Could not compute Pareto front: {e}"
                ))
        else:
            # Single objective - get best value
            if info.n_completed > 0:
                try:
                    best_trial = study.best_trial
                    info.best_values["objective"] = best_trial.value
                except Exception:
                    # best_trial raises when no completed trial exists; the
                    # guard above should prevent this — swallow defensively.
                    pass
        # Analyze feasibility as recorded by the optimization runner.
        feasible_count = 0
        for trial in study.trials:
            if trial.state == optuna.trial.TrialState.COMPLETE:
                # Check user_attrs for feasibility flag; trials that never set
                # the flag are counted as feasible.
                is_feasible = trial.user_attrs.get('feasible', True)
                if is_feasible:
                    feasible_count += 1
        if info.n_completed > 0:
            info.feasibility_rate = (feasible_count / info.n_completed) * 100
            if info.feasibility_rate < 50:
                warnings.append(ResultsWarning(
                    code="LOW_FEASIBILITY",
                    message=f"Low feasibility rate ({info.feasibility_rate:.1f}%) - consider relaxing constraints or adjusting bounds"
                ))
            elif info.feasibility_rate < 80:
                warnings.append(ResultsWarning(
                    code="MODERATE_FEASIBILITY",
                    message=f"Moderate feasibility rate ({info.feasibility_rate:.1f}%)"
                ))
        # Check for data quality issues (NaN/inf objectives, missing params, ...)
        _validate_trial_data(study, errors, warnings)
        # Cross-validate with config if provided
        if config_path:
            _cross_validate_with_config(study, config_path, info, errors, warnings)
    except ImportError:
        errors.append(ResultsError(
            code="OPTUNA_NOT_INSTALLED",
            message="Optuna is not installed. Cannot validate results."
        ))
    except Exception as e:
        # Any storage/loading failure is reported rather than raised so the
        # caller always receives a structured result.
        errors.append(ResultsError(
            code="LOAD_ERROR",
            message=f"Failed to load study: {e}"
        ))
    return ResultsValidationResult(
        is_valid=len(errors) == 0,
        errors=errors,
        warnings=warnings,
        info=info
    )
def _validate_trial_data(study, errors: List[ResultsError], warnings: List[ResultsWarning]):
    """
    Check per-trial data quality for every completed trial.

    Appends to `errors` for corrupt data (null/non-numeric/NaN objectives,
    missing parameters) and to `warnings` for suspicious-but-usable data
    (infinite objectives, non-positive thickness/diameter values).

    Args:
        study: Loaded optuna.Study
        errors: Mutable list receiving ResultsError entries
        warnings: Mutable list receiving ResultsWarning entries
    """
    # Hoisted out of the loops: the original re-imported math for every
    # float objective value inspected.
    import math
    import optuna

    for trial in study.trials:
        if trial.state != optuna.trial.TrialState.COMPLETE:
            continue
        # Objective values must be present, numeric, and finite.
        if trial.values:
            for i, val in enumerate(trial.values):
                if val is None:
                    errors.append(ResultsError(
                        code="NULL_OBJECTIVE",
                        message=f"Objective {i} has null value",
                        trial_number=trial.number
                    ))
                elif not isinstance(val, (int, float)):
                    errors.append(ResultsError(
                        code="INVALID_OBJECTIVE_TYPE",
                        message=f"Objective {i} has invalid type: {type(val)}",
                        trial_number=trial.number
                    ))
                elif isinstance(val, float):
                    # NaN breaks comparisons silently -> error; inf can still
                    # be ordered -> warning only.
                    if math.isnan(val):
                        errors.append(ResultsError(
                            code="NAN_OBJECTIVE",
                            message=f"Objective {i} is NaN",
                            trial_number=trial.number
                        ))
                    elif math.isinf(val):
                        warnings.append(ResultsWarning(
                            code="INF_OBJECTIVE",
                            message=f"Objective {i} is infinite",
                            trial_number=trial.number
                        ))
        # A completed trial with no recorded parameters is corrupt.
        if not trial.params:
            errors.append(ResultsError(
                code="MISSING_PARAMS",
                message="Trial has no parameters recorded",
                trial_number=trial.number
            ))
        # Name-based sanity checks: geometric quantities must be positive.
        for param_name, param_value in trial.params.items():
            name_lower = param_name.lower()
            if 'thickness' in name_lower and param_value <= 0:
                warnings.append(ResultsWarning(
                    code="INVALID_THICKNESS",
                    message=f"{param_name} = {param_value} (non-positive thickness)",
                    trial_number=trial.number
                ))
            elif 'diameter' in name_lower and param_value <= 0:
                warnings.append(ResultsWarning(
                    code="INVALID_DIAMETER",
                    message=f"{param_name} = {param_value} (non-positive diameter)",
                    trial_number=trial.number
                ))
def _cross_validate_with_config(
    study,
    config_path: str,
    info: ResultsInfo,
    errors: List[ResultsError],
    warnings: List[ResultsWarning]
):
    """Cross-check stored results against the study's optimization config."""
    import optuna
    cfg_file = Path(config_path)
    if not cfg_file.exists():
        warnings.append(ResultsWarning(
            code="CONFIG_NOT_FOUND",
            message=f"Config file not found for cross-validation: {cfg_file}"
        ))
        return
    try:
        with open(cfg_file, 'r') as fh:
            config = json.load(fh)

        design_vars = config.get('design_variables', [])

        # Compare parameter names declared in the config with those recorded
        # in the results database.
        config_params = {
            name
            for name in (v.get('parameter', v.get('name', '')) for v in design_vars)
            if name
        }
        result_params = set(info.parameter_names)
        missing_in_results = config_params - result_params
        extra_in_results = result_params - config_params
        if missing_in_results:
            warnings.append(ResultsWarning(
                code="MISSING_PARAMS_IN_RESULTS",
                message=f"Config params not in results: {missing_in_results}"
            ))
        if extra_in_results:
            warnings.append(ResultsWarning(
                code="EXTRA_PARAMS_IN_RESULTS",
                message=f"Results have extra params not in config: {extra_in_results}"
            ))

        # Objective counts should agree between config and study directions.
        config_objectives = len(config.get('objectives', []))
        result_objectives = len(study.directions)
        if config_objectives != result_objectives:
            warnings.append(ResultsWarning(
                code="OBJECTIVE_COUNT_MISMATCH",
                message=f"Config has {config_objectives} objectives, results have {result_objectives}"
            ))

        # Record objective names from the config for later reporting.
        for obj in config.get('objectives', []):
            obj_name = obj.get('name', '')
            if obj_name:
                info.objective_names.append(obj_name)

        # Flag any completed-trial parameter value that escaped its bounds.
        for trial in study.trials:
            if trial.state != optuna.trial.TrialState.COMPLETE:
                continue
            for var in design_vars:
                param_name = var.get('parameter', var.get('name', ''))
                bounds = var.get('bounds', [])
                if param_name in trial.params and len(bounds) == 2:
                    value = trial.params[param_name]
                    min_val, max_val = bounds
                    # Small tolerance absorbs floating-point round-trip noise.
                    tolerance = (max_val - min_val) * 0.001
                    if value < min_val - tolerance:
                        warnings.append(ResultsWarning(
                            code="BELOW_MIN_BOUND",
                            message=f"{param_name} = {value} < min ({min_val})",
                            trial_number=trial.number
                        ))
                    elif value > max_val + tolerance:
                        warnings.append(ResultsWarning(
                            code="ABOVE_MAX_BOUND",
                            message=f"{param_name} = {value} > max ({max_val})",
                            trial_number=trial.number
                        ))
    except json.JSONDecodeError as e:
        warnings.append(ResultsWarning(
            code="CONFIG_PARSE_ERROR",
            message=f"Could not parse config JSON: {e}"
        ))
    except Exception as e:
        warnings.append(ResultsWarning(
            code="CONFIG_ERROR",
            message=f"Error reading config: {e}"
        ))
def validate_study_results(study_name: str) -> ResultsValidationResult:
    """
    Convenience wrapper: validate results for a named study under studies/.

    Args:
        study_name: Name of the study (folder in studies/)

    Returns:
        ResultsValidationResult
    """
    # Path is already imported at module level; the original re-imported it
    # locally and built the path via string interpolation.
    study_dir = Path("studies") / study_name
    db_path = study_dir / "2_results" / "study.db"
    config_path = study_dir / "1_setup" / "optimization_config.json"
    return validate_results(
        db_path=str(db_path),
        config_path=str(config_path) if config_path.exists() else None
    )
def get_pareto_summary(db_path: str) -> Dict[str, Any]:
    """
    Get a summary of Pareto-optimal designs from results.

    Args:
        db_path: Path to study.db

    Returns:
        Multi-objective: {"type": "multi_objective", "n_pareto", "designs", "ranges"}.
        Single-objective: {"type": "single_objective", "best_trial", "best_value",
        "best_params"}. On any failure: {"error": <message>}.
    """
    try:
        import optuna
        storage_url = f"sqlite:///{db_path}"
        storage = optuna.storages.RDBStorage(url=storage_url)
        study_summaries = storage.get_all_studies()
        if not study_summaries:
            return {"error": "No studies found"}
        study = optuna.load_study(
            study_name=study_summaries[0].study_name,
            storage=storage_url
        )
        if len(study.directions) < 2:
            # Single objective. Optuna's study.best_trial RAISES ValueError
            # when no trial has completed — the original truth-test
            # (`if study.best_trial:`) could never reach its
            # "No completed trials" branch, so catch the exception instead.
            try:
                best = study.best_trial
            except ValueError:
                return {"error": "No completed trials"}
            return {
                "type": "single_objective",
                "best_trial": best.number,
                "best_value": study.best_value,
                "best_params": study.best_params
            }
        # Multi-objective: collect every Pareto-optimal trial.
        pareto_trials = study.best_trials
        designs = [
            {
                "trial_number": trial.number,
                "objectives": trial.values,
                "parameters": trial.params,
                "user_attrs": dict(trial.user_attrs)
            }
            for trial in pareto_trials
        ]
        # Per-objective min/max/spread across the Pareto front.
        ranges = {}
        if designs:
            for i in range(len(designs[0]["objectives"])):
                values = [d["objectives"][i] for d in designs]
                ranges[f"objective_{i}"] = {
                    "min": min(values),
                    "max": max(values),
                    "spread": max(values) - min(values)
                }
        return {
            "type": "multi_objective",
            "n_pareto": len(pareto_trials),
            "designs": designs,
            "ranges": ranges
        }
    except Exception as e:
        # Includes ImportError when optuna is missing and storage errors.
        return {"error": str(e)}
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python results_validator.py <study_name_or_db_path>")
        print("Example: python results_validator.py uav_arm_optimization")
        print("Example: python results_validator.py studies/my_study/2_results/study.db")
        sys.exit(1)

    target = sys.argv[1]
    # A .db suffix means a direct database path; anything else is a study name.
    if target.endswith('.db'):
        outcome = validate_results(target)
    else:
        outcome = validate_study_results(target)
    print(outcome)

View File

@@ -0,0 +1,421 @@
"""
Study Validator for Atomizer Optimization Studies
Comprehensive validation that combines config, model, and results validation
to provide a complete health check for an optimization study.
Usage:
from optimization_engine.validators.study_validator import validate_study
result = validate_study("uav_arm_optimization")
print(result) # Shows complete status with all checks
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Any, Optional
from enum import Enum
class StudyStatus(Enum):
    """Overall status of a study."""
    NOT_FOUND = "not_found"                # study folder does not exist
    SETUP_INCOMPLETE = "setup_incomplete"  # folder exists but structure/config/model checks failed
    READY_TO_RUN = "ready_to_run"          # setup valid, no results database yet
    RUNNING = "running"                    # valid results plus a .optimization_lock file present
    COMPLETED = "completed"                # results exist and validate cleanly
    HAS_ERRORS = "has_errors"              # results exist but failed validation
@dataclass
class StudyCheckResult:
    """Result of a single validation check."""
    name: str      # short check label, e.g. "Model files"
    passed: bool   # True when the check succeeded
    message: str   # "OK" or a description of what failed / was found
    details: Dict[str, Any] = field(default_factory=dict)  # optional structured extras
@dataclass
class StudyValidationResult:
    """Complete validation result for a study.

    str() renders a terminal-friendly report: header, status, summary facts,
    per-check outcomes, and a suggested next action based on the status.
    """
    study_name: str                 # study folder name
    status: StudyStatus             # overall derived status
    checks: List[StudyCheckResult]  # individual check outcomes, in run order
    summary: Dict[str, Any]         # aggregate facts (trial counts, model name, ...)
    @property
    def is_ready_to_run(self) -> bool:
        """Check if study is ready to run optimization."""
        return self.status in [StudyStatus.READY_TO_RUN, StudyStatus.COMPLETED]
    @property
    def error_count(self) -> int:
        """Count of failed checks."""
        return len([c for c in self.checks if not c.passed])
    @property
    def warning_count(self) -> int:
        """Count of warnings (checks that passed with warnings)."""
        # Heuristic: passing checks embed "(N warnings)" in their message
        # when the underlying validator reported warnings.
        return len([c for c in self.checks
                    if c.passed and 'warning' in c.message.lower()])
    def __str__(self) -> str:
        lines = []
        # Header
        lines.append("=" * 60)
        lines.append(f"STUDY VALIDATION: {self.study_name}")
        lines.append("=" * 60)
        lines.append("")
        # Status
        status_icons = {
            StudyStatus.NOT_FOUND: "[X] NOT FOUND",
            StudyStatus.SETUP_INCOMPLETE: "[!] SETUP INCOMPLETE",
            StudyStatus.READY_TO_RUN: "[OK] READY TO RUN",
            StudyStatus.RUNNING: "[...] RUNNING",
            StudyStatus.COMPLETED: "[OK] COMPLETED",
            StudyStatus.HAS_ERRORS: "[X] HAS ERRORS"
        }
        lines.append(f"Status: {status_icons.get(self.status, str(self.status))}")
        lines.append("")
        # Summary info
        if self.summary:
            lines.append("SUMMARY")
            lines.append("-" * 40)
            for key, value in self.summary.items():
                lines.append(f"  {key}: {value}")
            lines.append("")
        # Checks
        lines.append("VALIDATION CHECKS")
        lines.append("-" * 40)
        for check in self.checks:
            icon = "[OK]" if check.passed else "[X]"
            lines.append(f"  {icon} {check.name}")
            # Show detail line for failures, or when extra details exist.
            if not check.passed or check.details:
                lines.append(f"      {check.message}")
        lines.append("")
        # Final verdict — suggest the next action for the current status.
        if self.status == StudyStatus.READY_TO_RUN:
            lines.append("Ready to run optimization!")
            lines.append("  Command: python run_optimization.py --trials 30")
        elif self.status == StudyStatus.COMPLETED:
            lines.append("Optimization completed. View results:")
            lines.append("  Command: python -m optimization_engine.validators.results_validator " + self.study_name)
        elif self.status == StudyStatus.SETUP_INCOMPLETE:
            lines.append("Complete setup before running:")
            for check in self.checks:
                if not check.passed:
                    lines.append(f"  - Fix: {check.message}")
        elif self.status == StudyStatus.HAS_ERRORS:
            lines.append("Fix errors before continuing:")
            for check in self.checks:
                if not check.passed:
                    lines.append(f"  - {check.message}")
        return "\n".join(lines)
def validate_study(study_name: str, studies_dir: str = "studies") -> StudyValidationResult:
    """
    Validate all aspects of an optimization study.

    Runs six checks in a fixed order (folder, directory structure, config,
    model, run script, results) and derives an overall StudyStatus from the
    outcomes. Sub-validators are imported lazily inside their checks.

    Args:
        study_name: Name of the study folder
        studies_dir: Base directory for studies (default: "studies")

    Returns:
        StudyValidationResult with complete validation status
    """
    checks: List[StudyCheckResult] = []
    summary: Dict[str, Any] = {}
    study_path = Path(studies_dir) / study_name
    # Check 1: Study folder exists — bail out early, nothing else can run.
    if not study_path.exists():
        checks.append(StudyCheckResult(
            name="Study folder exists",
            passed=False,
            message=f"Study folder not found: {study_path}"
        ))
        return StudyValidationResult(
            study_name=study_name,
            status=StudyStatus.NOT_FOUND,
            checks=checks,
            summary=summary
        )
    checks.append(StudyCheckResult(
        name="Study folder exists",
        passed=True,
        message="OK"
    ))
    # Check 2: Required directory structure
    setup_dir = study_path / "1_setup"
    results_dir = study_path / "2_results"
    model_dir = setup_dir / "model"
    structure_ok = True
    structure_msg = []
    if not setup_dir.exists():
        structure_ok = False
        structure_msg.append("Missing 1_setup/")
    if not model_dir.exists():
        structure_ok = False
        structure_msg.append("Missing 1_setup/model/")
    checks.append(StudyCheckResult(
        name="Directory structure",
        passed=structure_ok,
        message="OK" if structure_ok else f"Missing: {', '.join(structure_msg)}"
    ))
    # Check 3: Configuration file
    config_path = setup_dir / "optimization_config.json"
    config_valid = False
    config_details = {}
    if config_path.exists():
        from .config_validator import validate_config_file
        config_result = validate_config_file(str(config_path))
        config_valid = config_result.is_valid
        config_details = {
            "errors": len(config_result.errors),
            "warnings": len(config_result.warnings)
        }
        # Surface headline config facts in the summary.
        summary["design_variables"] = len(config_result.config.get("design_variables", []))
        summary["objectives"] = len(config_result.config.get("objectives", []))
        summary["constraints"] = len(config_result.config.get("constraints", []))
        if config_valid:
            msg = "Configuration valid"
            if config_result.warnings:
                msg += f" ({len(config_result.warnings)} warnings)"
        else:
            msg = f"{len(config_result.errors)} errors"
    else:
        msg = "optimization_config.json not found"
    checks.append(StudyCheckResult(
        name="Configuration file",
        passed=config_valid,
        message=msg,
        details=config_details
    ))
    # Check 4: Model files
    model_valid = False
    model_details = {}
    if model_dir.exists():
        from .model_validator import validate_study_model
        model_result = validate_study_model(study_name, studies_dir)
        model_valid = model_result.is_valid
        model_details = {
            "prt": model_result.prt_file is not None,
            "sim": model_result.sim_file is not None,
            "fem": model_result.fem_file is not None
        }
        if model_result.model_name:
            summary["model_name"] = model_result.model_name
        if model_valid:
            msg = "Model files valid"
            if model_result.warnings:
                msg += f" ({len(model_result.warnings)} warnings)"
        else:
            msg = f"{len(model_result.errors)} errors"
    else:
        msg = "Model directory not found"
    checks.append(StudyCheckResult(
        name="Model files",
        passed=model_valid,
        message=msg,
        details=model_details
    ))
    # Check 5: Run script
    run_script = study_path / "run_optimization.py"
    run_script_exists = run_script.exists()
    checks.append(StudyCheckResult(
        name="Run script",
        passed=run_script_exists,
        message="OK" if run_script_exists else "run_optimization.py not found"
    ))
    # Check 6: Results (if any)
    db_path = results_dir / "study.db"
    has_results = db_path.exists()
    results_valid = False
    results_details = {}
    if has_results:
        from .results_validator import validate_results
        results_result = validate_results(
            str(db_path),
            str(config_path) if config_path.exists() else None
        )
        results_valid = results_result.is_valid
        results_details = {
            "trials": results_result.info.n_trials,
            "completed": results_result.info.n_completed,
            "failed": results_result.info.n_failed,
            "pareto": results_result.info.n_pareto
        }
        summary["trials_completed"] = results_result.info.n_completed
        summary["trials_failed"] = results_result.info.n_failed
        if results_result.info.n_pareto > 0:
            summary["pareto_designs"] = results_result.info.n_pareto
        if results_valid:
            msg = f"{results_result.info.n_completed} completed trials"
            if results_result.info.is_multi_objective:
                msg += f", {results_result.info.n_pareto} Pareto-optimal"
        else:
            msg = f"{len(results_result.errors)} errors in results"
        checks.append(StudyCheckResult(
            name="Optimization results",
            passed=results_valid,
            message=msg,
            details=results_details
        ))
    else:
        checks.append(StudyCheckResult(
            name="Optimization results",
            passed=True,  # Not having results is OK for a new study
            message="No results yet (study not run)",
            details={"exists": False}
        ))
    # Determine overall status.
    # NOTE(review): this relies on the append order above (0=folder,
    # 1=structure, 2=config, 3=model) — keep in sync if checks are reordered.
    critical_checks_passed = all([
        checks[0].passed,  # folder exists
        checks[1].passed,  # structure
        checks[2].passed,  # config
        checks[3].passed,  # model
    ])
    if not critical_checks_passed:
        status = StudyStatus.SETUP_INCOMPLETE
    elif has_results and results_valid:
        # Check if still running (look for lock file or recent activity)
        lock_file = results_dir / ".optimization_lock"
        if lock_file.exists():
            status = StudyStatus.RUNNING
        else:
            status = StudyStatus.COMPLETED
    elif has_results and not results_valid:
        status = StudyStatus.HAS_ERRORS
    else:
        status = StudyStatus.READY_TO_RUN
    return StudyValidationResult(
        study_name=study_name,
        status=status,
        checks=checks,
        summary=summary
    )
def list_studies(studies_dir: str = "studies") -> List[Dict[str, Any]]:
    """
    Enumerate every study folder and report its validation status.

    Args:
        studies_dir: Base directory for studies

    Returns:
        List of dictionaries with study name and status; empty when the
        base directory does not exist.
    """
    root = Path(studies_dir)
    reports: List[Dict[str, Any]] = []
    if not root.exists():
        return reports

    for folder in sorted(root.iterdir()):
        # Skip files and hidden folders (e.g. .git, .cache).
        if not folder.is_dir() or folder.name.startswith('.'):
            continue
        validation = validate_study(folder.name, studies_dir)
        reports.append({
            "name": folder.name,
            "status": validation.status.value,
            "is_ready": validation.is_ready_to_run,
            "errors": validation.error_count,
            "trials": validation.summary.get("trials_completed", 0),
            "pareto": validation.summary.get("pareto_designs", 0)
        })
    return reports
def quick_check(study_name: str, studies_dir: str = "studies") -> bool:
    """
    Fast yes/no probe: is the study ready to run optimization?

    Args:
        study_name: Name of the study
        studies_dir: Base directory for studies

    Returns:
        True if ready to run, False otherwise
    """
    return validate_study(study_name, studies_dir).is_ready_to_run
def get_study_health(study_name: str, studies_dir: str = "studies") -> Dict[str, Any]:
    """
    Produce a compact health report for a study.

    Args:
        study_name: Name of the study
        studies_dir: Base directory for studies

    Returns:
        Dictionary with status, readiness, check counts, and summary facts
    """
    result = validate_study(study_name, studies_dir)
    passed_checks = sum(1 for c in result.checks if c.passed)
    return {
        "name": study_name,
        "status": result.status.value,
        "is_ready": result.is_ready_to_run,
        "checks_passed": passed_checks,
        "checks_total": len(result.checks),
        "error_count": result.error_count,
        "summary": result.summary
    }
if __name__ == "__main__":
    import sys

    if len(sys.argv) >= 2:
        # Validate the named study and print the full report.
        print(validate_study(sys.argv[1]))
    else:
        # No argument: show a one-line status for every known study.
        print("Available studies:")
        print("-" * 60)
        studies = list_studies()
        if not studies:
            print("  No studies found in studies/")
        else:
            for entry in studies:
                status_icon = "[OK]" if entry["is_ready"] else "[X]"
                trials_info = f"{entry['trials']} trials" if entry['trials'] > 0 else "no trials"
                print(f"  {status_icon} {entry['name']}: {entry['status']} ({trials_info})")
        print()
        print("Usage: python study_validator.py <study_name>")