Files
Atomizer/optimization_engine/interview/engineering_validator.py
Anto01 32caa5d05c feat: Implement Study Interview Mode as default study creation method
Study Interview Mode is now the DEFAULT for all study creation requests.
This intelligent Q&A system guides users through optimization setup with:

- 7-phase interview flow: introspection → objectives → constraints → design_variables → validation → review → complete
- Material-aware validation with 12 materials and fuzzy name matching
- Anti-pattern detection for 12 common mistakes (mass-no-constraint, stress-over-yield, etc.)
- Auto extractor mapping E1-E24 based on goal keywords
- State persistence with JSON serialization and backup rotation
- StudyBlueprint generation with full validation

Triggers: "create a study", "new study", "optimize this", any study creation intent
Skip with: "skip interview", "quick setup", "manual config"

Components:
- StudyInterviewEngine: Main orchestrator
- QuestionEngine: Conditional logic evaluation
- EngineeringValidator: MaterialsDatabase + AntiPatternDetector
- InterviewPresenter: Markdown formatting for Claude
- StudyBlueprint: Validated configuration output
- InterviewState: Persistent state management

All 129 tests passing.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-03 11:06:07 -05:00

782 lines
27 KiB
Python

"""
Engineering Validator
Validates interview answers against engineering knowledge and detects anti-patterns.
Provides:
- MaterialsDatabase: Common materials with properties
- AntiPatternDetector: Detects optimization setup mistakes
- EngineeringValidator: Main validation logic
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import json
import re
from difflib import SequenceMatcher
@dataclass
class Material:
"""Engineering material with properties."""
id: str
names: List[str]
category: str
properties: Dict[str, Any]
notes: Optional[str] = None
recommended_safety_factors: Optional[Dict[str, float]] = None
@property
def density(self) -> Optional[float]:
return self.properties.get("density_kg_m3")
@property
def yield_stress(self) -> Optional[float]:
return self.properties.get("yield_stress_mpa")
@property
def ultimate_stress(self) -> Optional[float]:
return self.properties.get("ultimate_stress_mpa")
@property
def elastic_modulus(self) -> Optional[float]:
return self.properties.get("elastic_modulus_gpa")
def get_safe_stress(self, application: str = "static") -> Optional[float]:
"""Get safe stress limit with recommended safety factor."""
if self.yield_stress is None:
return None
sf = 1.5 # Default
if self.recommended_safety_factors:
sf = self.recommended_safety_factors.get(application, 1.5)
return self.yield_stress / sf
class MaterialsDatabase:
"""
Database of common engineering materials and properties.
Supports fuzzy name matching for user convenience.
"""
def __init__(self, db_path: Optional[Path] = None):
"""
Initialize materials database.
Args:
db_path: Path to materials JSON. Uses default if None.
"""
if db_path is None:
db_path = Path(__file__).parent / "schemas" / "materials_database.json"
self.db_path = db_path
self.materials: Dict[str, Material] = {}
self._name_index: Dict[str, str] = {} # name -> material_id
self._load_database()
def _load_database(self) -> None:
"""Load materials from JSON file."""
if not self.db_path.exists():
return
with open(self.db_path, "r", encoding="utf-8") as f:
data = json.load(f)
for mat_data in data.get("materials", []):
material = Material(
id=mat_data["id"],
names=mat_data["names"],
category=mat_data["category"],
properties=mat_data["properties"],
notes=mat_data.get("notes"),
recommended_safety_factors=mat_data.get("recommended_safety_factors"),
)
self.materials[material.id] = material
# Build name index
for name in material.names:
self._name_index[name.lower()] = material.id
def get_material(self, name: str) -> Optional[Material]:
"""
Look up material by name (supports fuzzy matching).
Args:
name: Material name (e.g., "Al 6061-T6", "aluminum", "steel 304")
Returns:
Material if found, None otherwise
"""
name_lower = name.lower().strip()
# Exact match
if name_lower in self._name_index:
return self.materials[self._name_index[name_lower]]
# Try by ID
if name_lower in self.materials:
return self.materials[name_lower]
# Fuzzy match
best_match = None
best_ratio = 0.6 # Minimum threshold
for indexed_name, mat_id in self._name_index.items():
ratio = SequenceMatcher(None, name_lower, indexed_name).ratio()
if ratio > best_ratio:
best_ratio = ratio
best_match = mat_id
if best_match:
return self.materials[best_match]
return None
def get_yield_stress(self, material_name: str) -> Optional[float]:
"""Get yield stress for material in MPa."""
material = self.get_material(material_name)
return material.yield_stress if material else None
def validate_stress_limit(
self,
material_name: str,
limit: float,
safety_factor: float = 1.0,
application: str = "static"
) -> "ValidationResult":
"""
Check if stress limit is reasonable for material.
Args:
material_name: Material name
limit: Proposed stress limit in MPa
safety_factor: Applied safety factor (if any)
application: Application type (static, fatigue, aerospace)
Returns:
ValidationResult with status and message
"""
material = self.get_material(material_name)
if material is None:
return ValidationResult(
valid=True,
message=f"Material '{material_name}' not found in database. Unable to validate stress limit.",
severity="info"
)
if material.yield_stress is None:
return ValidationResult(
valid=True,
message=f"Material '{material.id}' does not have yield stress defined (e.g., brittle material).",
severity="info"
)
yield_stress = material.yield_stress
effective_limit = limit * safety_factor if safety_factor > 1 else limit
# Check various thresholds
if effective_limit > material.ultimate_stress if material.ultimate_stress else yield_stress:
return ValidationResult(
valid=False,
message=f"Stress limit ({limit} MPa) exceeds ultimate stress ({material.ultimate_stress or yield_stress} MPa) for {material.id}",
severity="error",
suggestion=f"Reduce stress limit to below {(material.ultimate_stress or yield_stress) / 1.5:.0f} MPa"
)
if effective_limit > yield_stress:
return ValidationResult(
valid=True, # Warning, not error
message=f"Stress limit ({limit} MPa) exceeds yield stress ({yield_stress} MPa) for {material.id}. This allows plastic deformation.",
severity="warning",
suggestion=f"Consider reducing to {yield_stress / 1.5:.0f} MPa (SF=1.5)"
)
# Get recommended safe stress
safe_stress = material.get_safe_stress(application)
if safe_stress and limit > safe_stress:
rec_sf = material.recommended_safety_factors.get(application, 1.5) if material.recommended_safety_factors else 1.5
return ValidationResult(
valid=True,
message=f"Stress limit ({limit} MPa) is {limit/yield_stress*100:.0f}% of yield. Recommended safety factor for {application}: {rec_sf}",
severity="info",
suggestion=f"Typical {application} limit: {safe_stress:.0f} MPa"
)
return ValidationResult(
valid=True,
message=f"Stress limit ({limit} MPa) is acceptable for {material.id} (yield: {yield_stress} MPa)",
severity="ok"
)
def list_materials(self, category: Optional[str] = None) -> List[Material]:
"""List all materials, optionally filtered by category."""
materials = list(self.materials.values())
if category:
materials = [m for m in materials if m.category == category]
return materials
@dataclass
class ValidationResult:
"""Result of a validation check."""
valid: bool
message: str
severity: str = "ok" # ok, info, warning, error
suggestion: Optional[str] = None
field: Optional[str] = None
def is_blocking(self) -> bool:
"""Check if this result blocks proceeding."""
return self.severity == "error"
@dataclass
class AntiPattern:
"""Detected anti-pattern."""
id: str
name: str
description: str
severity: str # error, warning, info
fix_suggestion: Optional[str] = None
auto_fix: Optional[Dict[str, Any]] = None
acknowledged: bool = False
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"description": self.description,
"severity": self.severity,
"fix_suggestion": self.fix_suggestion,
"auto_fix": self.auto_fix,
"acknowledged": self.acknowledged,
}
class AntiPatternDetector:
"""
Detects common optimization setup mistakes.
Loads patterns from JSON and evaluates against interview state.
"""
def __init__(self, patterns_path: Optional[Path] = None):
"""
Initialize anti-pattern detector.
Args:
patterns_path: Path to patterns JSON. Uses default if None.
"""
if patterns_path is None:
patterns_path = Path(__file__).parent / "schemas" / "anti_patterns.json"
self.patterns_path = patterns_path
self.patterns: List[Dict[str, Any]] = []
self._load_patterns()
def _load_patterns(self) -> None:
"""Load patterns from JSON file."""
if not self.patterns_path.exists():
return
with open(self.patterns_path, "r", encoding="utf-8") as f:
data = json.load(f)
self.patterns = data.get("patterns", [])
def check_all(self, state: "InterviewState", introspection: Dict[str, Any] = None) -> List[AntiPattern]:
"""
Run all anti-pattern checks.
Args:
state: Current interview state
introspection: Optional introspection results
Returns:
List of detected anti-patterns
"""
detected = []
context = self._build_context(state, introspection or {})
for pattern in self.patterns:
if self._evaluate_condition(pattern.get("condition", {}), context):
detected.append(AntiPattern(
id=pattern["id"],
name=pattern["name"],
description=pattern["description"],
severity=pattern["severity"],
fix_suggestion=pattern.get("fix_suggestion"),
auto_fix=pattern.get("auto_fix"),
))
return detected
def _build_context(self, state: "InterviewState", introspection: Dict[str, Any]) -> Dict[str, Any]:
"""Build evaluation context from state and introspection."""
answers = state.answers
# Extract objectives as list of goal values
objectives_list = []
for obj in answers.get("objectives", []):
if isinstance(obj, dict):
objectives_list.append(obj.get("goal", ""))
else:
objectives_list.append(str(obj))
# Add secondary objectives if present
for obj in answers.get("objectives_secondary", []):
if obj != "none":
objectives_list.append(obj)
return {
"objectives": objectives_list,
"constraints": answers.get("constraints", {}),
"design_variables": answers.get("design_variables", []),
"design_variable_count": len(answers.get("design_variables", [])),
"analysis_types": answers.get("analysis_types", []),
"solve_all_solutions": answers.get("solve_all_solutions", True),
"n_trials": answers.get("n_trials", 100),
"introspection": introspection,
"material": introspection.get("material"),
"baseline_violations": state.get_answer("baseline_violations"),
}
def _evaluate_condition(self, condition: Dict[str, Any], context: Dict[str, Any]) -> bool:
"""Evaluate a pattern condition against context."""
if not condition:
return False
cond_type = condition.get("type", "")
if cond_type == "and":
return all(
self._evaluate_condition(c, context)
for c in condition.get("conditions", [])
)
elif cond_type == "or":
return any(
self._evaluate_condition(c, context)
for c in condition.get("conditions", [])
)
elif cond_type == "not":
inner = condition.get("condition", {})
return not self._evaluate_condition(inner, context)
elif cond_type == "contains":
field_value = self._get_field(context, condition.get("field", ""))
target = condition.get("value")
if isinstance(field_value, list):
return target in field_value
return False
elif cond_type == "not_contains":
field_value = self._get_field(context, condition.get("field", ""))
target = condition.get("value")
if isinstance(field_value, list):
return target not in field_value
return True
elif cond_type == "equals":
field_value = self._get_field(context, condition.get("field", ""))
return field_value == condition.get("value")
elif cond_type == "empty":
field_value = self._get_field(context, condition.get("field", ""))
if field_value is None:
return True
if isinstance(field_value, (list, dict, str)):
return len(field_value) == 0
return False
elif cond_type == "exists":
field_value = self._get_field(context, condition.get("field", ""))
return field_value is not None
elif cond_type == "not_exists":
field_value = self._get_field(context, condition.get("field", ""))
return field_value is None
elif cond_type == "greater_than":
field_value = self._get_field(context, condition.get("field", ""))
compare = condition.get("value")
# Handle compare_to (field reference)
if "compare_to" in condition:
compare_ref = condition["compare_to"]
if isinstance(compare_ref, dict):
# Dynamic calculation
if compare_ref.get("type") == "multiply":
base_value = self._get_field(context, compare_ref.get("field", ""))
if base_value is not None:
compare = base_value * compare_ref.get("value", 1)
else:
compare = self._get_field(context, compare_ref)
if field_value is not None and compare is not None:
try:
return float(field_value) > float(compare)
except (ValueError, TypeError):
return False
return False
elif cond_type == "less_than":
field_value = self._get_field(context, condition.get("field", ""))
compare = condition.get("value")
if "compare_to" in condition:
compare_ref = condition["compare_to"]
if isinstance(compare_ref, dict):
if compare_ref.get("type") == "multiply":
base_value = self._get_field(context, compare_ref.get("field", ""))
if base_value is not None:
compare = base_value * compare_ref.get("value", 1)
else:
compare = self._get_field(context, compare_ref)
if field_value is not None and compare is not None:
try:
return float(field_value) < float(compare)
except (ValueError, TypeError):
return False
return False
elif cond_type == "count_greater_than":
field_value = self._get_field(context, condition.get("field", ""))
if isinstance(field_value, (list, dict)):
return len(field_value) > condition.get("value", 0)
return False
elif cond_type == "count_equals":
field_value = self._get_field(context, condition.get("field", ""))
if isinstance(field_value, (list, dict)):
return len(field_value) == condition.get("value", 0)
return False
elif cond_type == "any_of":
# Check if any item in array matches a condition
field_value = self._get_field(context, condition.get("field", ""))
if not isinstance(field_value, list):
return False
check = condition.get("check", {})
for item in field_value:
if isinstance(item, dict):
item_context = {**context, "item": item}
if self._evaluate_condition(check, item_context):
return True
return False
elif cond_type == "ratio_greater_than":
# For bounds checking
fields = condition.get("field", [])
if len(fields) == 2:
val1 = self._get_field(context, f"item.{fields[0]}")
val2 = self._get_field(context, f"item.{fields[1]}")
if val1 and val2 and val2 != 0:
try:
return float(val1) / float(val2) > condition.get("value", 1)
except (ValueError, TypeError):
return False
return False
return False
def _get_field(self, context: Dict[str, Any], field_path: str) -> Any:
"""Get a field value from context using dot notation."""
if not field_path:
return None
parts = field_path.split(".")
current = context
for part in parts:
if current is None:
return None
if isinstance(current, dict):
current = current.get(part)
else:
return None
return current
class EngineeringValidator:
"""
Main engineering validator.
Combines materials database and anti-pattern detection with
additional validation logic.
"""
def __init__(self):
"""Initialize validator with materials DB and anti-pattern detector."""
self.materials_db = MaterialsDatabase()
self.anti_patterns = AntiPatternDetector()
def validate_constraint(
self,
constraint_type: str,
value: float,
material: Optional[str] = None,
baseline: Optional[float] = None
) -> ValidationResult:
"""
Validate a constraint value against engineering limits.
Args:
constraint_type: Type of constraint (stress, displacement, frequency)
value: Constraint value
material: Optional material name for property lookups
baseline: Optional baseline value for feasibility check
Returns:
ValidationResult
"""
if constraint_type == "stress" and material:
return self.materials_db.validate_stress_limit(material, value)
# Check against baseline if available
if baseline is not None:
if constraint_type in ["stress", "displacement"]:
# Max constraint - baseline should be under limit
if baseline > value:
return ValidationResult(
valid=True,
message=f"Baseline ({baseline:.2f}) exceeds limit ({value}). Optimization starts infeasible.",
severity="warning",
suggestion="Consider relaxing the constraint or improving the baseline design"
)
elif constraint_type == "frequency":
# Min constraint - baseline should be above limit
if baseline < value:
return ValidationResult(
valid=True,
message=f"Baseline frequency ({baseline:.2f} Hz) is below limit ({value} Hz). Optimization starts infeasible.",
severity="warning",
suggestion="Consider relaxing the constraint"
)
return ValidationResult(
valid=True,
message=f"Constraint {constraint_type} = {value} accepted",
severity="ok"
)
def validate_bounds(
self,
parameter: str,
min_value: float,
max_value: float,
current_value: Optional[float] = None
) -> ValidationResult:
"""
Validate design variable bounds.
Args:
parameter: Parameter name
min_value: Lower bound
max_value: Upper bound
current_value: Current/nominal value
Returns:
ValidationResult
"""
if min_value >= max_value:
return ValidationResult(
valid=False,
message=f"Invalid bounds for {parameter}: min ({min_value}) >= max ({max_value})",
severity="error",
field=parameter
)
# Check bounds width
if min_value > 0:
ratio = max_value / min_value
if ratio > 10:
return ValidationResult(
valid=True,
message=f"Wide bounds for {parameter}: ratio {ratio:.1f}x may slow convergence",
severity="warning",
suggestion=f"Consider narrowing to {min_value:.2f} - {min_value * 5:.2f}",
field=parameter
)
# Check if current value is within bounds
if current_value is not None:
if current_value < min_value or current_value > max_value:
return ValidationResult(
valid=True,
message=f"Current value ({current_value}) for {parameter} is outside bounds [{min_value}, {max_value}]",
severity="warning",
suggestion="Adjust bounds to include current value or update nominal design",
field=parameter
)
return ValidationResult(
valid=True,
message=f"Bounds for {parameter} are valid",
severity="ok",
field=parameter
)
def suggest_bounds(
self,
parameter: str,
current_value: float,
context: Optional[Dict[str, Any]] = None
) -> Tuple[float, float]:
"""
Suggest reasonable bounds for a design variable.
Args:
parameter: Parameter name
current_value: Current value
context: Optional context (material, application, etc.)
Returns:
Tuple of (suggested_min, suggested_max)
"""
# Default: +/- 50% of current value
if current_value > 0:
suggested_min = current_value * 0.5
suggested_max = current_value * 1.5
elif current_value < 0:
suggested_min = current_value * 1.5
suggested_max = current_value * 0.5
else:
suggested_min = -1.0
suggested_max = 1.0
# Adjust based on parameter name heuristics
name_lower = parameter.lower()
if "thickness" in name_lower:
# Thickness should stay positive with reasonable manufacturing limits
suggested_min = max(0.5, current_value * 0.3) # Min 0.5mm
suggested_max = current_value * 2.0
elif "radius" in name_lower or "fillet" in name_lower:
# Radii should stay positive
suggested_min = max(0.1, current_value * 0.2)
suggested_max = current_value * 3.0
elif "angle" in name_lower:
# Angles often have natural limits
suggested_min = max(-90, current_value - 30)
suggested_max = min(90, current_value + 30)
return (round(suggested_min, 3), round(suggested_max, 3))
def detect_anti_patterns(
self,
state: "InterviewState",
introspection: Optional[Dict[str, Any]] = None
) -> List[AntiPattern]:
"""
Detect common optimization anti-patterns.
Args:
state: Current interview state
introspection: Optional introspection results
Returns:
List of detected anti-patterns
"""
return self.anti_patterns.check_all(state, introspection or {})
def validate_all(
self,
state: "InterviewState",
introspection: Optional[Dict[str, Any]] = None
) -> List[ValidationResult]:
"""
Run all validations on interview state.
Args:
state: Current interview state
introspection: Optional introspection results
Returns:
List of all validation results
"""
results = []
answers = state.answers
intro = introspection or {}
# Validate constraints
if "max_stress" in answers.get("constraints", {}):
material = intro.get("material", {}).get("name")
result = self.validate_constraint(
"stress",
answers["constraints"]["max_stress"],
material=material,
baseline=intro.get("baseline_stress")
)
results.append(result)
if "max_displacement" in answers.get("constraints", {}):
result = self.validate_constraint(
"displacement",
answers["constraints"]["max_displacement"],
baseline=intro.get("baseline_displacement")
)
results.append(result)
if "min_frequency" in answers.get("constraints", {}):
result = self.validate_constraint(
"frequency",
answers["constraints"]["min_frequency"],
baseline=intro.get("baseline_frequency")
)
results.append(result)
# Validate design variable bounds
for dv in answers.get("design_variables", []):
if isinstance(dv, dict):
result = self.validate_bounds(
dv.get("parameter", "unknown"),
dv.get("min_value", 0),
dv.get("max_value", 1),
dv.get("current_value")
)
results.append(result)
# Check anti-patterns
anti_patterns = self.detect_anti_patterns(state, intro)
for ap in anti_patterns:
results.append(ValidationResult(
valid=ap.severity != "error",
message=f"[{ap.name}] {ap.description}",
severity=ap.severity,
suggestion=ap.fix_suggestion
))
return results
def has_blocking_issues(
self,
state: "InterviewState",
introspection: Optional[Dict[str, Any]] = None
) -> Tuple[bool, List[str]]:
"""
Check if there are any blocking issues.
Returns:
Tuple of (has_blocking, list_of_blocking_messages)
"""
results = self.validate_all(state, introspection)
blocking = [r.message for r in results if r.is_blocking()]
return len(blocking) > 0, blocking
# Import for type hints
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .interview_state import InterviewState