"""
Engineering Validator

Validates interview answers against engineering knowledge and detects anti-patterns.

Provides:
- MaterialsDatabase: Common materials with properties
- AntiPatternDetector: Detects optimization setup mistakes
- EngineeringValidator: Main validation logic
"""
|
||
|
|
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Dict, List, Any, Optional, Tuple
|
||
|
|
import json
|
||
|
|
import re
|
||
|
|
from difflib import SequenceMatcher
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class Material:
    """A single engineering material and its property table.

    The convenience accessors below read fixed keys out of ``properties``
    and yield ``None`` when a key is missing.
    """

    id: str
    names: List[str]
    category: str
    properties: Dict[str, Any]
    notes: Optional[str] = None
    recommended_safety_factors: Optional[Dict[str, float]] = None

    @property
    def density(self) -> Optional[float]:
        """Value stored under ``density_kg_m3``, if any."""
        return self.properties.get("density_kg_m3")

    @property
    def yield_stress(self) -> Optional[float]:
        """Value stored under ``yield_stress_mpa``, if any."""
        return self.properties.get("yield_stress_mpa")

    @property
    def ultimate_stress(self) -> Optional[float]:
        """Value stored under ``ultimate_stress_mpa``, if any."""
        return self.properties.get("ultimate_stress_mpa")

    @property
    def elastic_modulus(self) -> Optional[float]:
        """Value stored under ``elastic_modulus_gpa``, if any."""
        return self.properties.get("elastic_modulus_gpa")

    def get_safe_stress(self, application: str = "static") -> Optional[float]:
        """Return the yield stress divided by the applicable safety factor.

        Args:
            application: Key looked up in ``recommended_safety_factors``.

        Returns:
            ``yield_stress / factor``, or None when no yield stress is
            defined. The factor is 1.5 unless the material lists one for
            ``application``.
        """
        base = self.yield_stress
        if base is None:
            return None

        factors = self.recommended_safety_factors
        divisor = factors.get(application, 1.5) if factors else 1.5
        return base / divisor
|
||
|
|
|
||
|
|
|
||
|
|
class MaterialsDatabase:
    """
    Database of common engineering materials and properties.

    Materials are read from a JSON file at construction time and indexed by
    id and by every alias (lower-cased, stripped). Lookups try an exact
    alias match, then the id, then a fuzzy alias match.
    """

    def __init__(self, db_path: Optional[Path] = None):
        """
        Initialize materials database.

        Args:
            db_path: Path to materials JSON. When None,
                ``schemas/materials_database.json`` next to this module is used.
        """
        if db_path is None:
            db_path = Path(__file__).parent / "schemas" / "materials_database.json"

        self.db_path = db_path
        self.materials: Dict[str, "Material"] = {}
        self._name_index: Dict[str, str] = {}  # normalized alias -> material id

        self._load_database()

    def _load_database(self) -> None:
        """Load materials from the JSON file; a missing file leaves the DB empty."""
        if not self.db_path.exists():
            return

        with open(self.db_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        for mat_data in data.get("materials", []):
            material = Material(
                id=mat_data["id"],
                names=mat_data["names"],
                category=mat_data["category"],
                properties=mat_data["properties"],
                notes=mat_data.get("notes"),
                recommended_safety_factors=mat_data.get("recommended_safety_factors"),
            )
            self.materials[material.id] = material

            # Normalize aliases the same way get_material() normalizes the
            # query (lower + strip), otherwise padded aliases never match.
            for alias in material.names:
                self._name_index[alias.lower().strip()] = material.id

    def get_material(self, name: str) -> Optional["Material"]:
        """
        Look up material by name (supports fuzzy matching).

        Args:
            name: Material name (e.g., "Al 6061-T6", "aluminum", "steel 304")

        Returns:
            Material if found, None otherwise (including for empty/None input)
        """
        if not name:
            return None

        stripped = name.strip()
        name_lower = stripped.lower()

        # Exact alias match
        if name_lower in self._name_index:
            return self.materials[self._name_index[name_lower]]

        # Try by ID. Ids are stored verbatim, so check the original casing
        # as well as the lower-cased form.
        for candidate in (name_lower, stripped):
            if candidate in self.materials:
                return self.materials[candidate]

        # Fuzzy match: keep the alias with the highest similarity ratio that
        # is strictly above the 0.6 threshold.
        best_match = None
        best_ratio = 0.6
        for indexed_name, mat_id in self._name_index.items():
            ratio = SequenceMatcher(None, name_lower, indexed_name).ratio()
            if ratio > best_ratio:
                best_ratio = ratio
                best_match = mat_id

        if best_match:
            return self.materials[best_match]

        return None

    def get_yield_stress(self, material_name: str) -> Optional[float]:
        """Get yield stress for material in MPa, or None if unknown."""
        material = self.get_material(material_name)
        return material.yield_stress if material else None

    def validate_stress_limit(
        self,
        material_name: str,
        limit: float,
        safety_factor: float = 1.0,
        application: str = "static"
    ) -> "ValidationResult":
        """
        Check if stress limit is reasonable for material.

        Args:
            material_name: Material name
            limit: Proposed stress limit in MPa
            safety_factor: Applied safety factor (if any); when greater than 1
                the limit is multiplied by it before the threshold checks
            application: Application type (static, fatigue, aerospace)

        Returns:
            ValidationResult with status and message. Severity is "error"
            above the ultimate stress (yield stress when no ultimate value is
            defined), "warning" above yield, "info" above the recommended
            safe stress or when the material cannot be checked, else "ok".
        """
        material = self.get_material(material_name)

        if material is None:
            return ValidationResult(
                valid=True,
                message=f"Material '{material_name}' not found in database. Unable to validate stress limit.",
                severity="info"
            )

        if material.yield_stress is None:
            return ValidationResult(
                valid=True,
                message=f"Material '{material.id}' does not have yield stress defined (e.g., brittle material).",
                severity="info"
            )

        yield_stress = material.yield_stress
        # Resolve the fallback BEFORE comparing: a bare
        # ``a > b if b else c`` parses as ``(a > b) if b else c`` and would
        # flag every limit as an error when no ultimate stress is defined.
        ultimate = material.ultimate_stress or yield_stress
        effective_limit = limit * safety_factor if safety_factor > 1 else limit

        if effective_limit > ultimate:
            return ValidationResult(
                valid=False,
                message=f"Stress limit ({limit} MPa) exceeds ultimate stress ({ultimate} MPa) for {material.id}",
                severity="error",
                suggestion=f"Reduce stress limit to below {ultimate / 1.5:.0f} MPa"
            )

        if effective_limit > yield_stress:
            return ValidationResult(
                valid=True,  # Warning, not error
                message=f"Stress limit ({limit} MPa) exceeds yield stress ({yield_stress} MPa) for {material.id}. This allows plastic deformation.",
                severity="warning",
                suggestion=f"Consider reducing to {yield_stress / 1.5:.0f} MPa (SF=1.5)"
            )

        # Compare against the recommended safe stress for this application
        safe_stress = material.get_safe_stress(application)
        if safe_stress and limit > safe_stress:
            rec_sf = material.recommended_safety_factors.get(application, 1.5) if material.recommended_safety_factors else 1.5
            return ValidationResult(
                valid=True,
                message=f"Stress limit ({limit} MPa) is {limit/yield_stress*100:.0f}% of yield. Recommended safety factor for {application}: {rec_sf}",
                severity="info",
                suggestion=f"Typical {application} limit: {safe_stress:.0f} MPa"
            )

        return ValidationResult(
            valid=True,
            message=f"Stress limit ({limit} MPa) is acceptable for {material.id} (yield: {yield_stress} MPa)",
            severity="ok"
        )

    def list_materials(self, category: Optional[str] = None) -> List["Material"]:
        """List all materials, optionally filtered by exact category."""
        materials = list(self.materials.values())
        if category:
            materials = [m for m in materials if m.category == category]
        return materials


@dataclass
class ValidationResult:
    """Result of a validation check."""

    valid: bool
    message: str
    severity: str = "ok"  # ok, info, warning, error
    suggestion: Optional[str] = None
    field: Optional[str] = None

    def is_blocking(self) -> bool:
        """Check if this result blocks proceeding (severity is "error")."""
        return self.severity == "error"
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class AntiPattern:
    """One anti-pattern that was matched against the interview state."""

    id: str
    name: str
    description: str
    severity: str  # error, warning, info
    fix_suggestion: Optional[str] = None
    auto_fix: Optional[Dict[str, Any]] = None
    acknowledged: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dict (values are not copied)."""
        keys = (
            "id",
            "name",
            "description",
            "severity",
            "fix_suggestion",
            "auto_fix",
            "acknowledged",
        )
        return {key: getattr(self, key) for key in keys}
|
||
|
|
|
||
|
|
|
||
|
|
class AntiPatternDetector:
    """
    Detects common optimization setup mistakes.

    Loads patterns from JSON and evaluates each pattern's ``condition`` tree
    against a context dict built from the interview state.
    """

    def __init__(self, patterns_path: Optional[Path] = None):
        """
        Initialize anti-pattern detector.

        Args:
            patterns_path: Path to patterns JSON. When None,
                ``schemas/anti_patterns.json`` next to this module is used.
        """
        if patterns_path is None:
            patterns_path = Path(__file__).parent / "schemas" / "anti_patterns.json"

        self.patterns_path = patterns_path
        self.patterns: List[Dict[str, Any]] = []

        self._load_patterns()

    def _load_patterns(self) -> None:
        """Load patterns from the JSON file; a missing file leaves the list empty."""
        if not self.patterns_path.exists():
            return

        with open(self.patterns_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        self.patterns = data.get("patterns", [])

    def check_all(
        self,
        state: "InterviewState",
        introspection: Optional[Dict[str, Any]] = None,
    ) -> List["AntiPattern"]:
        """
        Run all anti-pattern checks.

        Args:
            state: Current interview state
            introspection: Optional introspection results

        Returns:
            List of detected anti-patterns, in pattern-file order
        """
        context = self._build_context(state, introspection or {})

        return [
            AntiPattern(
                id=pattern["id"],
                name=pattern["name"],
                description=pattern["description"],
                severity=pattern["severity"],
                fix_suggestion=pattern.get("fix_suggestion"),
                auto_fix=pattern.get("auto_fix"),
            )
            for pattern in self.patterns
            if self._evaluate_condition(pattern.get("condition", {}), context)
        ]

    def _build_context(self, state: "InterviewState", introspection: Dict[str, Any]) -> Dict[str, Any]:
        """Build the evaluation context from state answers and introspection."""
        answers = state.answers

        # Objectives are flattened to their goal strings; dict entries
        # contribute their "goal" value, anything else its str() form.
        objectives_list = [
            obj.get("goal", "") if isinstance(obj, dict) else str(obj)
            for obj in answers.get("objectives", [])
        ]

        # Secondary objectives are appended as-is, skipping the "none" marker
        objectives_list.extend(
            obj for obj in answers.get("objectives_secondary", []) if obj != "none"
        )

        design_variables = answers.get("design_variables", [])

        return {
            "objectives": objectives_list,
            "constraints": answers.get("constraints", {}),
            "design_variables": design_variables,
            "design_variable_count": len(design_variables),
            "analysis_types": answers.get("analysis_types", []),
            "solve_all_solutions": answers.get("solve_all_solutions", True),
            "n_trials": answers.get("n_trials", 100),
            "introspection": introspection,
            "material": introspection.get("material"),
            "baseline_violations": state.get_answer("baseline_violations"),
        }

    def _evaluate_condition(self, condition: Dict[str, Any], context: Dict[str, Any]) -> bool:
        """
        Evaluate a pattern condition against context.

        An empty condition or an unrecognised ``type`` evaluates to False.
        """
        if not condition:
            return False

        cond_type = condition.get("type", "")

        # --- boolean combinators -------------------------------------------
        if cond_type == "and":
            return all(
                self._evaluate_condition(c, context)
                for c in condition.get("conditions", [])
            )

        if cond_type == "or":
            return any(
                self._evaluate_condition(c, context)
                for c in condition.get("conditions", [])
            )

        if cond_type == "not":
            return not self._evaluate_condition(condition.get("condition", {}), context)

        # --- numeric comparisons -------------------------------------------
        if cond_type == "greater_than":
            return self._compare_numeric(condition, context, greater=True)

        if cond_type == "less_than":
            return self._compare_numeric(condition, context, greater=False)

        if cond_type == "ratio_greater_than":
            # For bounds checking: "field" holds two keys of the current item
            fields = condition.get("field", [])
            if len(fields) == 2:
                val1 = self._get_field(context, f"item.{fields[0]}")
                val2 = self._get_field(context, f"item.{fields[1]}")
                if val1 and val2 and val2 != 0:
                    try:
                        return float(val1) / float(val2) > condition.get("value", 1)
                    except (ValueError, TypeError, ZeroDivisionError):
                        # ZeroDivisionError: a denominator such as "0" is
                        # truthy and != 0 but still converts to 0.0.
                        return False
            return False

        # --- single-field checks -------------------------------------------
        field_value = self._get_field(context, condition.get("field", ""))

        if cond_type == "contains":
            if isinstance(field_value, list):
                return condition.get("value") in field_value
            return False

        if cond_type == "not_contains":
            if isinstance(field_value, list):
                return condition.get("value") not in field_value
            return True

        if cond_type == "equals":
            return field_value == condition.get("value")

        if cond_type == "empty":
            if field_value is None:
                return True
            if isinstance(field_value, (list, dict, str)):
                return len(field_value) == 0
            return False

        if cond_type == "exists":
            return field_value is not None

        if cond_type == "not_exists":
            return field_value is None

        if cond_type == "count_greater_than":
            if isinstance(field_value, (list, dict)):
                return len(field_value) > condition.get("value", 0)
            return False

        if cond_type == "count_equals":
            if isinstance(field_value, (list, dict)):
                return len(field_value) == condition.get("value", 0)
            return False

        if cond_type == "any_of":
            # True if any dict item of the list satisfies the nested check;
            # the item is exposed to the check as context["item"].
            if not isinstance(field_value, list):
                return False
            check = condition.get("check", {})
            return any(
                self._evaluate_condition(check, {**context, "item": item})
                for item in field_value
                if isinstance(item, dict)
            )

        return False

    def _resolve_comparand(self, condition: Dict[str, Any], context: Dict[str, Any]) -> Any:
        """Return the right-hand side for a greater_than/less_than condition.

        Uses the literal ``value`` unless ``compare_to`` is given. A non-dict
        ``compare_to`` is a field path; a dict with ``type == "multiply"``
        scales the referenced field by its ``value`` (default 1), falling back
        to the literal when that field is missing. Returns None when the
        scaled value cannot be computed numerically.
        """
        literal = condition.get("value")
        if "compare_to" not in condition:
            return literal

        ref = condition["compare_to"]
        if not isinstance(ref, dict):
            return self._get_field(context, ref)

        if ref.get("type") != "multiply":
            return literal

        base = self._get_field(context, ref.get("field", ""))
        if base is None:
            return literal

        # Convert before multiplying: "5" * 2 would otherwise be the string
        # "55", and "5" * 1.5 would raise TypeError.
        try:
            return float(base) * float(ref.get("value", 1))
        except (ValueError, TypeError):
            return None

    def _compare_numeric(
        self,
        condition: Dict[str, Any],
        context: Dict[str, Any],
        *,
        greater: bool,
    ) -> bool:
        """Compare the condition's field with its comparand as floats.

        Returns False when either side is missing or not convertible.
        """
        left = self._get_field(context, condition.get("field", ""))
        right = self._resolve_comparand(condition, context)
        if left is None or right is None:
            return False

        try:
            lhs, rhs = float(left), float(right)
        except (ValueError, TypeError):
            return False

        return lhs > rhs if greater else lhs < rhs

    def _get_field(self, context: Dict[str, Any], field_path: str) -> Any:
        """Get a field value from context using dot notation.

        Only dicts are traversed; an empty or non-string path, a missing key,
        or a non-dict intermediate value yields None.
        """
        if not field_path or not isinstance(field_path, str):
            return None

        current = context
        for part in field_path.split("."):
            if not isinstance(current, dict):
                return None
            current = current.get(part)

        return current
|
||
|
|
|
||
|
|
|
||
|
|
class EngineeringValidator:
    """
    Main engineering validator.

    Combines materials database and anti-pattern detection with
    additional validation logic for constraints and design-variable bounds.
    """

    def __init__(self):
        """Initialize validator with materials DB and anti-pattern detector."""
        self.materials_db = MaterialsDatabase()
        self.anti_patterns = AntiPatternDetector()

    def validate_constraint(
        self,
        constraint_type: str,
        value: float,
        material: Optional[str] = None,
        baseline: Optional[float] = None
    ) -> "ValidationResult":
        """
        Validate a constraint value against engineering limits.

        A stress constraint with a material is delegated entirely to the
        materials database. Otherwise the baseline (if given) is compared
        with the limit to detect an infeasible starting point.

        Args:
            constraint_type: Type of constraint (stress, displacement, frequency)
            value: Constraint value
            material: Optional material name for property lookups
            baseline: Optional baseline value for feasibility check

        Returns:
            ValidationResult
        """
        if constraint_type == "stress" and material:
            return self.materials_db.validate_stress_limit(material, value)

        if baseline is not None:
            # Max constraints: the baseline should sit under the limit
            if constraint_type in ["stress", "displacement"] and baseline > value:
                return ValidationResult(
                    valid=True,
                    message=f"Baseline ({baseline:.2f}) exceeds limit ({value}). Optimization starts infeasible.",
                    severity="warning",
                    suggestion="Consider relaxing the constraint or improving the baseline design"
                )
            # Min constraint: the baseline should sit above the limit
            if constraint_type == "frequency" and baseline < value:
                return ValidationResult(
                    valid=True,
                    message=f"Baseline frequency ({baseline:.2f} Hz) is below limit ({value} Hz). Optimization starts infeasible.",
                    severity="warning",
                    suggestion="Consider relaxing the constraint"
                )

        return ValidationResult(
            valid=True,
            message=f"Constraint {constraint_type} = {value} accepted",
            severity="ok"
        )

    def validate_bounds(
        self,
        parameter: str,
        min_value: float,
        max_value: float,
        current_value: Optional[float] = None
    ) -> "ValidationResult":
        """
        Validate design variable bounds.

        Checks, in order: min < max (error otherwise), max/min ratio above 10
        for positive minimums (warning), current value inside the bounds
        (warning otherwise).

        Args:
            parameter: Parameter name
            min_value: Lower bound
            max_value: Upper bound
            current_value: Current/nominal value

        Returns:
            ValidationResult
        """
        if min_value >= max_value:
            return ValidationResult(
                valid=False,
                message=f"Invalid bounds for {parameter}: min ({min_value}) >= max ({max_value})",
                severity="error",
                field=parameter
            )

        # Bounds width is only meaningful as a ratio for a positive minimum
        if min_value > 0:
            ratio = max_value / min_value
            if ratio > 10:
                return ValidationResult(
                    valid=True,
                    message=f"Wide bounds for {parameter}: ratio {ratio:.1f}x may slow convergence",
                    severity="warning",
                    suggestion=f"Consider narrowing to {min_value:.2f} - {min_value * 5:.2f}",
                    field=parameter
                )

        if current_value is not None and not (min_value <= current_value <= max_value):
            return ValidationResult(
                valid=True,
                message=f"Current value ({current_value}) for {parameter} is outside bounds [{min_value}, {max_value}]",
                severity="warning",
                suggestion="Adjust bounds to include current value or update nominal design",
                field=parameter
            )

        return ValidationResult(
            valid=True,
            message=f"Bounds for {parameter} are valid",
            severity="ok",
            field=parameter
        )

    def suggest_bounds(
        self,
        parameter: str,
        current_value: float,
        context: Optional[Dict[str, Any]] = None
    ) -> Tuple[float, float]:
        """
        Suggest reasonable bounds for a design variable.

        The default is +/- 50% of the current value ((-1, 1) for zero).
        Parameter names containing "thickness", "radius"/"fillet" or "angle"
        use dedicated heuristics. The result always satisfies min < max.

        Args:
            parameter: Parameter name
            current_value: Current value
            context: Optional context (material, application, etc.);
                currently not used by the heuristics

        Returns:
            Tuple of (suggested_min, suggested_max), rounded to 3 decimals
        """
        # Default: +/- 50% of current value
        if current_value > 0:
            suggested_min = current_value * 0.5
            suggested_max = current_value * 1.5
        elif current_value < 0:
            suggested_min = current_value * 1.5
            suggested_max = current_value * 0.5
        else:
            suggested_min = -1.0
            suggested_max = 1.0

        # Adjust based on parameter name heuristics
        name_lower = parameter.lower()

        if "thickness" in name_lower:
            # Thickness should stay positive with reasonable manufacturing limits
            suggested_min = max(0.5, current_value * 0.3)  # Min 0.5mm
            suggested_max = current_value * 2.0
            # A current value at or below a quarter of the floor would put
            # the maximum under the minimum; widen upward from the floor.
            if suggested_max <= suggested_min:
                suggested_max = suggested_min * 2.0

        elif "radius" in name_lower or "fillet" in name_lower:
            # Radii should stay positive
            suggested_min = max(0.1, current_value * 0.2)
            suggested_max = current_value * 3.0
            if suggested_max <= suggested_min:
                suggested_max = suggested_min * 2.0

        elif "angle" in name_lower:
            # Angles often have natural limits
            suggested_min = max(-90, current_value - 30)
            suggested_max = min(90, current_value + 30)
            # Far outside [-90, 90] the clamp inverts the range; fall back
            # to the unclamped +/- 30 window around the current value.
            if suggested_min >= suggested_max:
                suggested_min = current_value - 30
                suggested_max = current_value + 30

        return (round(suggested_min, 3), round(suggested_max, 3))

    def detect_anti_patterns(
        self,
        state: "InterviewState",
        introspection: Optional[Dict[str, Any]] = None
    ) -> List["AntiPattern"]:
        """
        Detect common optimization anti-patterns.

        Args:
            state: Current interview state
            introspection: Optional introspection results

        Returns:
            List of detected anti-patterns
        """
        return self.anti_patterns.check_all(state, introspection or {})

    def validate_all(
        self,
        state: "InterviewState",
        introspection: Optional[Dict[str, Any]] = None
    ) -> List["ValidationResult"]:
        """
        Run all validations on interview state.

        Args:
            state: Current interview state
            introspection: Optional introspection results

        Returns:
            List of all validation results: constraints first, then design
            variable bounds, then one result per detected anti-pattern
        """
        results = []
        answers = state.answers
        intro = introspection or {}
        constraints = answers.get("constraints") or {}

        # The material entry may be absent, None, a dict with a "name", or a
        # bare name string; only the last two yield a usable material name.
        material_info = intro.get("material")
        if isinstance(material_info, dict):
            material = material_info.get("name")
        elif isinstance(material_info, str):
            material = material_info
        else:
            material = None

        # Validate constraints
        if "max_stress" in constraints:
            results.append(self.validate_constraint(
                "stress",
                constraints["max_stress"],
                material=material,
                baseline=intro.get("baseline_stress")
            ))

        if "max_displacement" in constraints:
            results.append(self.validate_constraint(
                "displacement",
                constraints["max_displacement"],
                baseline=intro.get("baseline_displacement")
            ))

        if "min_frequency" in constraints:
            results.append(self.validate_constraint(
                "frequency",
                constraints["min_frequency"],
                baseline=intro.get("baseline_frequency")
            ))

        # Validate design variable bounds (non-dict entries are skipped)
        for dv in answers.get("design_variables", []):
            if isinstance(dv, dict):
                results.append(self.validate_bounds(
                    dv.get("parameter", "unknown"),
                    dv.get("min_value", 0),
                    dv.get("max_value", 1),
                    dv.get("current_value")
                ))

        # Check anti-patterns; only "error" severity makes a result invalid
        for ap in self.detect_anti_patterns(state, intro):
            results.append(ValidationResult(
                valid=ap.severity != "error",
                message=f"[{ap.name}] {ap.description}",
                severity=ap.severity,
                suggestion=ap.fix_suggestion
            ))

        return results

    def has_blocking_issues(
        self,
        state: "InterviewState",
        introspection: Optional[Dict[str, Any]] = None
    ) -> Tuple[bool, List[str]]:
        """
        Check if there are any blocking issues.

        Returns:
            Tuple of (has_blocking, list_of_blocking_messages)
        """
        results = self.validate_all(state, introspection)
        blocking = [r.message for r in results if r.is_blocking()]
        return len(blocking) > 0, blocking
|
||
|
|
|
||
|
|
|
||
|
|
# Import for type hints
|
||
|
|
from typing import TYPE_CHECKING
|
||
|
|
if TYPE_CHECKING:
|
||
|
|
from .interview_state import InterviewState
|