feat: Implement Study Interview Mode as default study creation method

Study Interview Mode is now the DEFAULT for all study creation requests.
This intelligent Q&A system guides users through optimization setup with:

- 7-phase interview flow: introspection → objectives → constraints → design_variables → validation → review → complete
- Material-aware validation with 12 materials and fuzzy name matching
- Anti-pattern detection for 12 common mistakes (mass-no-constraint, stress-over-yield, etc.)
- Auto extractor mapping E1-E24 based on goal keywords
- State persistence with JSON serialization and backup rotation
- StudyBlueprint generation with full validation

Triggers: "create a study", "new study", "optimize this", any study creation intent
Skip with: "skip interview", "quick setup", "manual config"

Components:
- StudyInterviewEngine: Main orchestrator
- QuestionEngine: Conditional logic evaluation
- EngineeringValidator: MaterialsDatabase + AntiPatternDetector
- InterviewPresenter: Markdown formatting for Claude
- StudyBlueprint: Validated configuration output
- InterviewState: Persistent state management

All 129 tests passing.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-03 11:06:07 -05:00
parent b1ffc64407
commit 32caa5d05c
27 changed files with 9737 additions and 11 deletions

View File

@@ -0,0 +1,102 @@
"""
Atomizer Study Interview Mode
This module provides an intelligent interview system for gathering engineering requirements
before study generation. It systematically questions users about objectives, constraints,
and design variables to create accurate optimization configurations.
Components:
- StudyInterviewEngine: Main orchestrator
- QuestionEngine: Question flow and conditional logic
- InterviewStateManager: State persistence
- InterviewPresenter: Presentation abstraction (ClaudePresenter)
- EngineeringValidator: Engineering validation and anti-pattern detection
- InterviewIntelligence: Smart features (extractor mapping, complexity)
"""
from .interview_state import (
InterviewState,
InterviewPhase,
AnsweredQuestion,
InterviewStateManager,
LogEntry,
)
from .question_engine import (
QuestionEngine,
Question,
QuestionOption,
QuestionCondition,
ValidationRule,
)
from .interview_presenter import (
InterviewPresenter,
ClaudePresenter,
)
from .study_interview import (
StudyInterviewEngine,
InterviewSession,
NextAction,
)
from .engineering_validator import (
EngineeringValidator,
MaterialsDatabase,
AntiPatternDetector,
ValidationResult,
AntiPattern,
)
from .interview_intelligence import (
InterviewIntelligence,
ExtractorMapper,
ExtractorSelection,
)
from .study_blueprint import (
StudyBlueprint,
DesignVariable,
Objective,
Constraint,
)
# Public API of the study-interview package; the groups below mirror the
# import blocks above. Keep this list in sync when adding new components.
__all__ = [
    # State management
    "InterviewState",
    "InterviewPhase",
    "AnsweredQuestion",
    "InterviewStateManager",
    "LogEntry",
    # Question engine
    "QuestionEngine",
    "Question",
    "QuestionOption",
    "QuestionCondition",
    "ValidationRule",
    # Presentation
    "InterviewPresenter",
    "ClaudePresenter",
    # Main engine
    "StudyInterviewEngine",
    "InterviewSession",
    "NextAction",
    # Validation
    "EngineeringValidator",
    "MaterialsDatabase",
    "AntiPatternDetector",
    "ValidationResult",
    "AntiPattern",
    # Intelligence
    "InterviewIntelligence",
    "ExtractorMapper",
    "ExtractorSelection",
    # Blueprint
    "StudyBlueprint",
    "DesignVariable",
    "Objective",
    "Constraint",
]

# Package version string (semantic versioning).
__version__ = "1.0.0"

View File

@@ -0,0 +1,781 @@
"""
Engineering Validator
Validates interview answers against engineering knowledge and detects anti-patterns.
Provides:
- MaterialsDatabase: Common materials with properties
- AntiPatternDetector: Detects optimization setup mistakes
- EngineeringValidator: Main validation logic
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import json
import re
from difflib import SequenceMatcher
@dataclass
class Material:
    """An engineering material and its mechanical properties.

    Raw values live in the ``properties`` dict (keyed by unit-suffixed names
    such as ``yield_stress_mpa``); the read-only properties below expose the
    commonly used entries, returning ``None`` when a key is absent.
    """
    id: str
    names: List[str]
    category: str
    properties: Dict[str, Any]
    notes: Optional[str] = None
    recommended_safety_factors: Optional[Dict[str, float]] = None

    @property
    def density(self) -> Optional[float]:
        """Density in kg/m^3, if defined."""
        return self.properties.get("density_kg_m3")

    @property
    def yield_stress(self) -> Optional[float]:
        """Yield stress in MPa, if defined."""
        return self.properties.get("yield_stress_mpa")

    @property
    def ultimate_stress(self) -> Optional[float]:
        """Ultimate stress in MPa, if defined."""
        return self.properties.get("ultimate_stress_mpa")

    @property
    def elastic_modulus(self) -> Optional[float]:
        """Elastic modulus in GPa, if defined."""
        return self.properties.get("elastic_modulus_gpa")

    def get_safe_stress(self, application: str = "static") -> Optional[float]:
        """Get safe stress limit with recommended safety factor.

        Uses the per-application recommended safety factor when the material
        declares one, otherwise a default of 1.5. Returns ``None`` when no
        yield stress is defined.
        """
        yield_mpa = self.yield_stress
        if yield_mpa is None:
            return None
        factors = self.recommended_safety_factors or {}
        return yield_mpa / factors.get(application, 1.5)
class MaterialsDatabase:
"""
Database of common engineering materials and properties.
Supports fuzzy name matching for user convenience.
"""
def __init__(self, db_path: Optional[Path] = None):
"""
Initialize materials database.
Args:
db_path: Path to materials JSON. Uses default if None.
"""
if db_path is None:
db_path = Path(__file__).parent / "schemas" / "materials_database.json"
self.db_path = db_path
self.materials: Dict[str, Material] = {}
self._name_index: Dict[str, str] = {} # name -> material_id
self._load_database()
def _load_database(self) -> None:
"""Load materials from JSON file."""
if not self.db_path.exists():
return
with open(self.db_path, "r", encoding="utf-8") as f:
data = json.load(f)
for mat_data in data.get("materials", []):
material = Material(
id=mat_data["id"],
names=mat_data["names"],
category=mat_data["category"],
properties=mat_data["properties"],
notes=mat_data.get("notes"),
recommended_safety_factors=mat_data.get("recommended_safety_factors"),
)
self.materials[material.id] = material
# Build name index
for name in material.names:
self._name_index[name.lower()] = material.id
def get_material(self, name: str) -> Optional[Material]:
"""
Look up material by name (supports fuzzy matching).
Args:
name: Material name (e.g., "Al 6061-T6", "aluminum", "steel 304")
Returns:
Material if found, None otherwise
"""
name_lower = name.lower().strip()
# Exact match
if name_lower in self._name_index:
return self.materials[self._name_index[name_lower]]
# Try by ID
if name_lower in self.materials:
return self.materials[name_lower]
# Fuzzy match
best_match = None
best_ratio = 0.6 # Minimum threshold
for indexed_name, mat_id in self._name_index.items():
ratio = SequenceMatcher(None, name_lower, indexed_name).ratio()
if ratio > best_ratio:
best_ratio = ratio
best_match = mat_id
if best_match:
return self.materials[best_match]
return None
def get_yield_stress(self, material_name: str) -> Optional[float]:
"""Get yield stress for material in MPa."""
material = self.get_material(material_name)
return material.yield_stress if material else None
def validate_stress_limit(
self,
material_name: str,
limit: float,
safety_factor: float = 1.0,
application: str = "static"
) -> "ValidationResult":
"""
Check if stress limit is reasonable for material.
Args:
material_name: Material name
limit: Proposed stress limit in MPa
safety_factor: Applied safety factor (if any)
application: Application type (static, fatigue, aerospace)
Returns:
ValidationResult with status and message
"""
material = self.get_material(material_name)
if material is None:
return ValidationResult(
valid=True,
message=f"Material '{material_name}' not found in database. Unable to validate stress limit.",
severity="info"
)
if material.yield_stress is None:
return ValidationResult(
valid=True,
message=f"Material '{material.id}' does not have yield stress defined (e.g., brittle material).",
severity="info"
)
yield_stress = material.yield_stress
effective_limit = limit * safety_factor if safety_factor > 1 else limit
# Check various thresholds
if effective_limit > material.ultimate_stress if material.ultimate_stress else yield_stress:
return ValidationResult(
valid=False,
message=f"Stress limit ({limit} MPa) exceeds ultimate stress ({material.ultimate_stress or yield_stress} MPa) for {material.id}",
severity="error",
suggestion=f"Reduce stress limit to below {(material.ultimate_stress or yield_stress) / 1.5:.0f} MPa"
)
if effective_limit > yield_stress:
return ValidationResult(
valid=True, # Warning, not error
message=f"Stress limit ({limit} MPa) exceeds yield stress ({yield_stress} MPa) for {material.id}. This allows plastic deformation.",
severity="warning",
suggestion=f"Consider reducing to {yield_stress / 1.5:.0f} MPa (SF=1.5)"
)
# Get recommended safe stress
safe_stress = material.get_safe_stress(application)
if safe_stress and limit > safe_stress:
rec_sf = material.recommended_safety_factors.get(application, 1.5) if material.recommended_safety_factors else 1.5
return ValidationResult(
valid=True,
message=f"Stress limit ({limit} MPa) is {limit/yield_stress*100:.0f}% of yield. Recommended safety factor for {application}: {rec_sf}",
severity="info",
suggestion=f"Typical {application} limit: {safe_stress:.0f} MPa"
)
return ValidationResult(
valid=True,
message=f"Stress limit ({limit} MPa) is acceptable for {material.id} (yield: {yield_stress} MPa)",
severity="ok"
)
def list_materials(self, category: Optional[str] = None) -> List[Material]:
"""List all materials, optionally filtered by category."""
materials = list(self.materials.values())
if category:
materials = [m for m in materials if m.category == category]
return materials
@dataclass
class ValidationResult:
    """Outcome of a single validation check.

    ``severity`` is one of "ok", "info", "warning", "error"; only "error"
    blocks the interview from proceeding. ``suggestion`` optionally carries a
    remediation hint and ``field`` the name of the offending input.
    """
    valid: bool
    message: str
    severity: str = "ok"
    suggestion: Optional[str] = None
    field: Optional[str] = None

    def is_blocking(self) -> bool:
        """Return True when this result must stop the workflow (error severity)."""
        return self.severity == "error"
@dataclass
class AntiPattern:
    """A detected optimization-setup mistake.

    ``severity`` is "error", "warning", or "info". ``auto_fix`` optionally
    carries a machine-applicable correction; ``acknowledged`` records that the
    user has seen and accepted the finding.
    """
    id: str
    name: str
    description: str
    severity: str  # error, warning, info
    fix_suggestion: Optional[str] = None
    auto_fix: Optional[Dict[str, Any]] = None
    acknowledged: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (field order preserved for JSON output)."""
        return {
            attr: getattr(self, attr)
            for attr in (
                "id",
                "name",
                "description",
                "severity",
                "fix_suggestion",
                "auto_fix",
                "acknowledged",
            )
        }
class AntiPatternDetector:
    """
    Detects common optimization setup mistakes.
    Loads patterns from JSON and evaluates against interview state.

    Patterns are data-driven: each JSON pattern carries a declarative
    ``condition`` tree that is evaluated by ``_evaluate_condition`` against a
    flat context dict built by ``_build_context`` from the interview answers.
    """
    def __init__(self, patterns_path: Optional[Path] = None):
        """
        Initialize anti-pattern detector.

        Args:
            patterns_path: Path to patterns JSON. Uses default if None.
        """
        if patterns_path is None:
            patterns_path = Path(__file__).parent / "schemas" / "anti_patterns.json"
        self.patterns_path = patterns_path
        # Raw pattern dicts exactly as loaded from JSON; empty if file absent.
        self.patterns: List[Dict[str, Any]] = []
        self._load_patterns()

    def _load_patterns(self) -> None:
        """Load patterns from JSON file. A missing file leaves the list empty."""
        if not self.patterns_path.exists():
            return
        with open(self.patterns_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.patterns = data.get("patterns", [])

    def check_all(self, state: "InterviewState", introspection: Optional[Dict[str, Any]] = None) -> List[AntiPattern]:
        """
        Run all anti-pattern checks.

        Args:
            state: Current interview state
            introspection: Optional introspection results

        Returns:
            List of detected anti-patterns
        """
        detected = []
        context = self._build_context(state, introspection or {})
        for pattern in self.patterns:
            # A pattern with no/empty condition never fires (see _evaluate_condition).
            if self._evaluate_condition(pattern.get("condition", {}), context):
                detected.append(AntiPattern(
                    id=pattern["id"],
                    name=pattern["name"],
                    description=pattern["description"],
                    severity=pattern["severity"],
                    fix_suggestion=pattern.get("fix_suggestion"),
                    auto_fix=pattern.get("auto_fix"),
                ))
        return detected

    def _build_context(self, state: "InterviewState", introspection: Dict[str, Any]) -> Dict[str, Any]:
        """Build evaluation context from state and introspection.

        The returned dict is the namespace that pattern conditions address via
        dot-notation field paths (e.g. "introspection.material").
        """
        answers = state.answers
        # Extract objectives as a flat list of goal strings; dict-shaped
        # objectives contribute their "goal" value, anything else is stringified.
        objectives_list = []
        for obj in answers.get("objectives", []):
            if isinstance(obj, dict):
                objectives_list.append(obj.get("goal", ""))
            else:
                objectives_list.append(str(obj))
        # Add secondary objectives if present ("none" is a sentinel to skip)
        for obj in answers.get("objectives_secondary", []):
            if obj != "none":
                objectives_list.append(obj)
        return {
            "objectives": objectives_list,
            "constraints": answers.get("constraints", {}),
            "design_variables": answers.get("design_variables", []),
            "design_variable_count": len(answers.get("design_variables", [])),
            "analysis_types": answers.get("analysis_types", []),
            "solve_all_solutions": answers.get("solve_all_solutions", True),
            "n_trials": answers.get("n_trials", 100),
            "introspection": introspection,
            # NOTE: may be None when introspection lacks the key.
            "material": introspection.get("material"),
            "baseline_violations": state.get_answer("baseline_violations"),
        }

    def _evaluate_condition(self, condition: Dict[str, Any], context: Dict[str, Any]) -> bool:
        """Evaluate a pattern condition against context.

        Supported ``type`` values: boolean combinators (and / or / not),
        membership (contains / not_contains), equality (equals), presence
        (empty / exists / not_exists), numeric comparison (greater_than /
        less_than — with an optional ``compare_to`` that is either a field
        path or a {"type": "multiply", "field", "value"} rule), collection
        size (count_greater_than / count_equals), per-item checks (any_of,
        which exposes each element under the "item" context key), and
        ratio_greater_than over two item fields. Empty or unknown condition
        types evaluate to False (except not_contains on a non-list, which is
        vacuously True).
        """
        if not condition:
            return False
        cond_type = condition.get("type", "")
        if cond_type == "and":
            return all(
                self._evaluate_condition(c, context)
                for c in condition.get("conditions", [])
            )
        elif cond_type == "or":
            return any(
                self._evaluate_condition(c, context)
                for c in condition.get("conditions", [])
            )
        elif cond_type == "not":
            inner = condition.get("condition", {})
            return not self._evaluate_condition(inner, context)
        elif cond_type == "contains":
            field_value = self._get_field(context, condition.get("field", ""))
            target = condition.get("value")
            if isinstance(field_value, list):
                return target in field_value
            return False
        elif cond_type == "not_contains":
            field_value = self._get_field(context, condition.get("field", ""))
            target = condition.get("value")
            if isinstance(field_value, list):
                return target not in field_value
            # Non-list fields vacuously satisfy "does not contain".
            return True
        elif cond_type == "equals":
            field_value = self._get_field(context, condition.get("field", ""))
            return field_value == condition.get("value")
        elif cond_type == "empty":
            field_value = self._get_field(context, condition.get("field", ""))
            if field_value is None:
                return True
            if isinstance(field_value, (list, dict, str)):
                return len(field_value) == 0
            return False
        elif cond_type == "exists":
            field_value = self._get_field(context, condition.get("field", ""))
            return field_value is not None
        elif cond_type == "not_exists":
            field_value = self._get_field(context, condition.get("field", ""))
            return field_value is None
        elif cond_type == "greater_than":
            field_value = self._get_field(context, condition.get("field", ""))
            compare = condition.get("value")
            # Handle compare_to (field reference)
            if "compare_to" in condition:
                compare_ref = condition["compare_to"]
                if isinstance(compare_ref, dict):
                    # Dynamic calculation: compare against field * factor
                    if compare_ref.get("type") == "multiply":
                        base_value = self._get_field(context, compare_ref.get("field", ""))
                        if base_value is not None:
                            compare = base_value * compare_ref.get("value", 1)
                else:
                    compare = self._get_field(context, compare_ref)
            if field_value is not None and compare is not None:
                try:
                    return float(field_value) > float(compare)
                except (ValueError, TypeError):
                    # Non-numeric operands never satisfy the comparison.
                    return False
            return False
        elif cond_type == "less_than":
            field_value = self._get_field(context, condition.get("field", ""))
            compare = condition.get("value")
            if "compare_to" in condition:
                compare_ref = condition["compare_to"]
                if isinstance(compare_ref, dict):
                    if compare_ref.get("type") == "multiply":
                        base_value = self._get_field(context, compare_ref.get("field", ""))
                        if base_value is not None:
                            compare = base_value * compare_ref.get("value", 1)
                else:
                    compare = self._get_field(context, compare_ref)
            if field_value is not None and compare is not None:
                try:
                    return float(field_value) < float(compare)
                except (ValueError, TypeError):
                    return False
            return False
        elif cond_type == "count_greater_than":
            field_value = self._get_field(context, condition.get("field", ""))
            if isinstance(field_value, (list, dict)):
                return len(field_value) > condition.get("value", 0)
            return False
        elif cond_type == "count_equals":
            field_value = self._get_field(context, condition.get("field", ""))
            if isinstance(field_value, (list, dict)):
                return len(field_value) == condition.get("value", 0)
            return False
        elif cond_type == "any_of":
            # Check if any item in array matches a condition; each dict item
            # is exposed to the inner check as context key "item".
            field_value = self._get_field(context, condition.get("field", ""))
            if not isinstance(field_value, list):
                return False
            check = condition.get("check", {})
            for item in field_value:
                if isinstance(item, dict):
                    item_context = {**context, "item": item}
                    if self._evaluate_condition(check, item_context):
                        return True
            return False
        elif cond_type == "ratio_greater_than":
            # For bounds checking: ratio of two fields of the current "item"
            # (expects to be nested inside an any_of check).
            fields = condition.get("field", [])
            if len(fields) == 2:
                val1 = self._get_field(context, f"item.{fields[0]}")
                val2 = self._get_field(context, f"item.{fields[1]}")
                if val1 and val2 and val2 != 0:
                    try:
                        return float(val1) / float(val2) > condition.get("value", 1)
                    except (ValueError, TypeError):
                        return False
            return False
        # Unknown condition type: never matches.
        return False

    def _get_field(self, context: Dict[str, Any], field_path: str) -> Any:
        """Get a field value from context using dot notation.

        Returns None when the path is empty, any intermediate value is None,
        or a non-dict is encountered before the path is exhausted.
        """
        if not field_path:
            return None
        parts = field_path.split(".")
        current = context
        for part in parts:
            if current is None:
                return None
            if isinstance(current, dict):
                current = current.get(part)
            else:
                return None
        return current
class EngineeringValidator:
    """
    Main engineering validator.

    Combines materials database and anti-pattern detection with
    additional validation logic for constraints and design-variable bounds.
    """

    def __init__(self):
        """Initialize validator with materials DB and anti-pattern detector."""
        self.materials_db = MaterialsDatabase()
        self.anti_patterns = AntiPatternDetector()

    def validate_constraint(
        self,
        constraint_type: str,
        value: float,
        material: Optional[str] = None,
        baseline: Optional[float] = None
    ) -> ValidationResult:
        """
        Validate a constraint value against engineering limits.

        Args:
            constraint_type: Type of constraint (stress, displacement, frequency)
            value: Constraint value
            material: Optional material name for property lookups
            baseline: Optional baseline value for feasibility check

        Returns:
            ValidationResult
        """
        # Stress constraints are checked against material strength when possible.
        if constraint_type == "stress" and material:
            return self.materials_db.validate_stress_limit(material, value)
        # Check against baseline if available
        if baseline is not None:
            if constraint_type in ["stress", "displacement"]:
                # Max constraint - baseline should be under limit
                if baseline > value:
                    return ValidationResult(
                        valid=True,
                        message=f"Baseline ({baseline:.2f}) exceeds limit ({value}). Optimization starts infeasible.",
                        severity="warning",
                        suggestion="Consider relaxing the constraint or improving the baseline design"
                    )
            elif constraint_type == "frequency":
                # Min constraint - baseline should be above limit
                if baseline < value:
                    return ValidationResult(
                        valid=True,
                        message=f"Baseline frequency ({baseline:.2f} Hz) is below limit ({value} Hz). Optimization starts infeasible.",
                        severity="warning",
                        suggestion="Consider relaxing the constraint"
                    )
        return ValidationResult(
            valid=True,
            message=f"Constraint {constraint_type} = {value} accepted",
            severity="ok"
        )

    def validate_bounds(
        self,
        parameter: str,
        min_value: float,
        max_value: float,
        current_value: Optional[float] = None
    ) -> ValidationResult:
        """
        Validate design variable bounds.

        Args:
            parameter: Parameter name
            min_value: Lower bound
            max_value: Upper bound
            current_value: Current/nominal value

        Returns:
            ValidationResult (error only for inverted/degenerate bounds)
        """
        if min_value >= max_value:
            return ValidationResult(
                valid=False,
                message=f"Invalid bounds for {parameter}: min ({min_value}) >= max ({max_value})",
                severity="error",
                field=parameter
            )
        # Check bounds width: very wide ranges slow optimizer convergence.
        if min_value > 0:
            ratio = max_value / min_value
            if ratio > 10:
                return ValidationResult(
                    valid=True,
                    message=f"Wide bounds for {parameter}: ratio {ratio:.1f}x may slow convergence",
                    severity="warning",
                    suggestion=f"Consider narrowing to {min_value:.2f} - {min_value * 5:.2f}",
                    field=parameter
                )
        # Check if current value is within bounds
        if current_value is not None:
            if current_value < min_value or current_value > max_value:
                return ValidationResult(
                    valid=True,
                    message=f"Current value ({current_value}) for {parameter} is outside bounds [{min_value}, {max_value}]",
                    severity="warning",
                    suggestion="Adjust bounds to include current value or update nominal design",
                    field=parameter
                )
        return ValidationResult(
            valid=True,
            message=f"Bounds for {parameter} are valid",
            severity="ok",
            field=parameter
        )

    def suggest_bounds(
        self,
        parameter: str,
        current_value: float,
        context: Optional[Dict[str, Any]] = None
    ) -> Tuple[float, float]:
        """
        Suggest reasonable bounds for a design variable.

        Args:
            parameter: Parameter name
            current_value: Current value
            context: Optional context (material, application, etc.)

        Returns:
            Tuple of (suggested_min, suggested_max), rounded to 3 decimals
        """
        # Default: +/- 50% of current value (mirrored for negative values)
        if current_value > 0:
            suggested_min = current_value * 0.5
            suggested_max = current_value * 1.5
        elif current_value < 0:
            suggested_min = current_value * 1.5
            suggested_max = current_value * 0.5
        else:
            suggested_min = -1.0
            suggested_max = 1.0
        # Adjust based on parameter name heuristics
        name_lower = parameter.lower()
        if "thickness" in name_lower:
            # Thickness should stay positive with reasonable manufacturing limits
            suggested_min = max(0.5, current_value * 0.3)  # Min 0.5mm
            suggested_max = current_value * 2.0
        elif "radius" in name_lower or "fillet" in name_lower:
            # Radii should stay positive
            suggested_min = max(0.1, current_value * 0.2)
            suggested_max = current_value * 3.0
        elif "angle" in name_lower:
            # Angles often have natural limits
            suggested_min = max(-90, current_value - 30)
            suggested_max = min(90, current_value + 30)
        return (round(suggested_min, 3), round(suggested_max, 3))

    def detect_anti_patterns(
        self,
        state: "InterviewState",
        introspection: Optional[Dict[str, Any]] = None
    ) -> List[AntiPattern]:
        """
        Detect common optimization anti-patterns.

        Args:
            state: Current interview state
            introspection: Optional introspection results

        Returns:
            List of detected anti-patterns
        """
        return self.anti_patterns.check_all(state, introspection or {})

    def validate_all(
        self,
        state: "InterviewState",
        introspection: Optional[Dict[str, Any]] = None
    ) -> List[ValidationResult]:
        """
        Run all validations on interview state.

        Args:
            state: Current interview state
            introspection: Optional introspection results

        Returns:
            List of all validation results (constraints first, then bounds,
            then anti-pattern findings)
        """
        results = []
        answers = state.answers
        intro = introspection or {}
        # Validate constraints
        if "max_stress" in answers.get("constraints", {}):
            # BUG FIX: intro.get("material", {}) returns None (not {}) when the
            # key is present with a None value — as _build_context produces —
            # which crashed on .get("name"). Coalesce explicitly.
            material = (intro.get("material") or {}).get("name")
            result = self.validate_constraint(
                "stress",
                answers["constraints"]["max_stress"],
                material=material,
                baseline=intro.get("baseline_stress")
            )
            results.append(result)
        if "max_displacement" in answers.get("constraints", {}):
            result = self.validate_constraint(
                "displacement",
                answers["constraints"]["max_displacement"],
                baseline=intro.get("baseline_displacement")
            )
            results.append(result)
        if "min_frequency" in answers.get("constraints", {}):
            result = self.validate_constraint(
                "frequency",
                answers["constraints"]["min_frequency"],
                baseline=intro.get("baseline_frequency")
            )
            results.append(result)
        # Validate design variable bounds
        for dv in answers.get("design_variables", []):
            if isinstance(dv, dict):
                result = self.validate_bounds(
                    dv.get("parameter", "unknown"),
                    dv.get("min_value", 0),
                    dv.get("max_value", 1),
                    dv.get("current_value")
                )
                results.append(result)
        # Check anti-patterns and surface them as validation results
        anti_patterns = self.detect_anti_patterns(state, intro)
        for ap in anti_patterns:
            results.append(ValidationResult(
                valid=ap.severity != "error",
                message=f"[{ap.name}] {ap.description}",
                severity=ap.severity,
                suggestion=ap.fix_suggestion
            ))
        return results

    def has_blocking_issues(
        self,
        state: "InterviewState",
        introspection: Optional[Dict[str, Any]] = None
    ) -> Tuple[bool, List[str]]:
        """
        Check if there are any blocking issues.

        Returns:
            Tuple of (has_blocking, list_of_blocking_messages)
        """
        results = self.validate_all(state, introspection)
        blocking = [r.message for r in results if r.is_blocking()]
        return len(blocking) > 0, blocking
# Import for type hints
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .interview_state import InterviewState

View File

@@ -0,0 +1,648 @@
"""
Interview Intelligence
Smart features for the interview process:
- ExtractorMapper: Maps goals to appropriate extractors
- InterviewIntelligence: Auto-detection, inference, complexity determination
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Any, Optional, Literal, Tuple
import re
@dataclass
class ExtractorSelection:
    """Result of mapping a goal to an extractor.

    ``goal_type`` is one of minimize/maximize/target (objectives) or
    min/max/constraint (constraints). ``fallback`` names an alternate
    extractor ID; ``confidence`` is 1.0 for direct matches.
    """
    extractor_id: str
    extractor_name: str
    goal_type: str  # minimize, maximize, target
    params: Dict[str, Any] = field(default_factory=dict)
    fallback: Optional[str] = None
    confidence: float = 1.0
    notes: Optional[str] = None


class ExtractorMapper:
    """
    Maps physics goals to appropriate extractors.

    Uses the Atomizer extractor library (SYS_12) to select
    the right extractor for each objective or constraint.

    The class-level maps are templates only; public methods always return
    deep copies so callers may mutate selections freely.
    """
    # Goal to extractor mapping (templates — never returned directly)
    GOAL_MAP = {
        # Mass objectives
        "minimize_mass": ExtractorSelection(
            extractor_id="E4",
            extractor_name="BDF Mass Extraction",
            goal_type="minimize",
            fallback="E5",
            notes="Uses BDF parsing for accurate mass. Falls back to NX expression."
        ),
        "minimize_weight": ExtractorSelection(
            extractor_id="E4",
            extractor_name="BDF Mass Extraction",
            goal_type="minimize",
            fallback="E5"
        ),
        # Displacement/stiffness objectives
        "minimize_displacement": ExtractorSelection(
            extractor_id="E1",
            extractor_name="Displacement Extraction",
            goal_type="minimize",
            params={"component": "magnitude", "node_id": "auto"},
            notes="Extracts displacement magnitude. Node ID auto-detected from max."
        ),
        "maximize_stiffness": ExtractorSelection(
            extractor_id="E1",
            extractor_name="Displacement Extraction",
            goal_type="minimize",  # Stiffness = 1/displacement
            params={"component": "magnitude", "node_id": "auto"},
            notes="Stiffness maximization = displacement minimization"
        ),
        # Frequency objectives
        "maximize_frequency": ExtractorSelection(
            extractor_id="E2",
            extractor_name="Frequency Extraction",
            goal_type="maximize",
            params={"mode_number": 1},
            notes="First natural frequency. Mode number adjustable."
        ),
        "target_frequency": ExtractorSelection(
            extractor_id="E2",
            extractor_name="Frequency Extraction",
            goal_type="target",
            params={"mode_number": 1, "target": None},
            notes="Target a specific frequency value."
        ),
        # Stress objectives
        "minimize_stress": ExtractorSelection(
            extractor_id="E3",
            extractor_name="Solid Stress Extraction",
            goal_type="minimize",
            params={"element_type": "auto", "stress_type": "von_mises"},
            notes="Von Mises stress. Element type auto-detected."
        ),
        # Optical objectives
        "minimize_wavefront_error": ExtractorSelection(
            extractor_id="E8",
            extractor_name="Zernike Wavefront Fitting",
            goal_type="minimize",
            params={"n_terms": 15, "radius": "auto"},
            notes="Fits surface to Zernike polynomials. Optical applications."
        ),
        # Custom
        "custom": ExtractorSelection(
            extractor_id="custom",
            extractor_name="Custom Extractor",
            goal_type="custom",
            confidence=0.5,
            notes="User will define custom extraction logic."
        ),
    }

    # Constraint type to extractor mapping (templates — never returned directly)
    CONSTRAINT_MAP = {
        "stress": ExtractorSelection(
            extractor_id="E3",
            extractor_name="Solid Stress Extraction",
            goal_type="max",
            params={"stress_type": "von_mises"}
        ),
        "max_stress": ExtractorSelection(
            extractor_id="E3",
            extractor_name="Solid Stress Extraction",
            goal_type="max",
            params={"stress_type": "von_mises"}
        ),
        "displacement": ExtractorSelection(
            extractor_id="E1",
            extractor_name="Displacement Extraction",
            goal_type="max",
            params={"component": "magnitude"}
        ),
        "max_displacement": ExtractorSelection(
            extractor_id="E1",
            extractor_name="Displacement Extraction",
            goal_type="max",
            params={"component": "magnitude"}
        ),
        "frequency": ExtractorSelection(
            extractor_id="E2",
            extractor_name="Frequency Extraction",
            goal_type="min",
            params={"mode_number": 1}
        ),
        "min_frequency": ExtractorSelection(
            extractor_id="E2",
            extractor_name="Frequency Extraction",
            goal_type="min",
            params={"mode_number": 1}
        ),
        "mass": ExtractorSelection(
            extractor_id="E4",
            extractor_name="BDF Mass Extraction",
            goal_type="max"
        ),
        "max_mass": ExtractorSelection(
            extractor_id="E4",
            extractor_name="BDF Mass Extraction",
            goal_type="max"
        ),
    }

    def map_goal_to_extractor(
        self,
        goal: str,
        introspection: Optional[Dict[str, Any]] = None
    ) -> ExtractorSelection:
        """
        Map a physics goal to the appropriate extractor.

        Args:
            goal: Goal identifier (e.g., "minimize_mass")
            introspection: Optional introspection results for auto-detection

        Returns:
            ExtractorSelection with extractor details. Always a fresh copy —
            callers may mutate it without affecting the class-level maps.
        """
        import copy
        goal_lower = goal.lower().strip()
        # Direct match
        selection = self.GOAL_MAP.get(goal_lower)
        if selection is None:
            # Fuzzy matching for common variations
            for key, candidate in self.GOAL_MAP.items():
                if key.replace("_", " ") in goal_lower or goal_lower in key:
                    selection = candidate
                    break
        if selection is None:
            # Default to custom
            selection = self.GOAL_MAP["custom"]
        # BUG FIX: the original returned the shared class-level instance on the
        # direct-match and fuzzy paths; any caller mutating .params corrupted
        # GOAL_MAP for every later call. Always return a deep copy, and apply
        # introspection-based refinement uniformly on every match path.
        selection = copy.deepcopy(selection)
        if introspection:
            selection = self._refine_selection(selection, introspection)
        return selection

    def map_constraint_to_extractor(
        self,
        constraint_type: str,
        introspection: Optional[Dict[str, Any]] = None
    ) -> ExtractorSelection:
        """
        Map a constraint type to the appropriate extractor.

        Args:
            constraint_type: Constraint type (e.g., "stress", "displacement")
            introspection: Optional introspection results

        Returns:
            ExtractorSelection with extractor details (always a fresh copy)
        """
        import copy
        type_lower = constraint_type.lower().strip()
        selection = self.CONSTRAINT_MAP.get(type_lower)
        if selection is None:
            # Try to infer from substrings of the constraint name
            if "stress" in type_lower:
                selection = self.CONSTRAINT_MAP["stress"]
            elif "disp" in type_lower or "deflect" in type_lower:
                selection = self.CONSTRAINT_MAP["displacement"]
            elif "freq" in type_lower or "modal" in type_lower:
                selection = self.CONSTRAINT_MAP["frequency"]
            elif "mass" in type_lower or "weight" in type_lower:
                selection = self.CONSTRAINT_MAP["mass"]
        if selection is None:
            return ExtractorSelection(
                extractor_id="custom",
                extractor_name="Custom Constraint",
                goal_type="constraint",
                confidence=0.5
            )
        # BUG FIX: same shared-instance hazard as map_goal_to_extractor —
        # copy before returning, and refine on all match paths.
        selection = copy.deepcopy(selection)
        if introspection:
            selection = self._refine_selection(selection, introspection)
        return selection

    def _refine_selection(
        self,
        selection: ExtractorSelection,
        introspection: Dict[str, Any]
    ) -> ExtractorSelection:
        """Refine extractor selection based on introspection (returns a copy)."""
        import copy
        refined = copy.deepcopy(selection)
        # Auto-detect element type for stress extraction
        if refined.extractor_id == "E3" and refined.params.get("element_type") == "auto":
            element_types = introspection.get("element_types", [])
            if "solid" in element_types or any("TET" in e or "HEX" in e for e in element_types):
                refined.params["element_type"] = "solid"
            elif "shell" in element_types or any("QUAD" in e or "TRI" in e for e in element_types):
                refined.params["element_type"] = "shell"
                refined.extractor_id = "E3_shell"  # Use shell stress extractor
        # Auto-detect node for displacement
        if refined.extractor_id == "E1" and refined.params.get("node_id") == "auto":
            # Use max displacement node from baseline if available
            if "max_disp_node" in introspection:
                refined.params["node_id"] = introspection["max_disp_node"]
        return refined

    def get_extractor_summary(self, selections: List[ExtractorSelection]) -> str:
        """Generate a markdown summary of selected extractors."""
        lines = ["**Selected Extractors:**", ""]
        for sel in selections:
            params_str = ""
            if sel.params:
                params_str = " (" + ", ".join(f"{k}={v}" for k, v in sel.params.items()) + ")"
            lines.append(f"- **{sel.extractor_id}**: {sel.extractor_name}{params_str}")
            if sel.notes:
                lines.append(f"  > {sel.notes}")
        return "\n".join(lines)
@dataclass
class StudyTypeInference:
    """Inferred study type plus the protocol that should drive it.

    ``study_type`` is single_objective / multi_objective / parametric and
    ``protocol`` the matching protocol identifier; ``reasons`` records the
    keyword signals that produced the decision.
    """
    study_type: str  # single_objective, multi_objective, parametric
    protocol: str  # protocol_10_single, protocol_11_multi
    confidence: float
    reasons: List[str] = field(default_factory=list)
class InterviewIntelligence:
    """
    Smart features for the interview process.

    Provides:
    - Study type inference from context
    - Auto-selection of extractors
    - History-based suggestions
    - Complexity determination

    All heuristics are deterministic keyword/threshold rules; no model
    calls are made here.
    """

    def __init__(self):
        """Initialize intelligence module."""
        # Shared mapper used for both objective and constraint extractor lookup.
        self.extractor_mapper = ExtractorMapper()

    def infer_study_type(
        self,
        study_name: str,
        user_description: str,
        introspection: Optional[Dict[str, Any]] = None
    ) -> StudyTypeInference:
        """
        Infer study type from available context.

        Keyword-scoring heuristic: multi- and single-objective signals each
        accumulate points over the concatenated, lower-cased name+description;
        ties (including no signal at all) fall back to single-objective.

        Args:
            study_name: Study name (may contain hints)
            user_description: User's problem description
            introspection: Optional introspection results (accepted but not
                consulted by the current heuristic)

        Returns:
            StudyTypeInference with type and protocol
        """
        reasons = []
        score_multi = 0
        score_single = 0
        text = f"{study_name} {user_description}".lower()
        # Check for multi-objective keywords (strong signal: +2)
        if any(kw in text for kw in ["pareto", "trade-off", "tradeoff", "multi-objective", "multiobjective"]):
            score_multi += 2
            reasons.append("Multi-objective keywords detected")
        # Weaker signal (+1): language implying competing goals
        if any(kw in text for kw in ["versus", " vs ", "and minimize", "and maximize", "balance"]):
            score_multi += 1
            reasons.append("Conflicting goals language detected")
        # Check for single-objective keywords
        if any(kw in text for kw in ["minimize", "maximize", "reduce", "increase"]):
            # Count distinct verbs present: exactly one suggests a single goal,
            # several suggest competing objectives.
            count = sum(1 for kw in ["minimize", "maximize", "reduce", "increase"] if kw in text)
            if count == 1:
                score_single += 1
                reasons.append("Single optimization goal language")
            else:
                score_multi += 1
                reasons.append("Multiple optimization verbs detected")
        # Default to single objective if no strong signals (ties go single)
        if score_multi > score_single:
            return StudyTypeInference(
                study_type="multi_objective",
                protocol="protocol_11_multi",
                confidence=min(1.0, 0.5 + score_multi * 0.2),
                reasons=reasons
            )
        else:
            return StudyTypeInference(
                study_type="single_objective",
                protocol="protocol_10_single",
                confidence=min(1.0, 0.5 + score_single * 0.2),
                reasons=reasons if reasons else ["Default to single-objective"]
            )

    def auto_select_extractors(
        self,
        objectives: List[Dict[str, Any]],
        constraints: List[Dict[str, Any]],
        introspection: Optional[Dict[str, Any]] = None
    ) -> Dict[str, ExtractorSelection]:
        """
        Automatically select appropriate extractors.

        Entries may be dicts (with "goal"/"type" and optional "name") or any
        other object, in which case str(entry) is mapped and a positional
        name ("objective_N" / "constraint_N") is assigned.

        Args:
            objectives: List of objective definitions
            constraints: List of constraint definitions
            introspection: Optional introspection results

        Returns:
            Dict mapping objective/constraint names to ExtractorSelection
        """
        selections = {}
        # Map objectives
        for i, obj in enumerate(objectives):
            goal = obj.get("goal", "") if isinstance(obj, dict) else str(obj)
            name = obj.get("name", f"objective_{i}") if isinstance(obj, dict) else f"objective_{i}"
            selection = self.extractor_mapper.map_goal_to_extractor(goal, introspection)
            selections[name] = selection
        # Map constraints
        for i, con in enumerate(constraints):
            con_type = con.get("type", "") if isinstance(con, dict) else str(con)
            name = con.get("name", f"constraint_{i}") if isinstance(con, dict) else f"constraint_{i}"
            selection = self.extractor_mapper.map_constraint_to_extractor(con_type, introspection)
            selections[name] = selection
        return selections

    def determine_complexity(
        self,
        state: "InterviewState",
        introspection: Optional[Dict[str, Any]] = None
    ) -> Literal["simple", "moderate", "complex"]:
        """
        Determine study complexity for adaptive questioning.

        Additive point score over the interview answers; thresholds:
        <=2 simple, <=5 moderate, else complex.

        Based on:
        - Number of objectives
        - Number of design variables
        - Analysis complexity
        - Custom components

        Args:
            state: Current interview state
            introspection: Optional introspection results

        Returns:
            Complexity level
        """
        score = 0
        answers = state.answers
        # Objectives: secondary objectives count unless the sentinel "none"
        # appears in the secondary list.
        n_obj = len(answers.get("objectives", []))
        secondary = answers.get("objectives_secondary", [])
        if "none" not in secondary:
            n_obj += len(secondary)
        if n_obj == 1:
            score += 0
        elif n_obj == 2:
            score += 1
        else:
            score += 2
        # Design variables: buckets of <=3 / <=6 / more
        n_dvs = len(answers.get("design_variables", []))
        if n_dvs <= 3:
            score += 0
        elif n_dvs <= 6:
            score += 1
        else:
            score += 2
        # Analysis types: count plus extra points for coupled/nonlinear physics
        analysis_types = answers.get("analysis_types", [])
        if len(analysis_types) > 2:
            score += 2
        elif len(analysis_types) > 1:
            score += 1
        if "coupled_thermal_structural" in analysis_types:
            score += 1
        if "nonlinear" in analysis_types:
            score += 1
        # Introspection complexity: multiple solutions or a large expression set
        if introspection:
            if introspection.get("multiple_solutions", False):
                score += 1
            if len(introspection.get("expressions", [])) > 20:
                score += 1
        # Categorize
        if score <= 2:
            return "simple"
        elif score <= 5:
            return "moderate"
        else:
            return "complex"

    def suggest_trial_count(
        self,
        n_design_variables: int,
        n_objectives: int,
        complexity: str
    ) -> int:
        """
        Suggest appropriate number of trials.

        Heuristic: 15 trials per design variable, x1.5 for multi-objective,
        floored by complexity (50/100/150), then rounded up to a "nice"
        value (50/75/100/150/200, or the nearest hundred below for larger).

        Args:
            n_design_variables: Number of design variables
            n_objectives: Number of objectives
            complexity: Study complexity level

        Returns:
            Suggested trial count
        """
        # Base: 15 trials per design variable
        base = n_design_variables * 15
        # Multi-objective needs more
        if n_objectives > 1:
            base = int(base * 1.5)
        # Adjust for complexity (acts as a minimum floor)
        if complexity == "simple":
            base = max(50, base)
        elif complexity == "moderate":
            base = max(100, base)
        else:
            base = max(150, base)
        # Round to nice numbers
        if base <= 50:
            return 50
        elif base <= 75:
            return 75
        elif base <= 100:
            return 100
        elif base <= 150:
            return 150
        elif base <= 200:
            return 200
        else:
            return int((base // 100) * 100)

    def suggest_sampler(
        self,
        n_objectives: int,
        n_design_variables: int
    ) -> str:
        """
        Suggest appropriate sampler/optimizer.

        NSGA-II for multi-objective; otherwise TPE for low (<=3) and high
        (>10) dimensional spaces, CMA-ES for the mid range.

        Args:
            n_objectives: Number of objectives
            n_design_variables: Number of design variables

        Returns:
            Sampler name
        """
        if n_objectives > 1:
            return "NSGA-II"  # Multi-objective
        elif n_design_variables <= 3:
            return "TPE"  # Tree-structured Parzen Estimator
        elif n_design_variables <= 10:
            return "CMA-ES"  # Covariance Matrix Adaptation
        else:
            return "TPE"  # TPE handles high dimensions well

    def analyze_design_variable_candidates(
        self,
        expressions: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Analyze expressions to find design variable candidates.

        Scores each numeric, non-formula expression by name patterns
        (3 = dimensional term, 2 = generic parameter naming, 1 = any other
        name longer than 2 chars) after dropping mesh/count/derived names.
        Suggested bounds are +/-50% of the current value (sides swapped for
        negative values so min < max; a value of 0 yields 0 for both bounds).
        Relies on the module-level `re` import.

        Args:
            expressions: List of expressions from introspection

        Returns:
            Sorted list of candidates with scores (descending score,
            then name ascending)
        """
        candidates = []
        # High confidence patterns: physically meaningful dimensions
        high_patterns = [
            (r"thickness", "Thickness parameter"),
            (r"width", "Width parameter"),
            (r"height", "Height parameter"),
            (r"diameter", "Diameter parameter"),
            (r"radius", "Radius parameter"),
            (r"length", "Length parameter"),
            (r"depth", "Depth parameter"),
            (r"angle", "Angle parameter"),
            (r"fillet", "Fillet radius"),
            (r"chamfer", "Chamfer dimension"),
            (r"rib_", "Rib parameter"),
            (r"wall_", "Wall parameter"),
            (r"flange_", "Flange parameter"),
        ]
        # Medium confidence patterns: generic parameter naming conventions
        medium_patterns = [
            (r"dim_", "Dimension parameter"),
            (r"size_", "Size parameter"),
            (r"param_", "Named parameter"),
            (r"^p\d+$", "Numbered parameter"),
            (r"var_", "Variable"),
        ]
        # Exclusion patterns: values that should never be design variables
        exclude_patterns = [
            r"mesh_", r"count_", r"num_", r"material",
            r"derived_", r"calc_", r"_result$", r"_output$",
            r"^n\d+$", r"count$"
        ]
        for expr in expressions:
            name = expr.get("name", "")
            value = expr.get("value")
            formula = expr.get("formula", "")
            # Skip non-numeric
            if not isinstance(value, (int, float)):
                continue
            # Skip formulas (computed values can't be driven independently)
            if formula and formula != str(value):
                continue
            # Check exclusions
            if any(re.search(p, name.lower()) for p in exclude_patterns):
                continue
            # Score: first matching high pattern wins, then medium, then fallback
            score = 0
            reason = "Named expression"
            for pattern, desc in high_patterns:
                if re.search(pattern, name.lower()):
                    score = 3
                    reason = desc
                    break
            if score == 0:
                for pattern, desc in medium_patterns:
                    if re.search(pattern, name.lower()):
                        score = 2
                        reason = desc
                        break
            if score == 0 and len(name) > 2:
                score = 1
            if score > 0:
                candidates.append({
                    "name": name,
                    "value": value,
                    "score": score,
                    "reason": reason,
                    # For negative values the factors are swapped so that
                    # suggested_min stays below suggested_max.
                    "suggested_min": round(value * 0.5, 3) if value > 0 else round(value * 1.5, 3),
                    "suggested_max": round(value * 1.5, 3) if value > 0 else round(value * 0.5, 3),
                })
        # Sort by score descending, name ascending as tiebreaker
        candidates.sort(key=lambda x: (-x["score"], x["name"]))
        return candidates
# Import for type hints
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .interview_state import InterviewState

View File

@@ -0,0 +1,588 @@
"""
Interview Presenter
Abstract presentation layer for different UI modes.
Handles:
- Formatting questions for display
- Parsing user responses
- Showing summaries and warnings
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Optional, List, Dict
import re
from .question_engine import Question, QuestionOption
@dataclass
class PresentedQuestion:
    """A question formatted for presentation.

    Carries the rendered text plus progress metadata so a UI can show
    "Question N of M" alongside the category heading.
    """
    question_id: str  # id of the underlying Question
    formatted_text: str  # presenter-rendered text, ready to display
    question_number: int  # 1-based position in the interview
    total_questions: int  # estimated total (interview is adaptive)
    category_name: str  # human-readable category heading
class InterviewPresenter(ABC):
    """
    Abstract base for interview presentation.

    Different presenters handle UI-specific rendering:
    - ClaudePresenter: Markdown for Claude conversation
    - DashboardPresenter: WebSocket events for React UI (future)
    - CLIPresenter: Interactive terminal prompts (future)

    All methods return strings; event-based presenters encode their
    payloads as JSON strings.
    """

    @abstractmethod
    def present_question(
        self,
        question: Question,
        question_number: int,
        total_questions: int,
        category_name: str,
        dynamic_content: Optional[str] = None
    ) -> str:
        """
        Format a question for display.

        Args:
            question: Question to present
            question_number: Current question number
            total_questions: Estimated total questions
            category_name: Name of the question category
            dynamic_content: Dynamic content to inject (e.g., extractor summary)

        Returns:
            Formatted question string
        """
        pass

    @abstractmethod
    def parse_response(self, response: str, question: Question) -> Any:
        """
        Parse user's response into structured value.

        Args:
            response: Raw user response
            question: Question being answered

        Returns:
            Parsed answer value (type depends on question.question_type)
        """
        pass

    @abstractmethod
    def show_summary(self, blueprint: "StudyBlueprint") -> str:
        """
        Format interview summary/blueprint for display.

        Args:
            blueprint: Generated study blueprint

        Returns:
            Formatted summary string
        """
        pass

    @abstractmethod
    def show_warning(self, warning: str, severity: str = "warning") -> str:
        """
        Format a warning message for display.

        Args:
            warning: Warning message
            severity: "error", "warning", or "info"

        Returns:
            Formatted warning string
        """
        pass

    @abstractmethod
    def show_progress(self, current: int, total: int, phase: str) -> str:
        """
        Format progress indicator.

        Args:
            current: Current question number
            total: Estimated total questions
            phase: Current phase name

        Returns:
            Formatted progress string
        """
        pass
class ClaudePresenter(InterviewPresenter):
    """
    Presenter for Claude conversation mode (VS Code, Web).

    Formats questions and responses as markdown for natural
    conversation flow with Claude. Response parsing is deliberately
    forgiving: numbers, labels, values, and fuzzy substrings are all
    accepted for choice questions.
    """

    def present_question(
        self,
        question: Question,
        question_number: int,
        total_questions: int,
        category_name: str,
        dynamic_content: Optional[str] = None
    ) -> str:
        """Format question as markdown for Claude to present."""
        lines = []
        # Header with progress
        lines.append(f"### Question {question_number} of ~{total_questions}: {category_name}")
        lines.append("")
        # Main question text
        lines.append(question.text)
        lines.append("")
        # Dynamic content if provided
        if dynamic_content:
            lines.append(dynamic_content)
            lines.append("")
        # Options for choice questions, numbered so the user can answer "2"
        if question.options and question.question_type in ["choice", "multi_choice"]:
            for i, opt in enumerate(question.options, 1):
                desc = f" - {opt.description}" if opt.description else ""
                lines.append(f"{i}. **{opt.label}**{desc}")
            lines.append("")
        # Help text
        if question.help_text:
            lines.append(f"> {question.help_text}")
            lines.append("")
        # Engineering guidance
        if question.engineering_guidance:
            lines.append(f"> **Tip**: {question.engineering_guidance}")
            lines.append("")
        # Default value hint (suppressed for None and empty-list defaults)
        if question.default is not None and question.default != []:
            if isinstance(question.default, list):
                default_str = ", ".join(str(d) for d in question.default)
            else:
                default_str = str(question.default)
            lines.append(f"*Default: {default_str}*")
            lines.append("")
        # Input prompt based on type
        if question.question_type == "text":
            lines.append("Please describe:")
        elif question.question_type == "numeric":
            units = question.validation.units if question.validation else ""
            lines.append(f"Enter value{f' ({units})' if units else ''}:")
        elif question.question_type == "choice":
            lines.append("Type your choice (number or description):")
        elif question.question_type == "multi_choice":
            lines.append("Type your choices (numbers or descriptions, comma-separated):")
        elif question.question_type == "confirm":
            lines.append("Type **yes** or **no**:")
        elif question.question_type == "parameter_select":
            lines.append("Type parameter names (comma-separated) or select by number:")
        elif question.question_type == "bounds":
            lines.append("Enter bounds (e.g., '2 to 10' or 'min 2, max 10'):")
        return "\n".join(lines)

    def parse_response(self, response: str, question: Question) -> Any:
        """Parse natural language response into structured answer.

        Dispatches on question.question_type; unknown types fall through
        and return the stripped text unchanged.
        """
        response = response.strip()
        if question.question_type == "text":
            return response
        elif question.question_type == "numeric":
            return self._parse_numeric(response, question)
        elif question.question_type == "confirm":
            return self._parse_confirm(response)
        elif question.question_type == "choice":
            return self._parse_choice(response, question)
        elif question.question_type == "multi_choice":
            return self._parse_multi_choice(response, question)
        elif question.question_type == "parameter_select":
            return self._parse_parameter_select(response, question)
        elif question.question_type == "bounds":
            return self._parse_bounds(response)
        return response

    def _parse_numeric(self, response: str, question: Question) -> Optional[float]:
        """Parse numeric response with unit handling.

        Strips a trailing unit suffix (mm, MPa, Hz, ...) then takes the
        first number found; returns None when no number is present.
        """
        # Remove common unit suffixes
        cleaned = re.sub(r'\s*(mm|cm|m|kg|g|MPa|Pa|GPa|Hz|kHz|MHz|°|deg)s?\s*$', '', response, flags=re.I)
        # Extract number
        match = re.search(r'[-+]?\d*\.?\d+', cleaned)
        if match:
            return float(match.group())
        return None

    def _parse_confirm(self, response: str) -> Optional[bool]:
        """Parse yes/no confirmation.

        Exact token matches first, then substring intent detection;
        returns None when the answer is ambiguous.
        """
        lower = response.lower().strip()
        # Positive responses
        if lower in ["yes", "y", "true", "1", "ok", "sure", "yep", "yeah", "correct", "confirmed", "confirm", "affirmative"]:
            return True
        # Negative responses
        if lower in ["no", "n", "false", "0", "nope", "nah", "cancel", "incorrect", "negative"]:
            return False
        # Try to detect intent from natural language (positive cues win ties)
        if "yes" in lower or "ok" in lower or "correct" in lower:
            return True
        if "no" in lower or "don't" in lower or "not" in lower:
            return False
        return None

    def _parse_choice(self, response: str, question: Question) -> Any:
        """Parse single choice response.

        Resolution order: 1-based option number, exact value match,
        exact label match, fuzzy (substring) label match; otherwise the
        raw response is returned as a custom value.
        """
        if not question.options:
            return response
        # Try by number
        if response.isdigit():
            idx = int(response) - 1
            if 0 <= idx < len(question.options):
                return question.options[idx].value
        # Try by value (exact match)
        for opt in question.options:
            if response.lower() == str(opt.value).lower():
                return opt.value
        # Try by label (exact match)
        for opt in question.options:
            if response.lower() == opt.label.lower():
                return opt.value
        # Try fuzzy match on label
        for opt in question.options:
            if response.lower() in opt.label.lower():
                return opt.value
        # Return as-is for custom values
        return response

    def _parse_multi_choice(self, response: str, question: Question) -> List[Any]:
        """Parse multiple choice response into a de-duplicated value list.

        NOTE(review): the split pattern includes \\s+, so free-text parts
        containing spaces are broken into individual words before matching
        (unlike _parse_parameter_select, which preserves multi-word parts).
        Confirm this is intended for multi-word option labels / custom values.
        """
        # Split by comma, 'and', or numbers
        parts = re.split(r'[,&]|\band\b|\s+', response)
        parts = [p.strip() for p in parts if p.strip()]
        values = []
        for part in parts:
            if not part:
                continue
            # Try by number
            if part.isdigit() and question.options:
                idx = int(part) - 1
                if 0 <= idx < len(question.options):
                    value = question.options[idx].value
                    if value not in values:
                        values.append(value)
                    continue
            # Try by value/label
            # NOTE(review): exact and fuzzy checks run per-option, so a fuzzy
            # hit on an earlier option wins over an exact hit on a later one —
            # different from _parse_choice, which exhausts exact matches first.
            if question.options:
                found = False
                for opt in question.options:
                    if part.lower() == str(opt.value).lower() or part.lower() == opt.label.lower():
                        if opt.value not in values:
                            values.append(opt.value)
                        found = True
                        break
                    if part.lower() in opt.label.lower():
                        if opt.value not in values:
                            values.append(opt.value)
                        found = True
                        break
                if found:
                    continue
            # Add as custom value
            if part not in values:
                values.append(part)
        return values

    def _parse_parameter_select(self, response: str, question: Question) -> List[str]:
        """Parse parameter selection response.

        Parts split on commas/'&'/'and' only, so multi-word parameter
        names survive. Digits resolve against the option list; anything
        else is taken as a literal parameter name. Duplicates are kept.
        """
        # Split by comma, 'and', or numbers
        parts = re.split(r'[,&]|\band\b', response)
        parameters = []
        for part in parts:
            part = part.strip()
            if not part:
                continue
            # Try by number if we have options
            if part.isdigit() and question.options:
                idx = int(part) - 1
                if 0 <= idx < len(question.options):
                    parameters.append(question.options[idx].value)
                    continue
            # Add as parameter name
            parameters.append(part)
        return parameters

    def _parse_bounds(self, response: str) -> Optional[Dict[str, float]]:
        """Parse bounds specification.

        Accepts "2 to 10" / "2-10" or "min 2, max 10" forms; may return a
        partial dict when only one of min/max is given, or None when
        neither parses. The \\d-based patterns do not accept negative
        numbers.
        """
        bounds = {}
        # Try "min to max" format
        match = re.search(r'(\d+\.?\d*)\s*(?:to|-)\s*(\d+\.?\d*)', response)
        if match:
            bounds["min"] = float(match.group(1))
            bounds["max"] = float(match.group(2))
            return bounds
        # Try "min: X, max: Y" format
        min_match = re.search(r'min[:\s]+(\d+\.?\d*)', response, re.I)
        max_match = re.search(r'max[:\s]+(\d+\.?\d*)', response, re.I)
        if min_match:
            bounds["min"] = float(min_match.group(1))
        if max_match:
            bounds["max"] = float(max_match.group(1))
        return bounds if bounds else None

    def show_summary(self, blueprint: "StudyBlueprint") -> str:
        """Format interview summary/blueprint as markdown tables.

        Ends with a confirmation prompt asking the user to approve or
        describe changes.
        """
        lines = []
        lines.append(f"## Study Blueprint: {blueprint.study_name}")
        lines.append("")
        # Description
        if blueprint.study_description:
            lines.append(f"**Description**: {blueprint.study_description}")
            lines.append("")
        # Design Variables
        lines.append(f"### Design Variables ({len(blueprint.design_variables)})")
        lines.append("")
        lines.append("| Parameter | Current | Min | Max | Units |")
        lines.append("|-----------|---------|-----|-----|-------|")
        for dv in blueprint.design_variables:
            lines.append(f"| {dv.parameter} | {dv.current_value} | {dv.min_value} | {dv.max_value} | {dv.units or '-'} |")
        lines.append("")
        # Objectives
        lines.append(f"### Objectives ({len(blueprint.objectives)})")
        lines.append("")
        lines.append("| Goal | Extractor | Parameters |")
        lines.append("|------|-----------|------------|")
        for obj in blueprint.objectives:
            params = ", ".join(f"{k}={v}" for k, v in (obj.extractor_params or {}).items()) or "-"
            lines.append(f"| {obj.goal} | {obj.extractor} | {params} |")
        lines.append("")
        # Constraints (section omitted entirely when there are none)
        if blueprint.constraints:
            lines.append(f"### Constraints ({len(blueprint.constraints)})")
            lines.append("")
            lines.append("| Type | Threshold | Extractor |")
            lines.append("|------|-----------|-----------|")
            for con in blueprint.constraints:
                op = "<=" if con.constraint_type == "max" else ">="
                lines.append(f"| {con.name} | {op} {con.threshold} | {con.extractor} |")
            lines.append("")
        # Settings
        lines.append("### Settings")
        lines.append("")
        lines.append(f"- **Protocol**: {blueprint.protocol}")
        lines.append(f"- **Trials**: {blueprint.n_trials}")
        lines.append(f"- **Sampler**: {blueprint.sampler}")
        lines.append("")
        # Warnings
        if blueprint.warnings_acknowledged:
            lines.append("### Acknowledged Warnings")
            lines.append("")
            for warning in blueprint.warnings_acknowledged:
                lines.append(f"- {warning}")
            lines.append("")
        lines.append("---")
        lines.append("")
        lines.append("Does this look correct? Reply **yes** to generate the study, or describe what to change.")
        return "\n".join(lines)

    def show_warning(self, warning: str, severity: str = "warning") -> str:
        """Format a warning message for display.

        Plain ASCII markers (X/!/i) rather than emoji so the output is
        terminal-safe. Unknown severities render as warnings.
        """
        icons = {
            "error": "X",
            "warning": "!",
            "info": "i"
        }
        icon = icons.get(severity, "!")
        if severity == "error":
            return f"\n**[{icon}] ERROR**: {warning}\n"
        elif severity == "warning":
            return f"\n**[{icon}] Warning**: {warning}\n"
        else:
            return f"\n*[{icon}] Note*: {warning}\n"

    def show_progress(self, current: int, total: int, phase: str) -> str:
        """Format progress indicator as a 20-char ASCII bar.

        Guards against total == 0 by rendering an empty bar at 0%.
        """
        percentage = int((current / total) * 100) if total > 0 else 0
        bar_length = 20
        filled = int(bar_length * current / total) if total > 0 else 0
        bar = "=" * filled + "-" * (bar_length - filled)
        return f"**Progress**: [{bar}] {percentage}% - {phase}"
class DashboardPresenter(InterviewPresenter):
    """
    Presenter for dashboard UI mode (future).

    Emits WebSocket events for React UI to render. In the current
    implementation no socket is wired up: each method returns the event
    payload as a JSON string instead of emitting it.
    """

    def present_question(
        self,
        question: Question,
        question_number: int,
        total_questions: int,
        category_name: str,
        dynamic_content: Optional[str] = None
    ) -> str:
        """Emit WebSocket event for dashboard to render."""
        # This would emit an event to the dashboard
        # For now, return JSON representation
        import json
        return json.dumps({
            "type": "question",
            "data": {
                "question_id": question.id,
                "question_number": question_number,
                "total_questions": total_questions,
                "category": category_name,
                "text": question.text,
                "question_type": question.question_type,
                # Option descriptions are intentionally dropped; the UI only
                # receives value/label pairs.
                "options": [{"value": o.value, "label": o.label} for o in (question.options or [])],
                "help_text": question.help_text,
                "default": question.default,
                "dynamic_content": dynamic_content,
            }
        })

    def parse_response(self, response: str, question: Question) -> Any:
        """Parse JSON response from dashboard.

        Expects {"value": ...}; non-JSON input falls back to the
        natural-language ClaudePresenter parser.
        """
        import json
        try:
            data = json.loads(response)
            return data.get("value", response)
        except json.JSONDecodeError:
            # Fall back to Claude parser
            claude = ClaudePresenter()
            return claude.parse_response(response, question)

    def show_summary(self, blueprint: "StudyBlueprint") -> str:
        """Emit summary event for dashboard."""
        import json
        return json.dumps({
            "type": "summary",
            # str() fallback keeps this serializable for blueprint objects
            # that don't implement to_dict().
            "data": blueprint.to_dict() if hasattr(blueprint, 'to_dict') else str(blueprint)
        })

    def show_warning(self, warning: str, severity: str = "warning") -> str:
        """Emit warning event for dashboard."""
        import json
        return json.dumps({
            "type": "warning",
            "data": {"message": warning, "severity": severity}
        })

    def show_progress(self, current: int, total: int, phase: str) -> str:
        """Emit progress event for dashboard."""
        import json
        return json.dumps({
            "type": "progress",
            "data": {"current": current, "total": total, "phase": phase}
        })
class CLIPresenter(InterviewPresenter):
    """
    Presenter for CLI wizard mode (future).

    Renders plain-text terminal prompts; response parsing and blueprint
    summaries are delegated to ClaudePresenter so every front-end shares
    one response grammar. Interactive prompts (Rich/Questionary) are a
    future enhancement.
    """

    def present_question(
        self,
        question: Question,
        question_number: int,
        total_questions: int,
        category_name: str,
        dynamic_content: Optional[str] = None
    ) -> str:
        """Format for CLI display (dynamic_content is not rendered here)."""
        body = [
            f"\n[{question_number}/{total_questions}] {category_name}",
            "-" * 50,
            question.text,
        ]
        if question.options:
            body.extend(f" {idx}. {opt.label}" for idx, opt in enumerate(question.options, 1))
        if question.help_text:
            body.append(f"\nHint: {question.help_text}")
        body.append("")
        return "\n".join(body)

    def parse_response(self, response: str, question: Question) -> Any:
        """Parse CLI response (delegate to Claude parser)."""
        return ClaudePresenter().parse_response(response, question)

    def show_summary(self, blueprint: "StudyBlueprint") -> str:
        """Format summary for CLI (delegate to Claude markdown renderer)."""
        return ClaudePresenter().show_summary(blueprint)

    def show_warning(self, warning: str, severity: str = "warning") -> str:
        """Format warning for CLI with a bracketed severity tag."""
        tags = {"error": "[ERROR]", "warning": "[WARN]", "info": "[INFO]"}
        return f"\n{tags.get(severity, '[WARN]')} {warning}\n"

    def show_progress(self, current: int, total: int, phase: str) -> str:
        """Format progress for CLI."""
        return f"Progress: {current}/{total} ({phase})"
# Import for type hints
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .study_blueprint import StudyBlueprint

View File

@@ -0,0 +1,556 @@
"""
Interview State Management
This module handles the persistence and management of interview state across sessions.
It provides:
- InterviewState: Complete state dataclass
- InterviewPhase: Enum for interview phases
- InterviewStateManager: Save/load/history functionality
- LogEntry: Audit log entries
"""
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, List, Any, Optional, Literal
import json
import uuid
import shutil
import os
class InterviewPhase(Enum):
    """Interview phases, declared in the order the interview progresses."""
    INTROSPECTION = "introspection"
    PROBLEM_DEFINITION = "problem_definition"
    OBJECTIVES = "objectives"
    CONSTRAINTS = "constraints"
    DESIGN_VARIABLES = "design_variables"
    VALIDATION = "validation"
    REVIEW = "review"
    COMPLETE = "complete"

    @classmethod
    def from_string(cls, s: str) -> "InterviewPhase":
        """Look up a phase by its string value; raise ValueError if unknown."""
        found = next((member for member in cls if member.value == s), None)
        if found is None:
            raise ValueError(f"Unknown phase: {s}")
        return found

    def next_phase(self) -> Optional["InterviewPhase"]:
        """Return the phase after this one, or None at COMPLETE."""
        ordered = list(InterviewPhase)
        pos = ordered.index(self)
        return ordered[pos + 1] if pos + 1 < len(ordered) else None

    def previous_phase(self) -> Optional["InterviewPhase"]:
        """Return the phase before this one, or None at INTROSPECTION."""
        ordered = list(InterviewPhase)
        pos = ordered.index(self)
        return ordered[pos - 1] if pos > 0 else None
@dataclass
class AnsweredQuestion:
    """Record of a single answered interview question."""
    question_id: str
    answered_at: str  # ISO datetime
    raw_response: str
    parsed_value: Any
    inferred: Optional[Dict[str, Any]] = None  # What was inferred from answer

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain, JSON-serializable dictionary of all fields."""
        names = ("question_id", "answered_at", "raw_response", "parsed_value", "inferred")
        return {name: getattr(self, name) for name in names}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "AnsweredQuestion":
        """Rebuild a record from a dict produced by to_dict(); inferred is optional."""
        return cls(
            data["question_id"],
            data["answered_at"],
            data["raw_response"],
            data["parsed_value"],
            data.get("inferred"),
        )
@dataclass
class LogEntry:
    """One human-readable record for the markdown audit log."""
    timestamp: datetime
    question_id: str
    question_text: str
    answer_raw: str
    answer_parsed: Any
    inferred: Optional[Dict[str, Any]] = None
    warnings: Optional[List[str]] = None

    def to_markdown(self) -> str:
        """Render this entry as a markdown section ending in a '---' rule.

        The "Parsed Value" block only appears when parsing changed the
        raw answer; "Inferred" and "Warnings" blocks only appear when
        those fields are non-empty.
        """
        stamp = self.timestamp.strftime('%Y-%m-%d %H:%M:%S')
        out = [
            f"## [{stamp}] Question: {self.question_id}",
            "",
            f"**Question**: {self.question_text}",
            "",
            f"**Answer**: {self.answer_raw}",
            "",
        ]
        if self.answer_parsed != self.answer_raw:
            out.append(f"**Parsed Value**: `{self.answer_parsed}`")
            out.append("")
        if self.inferred:
            out.append("**Inferred**:")
            out.extend(f"- {key}: {value}" for key, value in self.inferred.items())
            out.append("")
        if self.warnings:
            out.append("**Warnings**:")
            out.extend(f"- {warning}" for warning in self.warnings)
            out.append("")
        out += ["---", ""]
        return "\n".join(out)
@dataclass
class InterviewState:
    """
    Complete interview state (JSON-serializable).

    This dataclass holds all state needed to resume an interview,
    including introspection results, answers, and derived configuration.
    Persisted via InterviewStateManager as interview_state.json.
    """
    version: str = "1.0"  # schema version of the serialized state
    session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    study_name: str = ""
    study_path: str = ""
    parent_study: Optional[str] = None  # set when branched from another study
    # Progress tracking
    started_at: str = field(default_factory=lambda: datetime.now().isoformat())
    last_updated: str = field(default_factory=lambda: datetime.now().isoformat())
    current_phase: str = InterviewPhase.INTROSPECTION.value  # stored as string for JSON
    complexity: Literal["simple", "moderate", "complex"] = "simple"
    # Question tracking
    questions_answered: List[Dict[str, Any]] = field(default_factory=list)  # AnsweredQuestion dicts
    questions_remaining: List[str] = field(default_factory=list)  # question ids
    current_question_id: Optional[str] = None
    # Introspection cache
    introspection: Dict[str, Any] = field(default_factory=dict)
    # Collected answers (organized by category)
    answers: Dict[str, Any] = field(default_factory=lambda: {
        "problem_description": None,
        "physical_context": None,
        "analysis_types": [],
        "objectives": [],
        "constraints": [],
        "design_variables": [],
        "protocol": None,
        "n_trials": 100,
        "use_neural_acceleration": False,
    })
    # Derived/inferred configuration
    inferred_config: Dict[str, Any] = field(default_factory=dict)
    # Validation results
    warnings: List[str] = field(default_factory=list)
    warnings_acknowledged: List[str] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
    # Blueprint (when complete)
    blueprint: Optional[Dict[str, Any]] = None

    def get_phase(self) -> InterviewPhase:
        """Get current phase as enum (raises ValueError on a corrupt value)."""
        return InterviewPhase.from_string(self.current_phase)

    def set_phase(self, phase: InterviewPhase) -> None:
        """Set current phase and bump last_updated."""
        self.current_phase = phase.value
        self.touch()

    def touch(self) -> None:
        """Update last_updated timestamp."""
        self.last_updated = datetime.now().isoformat()

    def is_complete(self) -> bool:
        """Check if interview is complete."""
        return self.current_phase == InterviewPhase.COMPLETE.value

    def current_question_count(self) -> int:
        """Get number of questions answered."""
        return len(self.questions_answered)

    def progress_percentage(self) -> float:
        """
        Estimate progress through interview (0-100).

        Based on phase, not questions, since questions are adaptive.
        """
        phases = list(InterviewPhase)
        current_idx = phases.index(self.get_phase())
        return (current_idx / (len(phases) - 1)) * 100

    def add_answered_question(self, question: AnsweredQuestion) -> None:
        """Record a question as answered and drop it from the remaining list."""
        self.questions_answered.append(question.to_dict())
        if question.question_id in self.questions_remaining:
            self.questions_remaining.remove(question.question_id)
        self.touch()

    def get_answer(self, key: str, default: Any = None) -> Any:
        """Get an answer by key."""
        return self.answers.get(key, default)

    def set_answer(self, key: str, value: Any) -> None:
        """Set an answer and bump last_updated."""
        self.answers[key] = value
        self.touch()

    def add_warning(self, warning: str) -> None:
        """Add a warning message (de-duplicated by exact text)."""
        if warning not in self.warnings:
            self.warnings.append(warning)
            self.touch()

    def acknowledge_warning(self, warning: str) -> None:
        """Mark a known warning as acknowledged (no-op for unknown text)."""
        if warning in self.warnings and warning not in self.warnings_acknowledged:
            self.warnings_acknowledged.append(warning)
            self.touch()

    def has_unacknowledged_errors(self) -> bool:
        """Check if there are blocking errors.

        NOTE(review): despite the name, errors have no acknowledgment
        mechanism — this simply reports whether any errors exist.
        """
        return len(self.errors) > 0

    def has_unacknowledged_warnings(self) -> bool:
        """Check if there are unacknowledged warnings."""
        return any(w not in self.warnings_acknowledged for w in self.warnings)

    def to_json(self) -> str:
        """Serialize to JSON string (non-serializable values fall back to str())."""
        return json.dumps(asdict(self), indent=2, default=str)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)

    @classmethod
    def from_json(cls, json_str: str) -> "InterviewState":
        """Deserialize from JSON string."""
        data = json.loads(json_str)
        return cls.from_dict(data)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "InterviewState":
        """Create from dictionary, filling defaults for any missing keys.

        NOTE(review): a payload missing "answers" gets an empty dict here,
        not the keyed template from the field's default_factory — confirm
        downstream code tolerates absent answer keys.
        """
        # Handle nested types
        return cls(
            version=data.get("version", "1.0"),
            session_id=data.get("session_id", str(uuid.uuid4())),
            study_name=data.get("study_name", ""),
            study_path=data.get("study_path", ""),
            parent_study=data.get("parent_study"),
            started_at=data.get("started_at", datetime.now().isoformat()),
            last_updated=data.get("last_updated", datetime.now().isoformat()),
            current_phase=data.get("current_phase", InterviewPhase.INTROSPECTION.value),
            complexity=data.get("complexity", "simple"),
            questions_answered=data.get("questions_answered", []),
            questions_remaining=data.get("questions_remaining", []),
            current_question_id=data.get("current_question_id"),
            introspection=data.get("introspection", {}),
            answers=data.get("answers", {}),
            inferred_config=data.get("inferred_config", {}),
            warnings=data.get("warnings", []),
            warnings_acknowledged=data.get("warnings_acknowledged", []),
            errors=data.get("errors", []),
            blueprint=data.get("blueprint"),
        )

    def validate(self) -> List[str]:
        """Validate state, return list of errors (empty list when valid)."""
        errors = []
        if not self.session_id:
            errors.append("Missing session_id")
        if not self.study_name:
            errors.append("Missing study_name")
        try:
            InterviewPhase.from_string(self.current_phase)
        except ValueError:
            errors.append(f"Invalid current_phase: {self.current_phase}")
        if self.complexity not in ["simple", "moderate", "complex"]:
            errors.append(f"Invalid complexity: {self.complexity}")
        return errors
@dataclass
class StateSnapshot:
    """Snapshot of state for history/undo."""
    timestamp: str  # ISO datetime of the snapshot
    phase: str  # phase value at snapshot time
    questions_count: int  # questions answered at snapshot time
    state_hash: str  # content hash for change detection
    file_path: str  # on-disk location of the snapshot file
class InterviewStateManager:
    """
    Manages interview state persistence.

    Handles:
    - Save/load state to JSON (atomic write via temp-file + rename)
    - Human-readable audit log (MD)
    - State backup rotation
    - History for undo/branch

    Concurrent writers are excluded by a best-effort lock file created with
    O_CREAT | O_EXCL, so check-and-create is a single atomic step.
    """

    # Maximum number of backup files retained in backups/.
    MAX_BACKUPS = 5

    def __init__(self, study_path: Path):
        """
        Initialize state manager.

        Args:
            study_path: Path to the study directory
        """
        self.study_path = Path(study_path)
        self.interview_dir = self.study_path / ".interview"
        self.state_file = self.interview_dir / "interview_state.json"
        self.log_file = self.interview_dir / "INTERVIEW_LOG.md"
        self.backup_dir = self.interview_dir / "backups"
        self.lock_file = self.interview_dir / ".lock"
        # Ensure directories exist
        self._ensure_directories()

    def _ensure_directories(self) -> None:
        """Create necessary directories if they don't exist."""
        self.interview_dir.mkdir(parents=True, exist_ok=True)
        self.backup_dir.mkdir(exist_ok=True)

    def _acquire_lock(self) -> bool:
        """Acquire lock file for concurrent access prevention.

        Returns:
            True if the lock was acquired, False otherwise.

        A stale lock (older than 5 minutes) is removed first. Creation uses
        os.open with O_CREAT | O_EXCL so that two processes cannot both
        believe they created the lock (no exists()/write race).
        """
        try:
            if self.lock_file.exists():
                # Check if lock is stale (older than 5 minutes)
                mtime = self.lock_file.stat().st_mtime
                age = datetime.now().timestamp() - mtime
                if age > 300:  # 5 minutes
                    self.lock_file.unlink()
                else:
                    return False
            # Atomic create-or-fail: exactly one process can win this.
            fd = os.open(self.lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            try:
                os.write(fd, str(os.getpid()).encode("utf-8"))
            finally:
                os.close(fd)
            return True
        except FileExistsError:
            # Another process created the lock between our check and create.
            return False
        except Exception:
            # Best-effort locking: any other failure is treated as "not acquired".
            return False

    def _release_lock(self) -> None:
        """Release lock file (best-effort; never raises)."""
        try:
            if self.lock_file.exists():
                self.lock_file.unlink()
        except Exception:
            pass

    def exists(self) -> bool:
        """Check if a saved state exists."""
        return self.state_file.exists()

    def save_state(self, state: "InterviewState") -> None:
        """
        Persist current state to JSON.

        Performs atomic write with backup rotation.

        Raises:
            RuntimeError: if the lock file could not be acquired.
        """
        if not self._acquire_lock():
            raise RuntimeError("Could not acquire lock for state file")
        try:
            # Update timestamp
            state.touch()
            # Create backup if state file exists
            if self.state_file.exists():
                self._rotate_backups()
                backup_name = f"state_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
                shutil.copy(self.state_file, self.backup_dir / backup_name)
            # Atomic write: write to temp file then rename
            temp_file = self.state_file.with_suffix(".tmp")
            temp_file.write_text(state.to_json(), encoding="utf-8")
            temp_file.replace(self.state_file)
        finally:
            self._release_lock()

    def _rotate_backups(self) -> None:
        """Trim old backups, newest first.

        Keeps MAX_BACKUPS - 1 existing files because the caller is about to
        add one more backup immediately afterwards, so at most MAX_BACKUPS
        files remain on disk.
        """
        backups = sorted(
            self.backup_dir.glob("state_*.json"),
            key=lambda p: p.stat().st_mtime,
            reverse=True
        )
        # Remove old backups (leave room for the backup being created)
        for backup in backups[self.MAX_BACKUPS - 1:]:
            backup.unlink()

    def load_state(self) -> Optional["InterviewState"]:
        """
        Load existing state if available.

        Returns:
            InterviewState if exists and valid, None otherwise
        """
        if not self.state_file.exists():
            return None
        try:
            json_str = self.state_file.read_text(encoding="utf-8")
            state = InterviewState.from_json(json_str)
            # Validate state
            errors = state.validate()
            if errors:
                raise ValueError(f"Invalid state: {errors}")
            return state
        except (json.JSONDecodeError, ValueError) as e:
            # Log error but don't crash
            print(f"Warning: Could not load interview state: {e}")
            return None

    def append_log(self, entry: "LogEntry") -> None:
        """
        Add entry to human-readable audit log.

        Creates log file with header if it doesn't exist.
        """
        # Initialize log file if needed
        if not self.log_file.exists():
            header = self._create_log_header()
            self.log_file.write_text(header, encoding="utf-8")
        # Append entry
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(entry.to_markdown())

    def _create_log_header(self) -> str:
        """Create header for new log file."""
        return f"""# Interview Log
**Study**: {self.study_path.name}
**Started**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
This log records all questions and answers from the study interview process.
---
"""

    def finalize_log(self, state: "InterviewState") -> None:
        """Add final summary to log when interview completes."""
        summary = f"""
## Interview Complete
**Completed**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Questions Answered**: {len(state.questions_answered)}
**Complexity**: {state.complexity}
### Summary
- **Problem**: {state.answers.get('problem_description', 'N/A')}
- **Objectives**: {len(state.answers.get('objectives', []))}
- **Constraints**: {len(state.answers.get('constraints', []))}
- **Design Variables**: {len(state.answers.get('design_variables', []))}
### Warnings Acknowledged
"""
        for warning in state.warnings_acknowledged:
            summary += f"- {warning}\n"
        if not state.warnings_acknowledged:
            summary += "- None\n"
        summary += "\n---\n"
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(summary)

    def get_history(self) -> List["StateSnapshot"]:
        """
        Get modification history for undo/branch.

        Returns list of state snapshots from backups. Unreadable or
        corrupt backup files are silently skipped.
        """
        import hashlib

        snapshots = []
        for backup in sorted(self.backup_dir.glob("state_*.json")):
            try:
                text = backup.read_text(encoding="utf-8")
                data = json.loads(text)
                snapshot = StateSnapshot(
                    timestamp=data.get("last_updated", "unknown"),
                    phase=data.get("current_phase", "unknown"),
                    questions_count=len(data.get("questions_answered", [])),
                    # Content hash must be stable across runs: builtin hash()
                    # is salted per process (PYTHONHASHSEED), so use sha256.
                    state_hash=hashlib.sha256(text.encode("utf-8")).hexdigest(),
                    file_path=str(backup),
                )
                snapshots.append(snapshot)
            except Exception:
                continue
        return snapshots

    def restore_from_backup(self, backup_path: str) -> Optional["InterviewState"]:
        """Restore state from a backup file; None if missing or unreadable."""
        backup = Path(backup_path)
        if not backup.exists():
            return None
        try:
            json_str = backup.read_text(encoding="utf-8")
            return InterviewState.from_json(json_str)
        except Exception:
            return None

    def delete_state(self) -> None:
        """Delete all interview state (for restart)."""
        if self.state_file.exists():
            self.state_file.unlink()
        # Keep log file but add note
        if self.log_file.exists():
            with open(self.log_file, "a", encoding="utf-8") as f:
                f.write(f"\n## State Reset\n\n**Reset at**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n---\n\n")

View File

@@ -0,0 +1,747 @@
"""
Question Engine
This module manages question definitions, conditions, and dynamic options.
It handles:
- Loading question schemas from JSON
- Evaluating conditional logic
- Populating dynamic options from introspection
- Question ordering and flow control
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Any, Optional, Literal, Union
import json
import re
@dataclass
class ValidationRule:
    """Validation rule for a question answer.

    All fields are optional; an unset field means "no check".
    """
    required: bool = False
    min_length: Optional[int] = None
    max_length: Optional[int] = None
    min: Optional[float] = None
    max: Optional[float] = None
    min_selections: Optional[int] = None
    max_selections: Optional[int] = None
    pattern: Optional[str] = None
    units: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["ValidationRule"]:
        """Build a rule from its schema dict; a None input passes through."""
        if data is None:
            return None
        # Every field except "required" maps 1:1 and defaults to None.
        optional_keys = (
            "min_length", "max_length", "min", "max",
            "min_selections", "max_selections", "pattern", "units",
        )
        kwargs = {key: data.get(key) for key in optional_keys}
        return cls(required=data.get("required", False), **kwargs)
@dataclass
class QuestionOption:
    """Option for choice/multi_choice questions."""
    value: Any
    label: str
    description: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "QuestionOption":
        """Build an option from its schema dict ('value' and 'label' required)."""
        return cls(data["value"], data["label"], data.get("description"))
@dataclass
class QuestionCondition:
    """
    Conditional logic for when to ask a question.

    Supports:
    - answered: field has been answered
    - equals: field equals value
    - contains: array field contains value
    - greater_than: numeric comparison
    - less_than: numeric comparison
    - exists: field exists and is not None
    - introspection_has: introspection data has field
    - complexity_is: complexity level matches
    - and/or/not: logical operators
    """
    type: str
    field: Optional[str] = None
    value: Optional[Any] = None
    condition: Optional["QuestionCondition"] = None  # For 'not'
    conditions: Optional[List["QuestionCondition"]] = None  # For 'and'/'or'

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["QuestionCondition"]:
        """Recursively build a condition tree from its schema dict."""
        if data is None:
            return None
        # Nested single child (used by 'not').
        child = cls.from_dict(data["condition"]) if "condition" in data else None
        # Nested child list (used by 'and'/'or').
        children = None
        if "conditions" in data:
            children = [cls.from_dict(c) for c in data["conditions"]]
        return cls(
            type=data["type"],
            field=data.get("field"),
            value=data.get("value"),
            condition=child,
            conditions=children,
        )
@dataclass
class DynamicOptions:
    """Configuration for dynamic option population."""
    type: str
    source: str
    filter: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["DynamicOptions"]:
        """Build from a schema dict; a None input passes through."""
        if data is None:
            return None
        return cls(data["type"], data["source"], data.get("filter"))
@dataclass
class DynamicContent:
    """Configuration for dynamic content in question text."""
    type: str
    source: str

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["DynamicContent"]:
        """Build from a schema dict; a None input passes through."""
        if data is None:
            return None
        return cls(data["type"], data["source"])
@dataclass
class Question:
    """Represents a single interview question."""
    id: str
    category: str
    text: str
    question_type: Literal["text", "choice", "multi_choice", "numeric", "confirm", "parameter_select", "bounds"]
    maps_to: str
    help_text: Optional[str] = None
    options: Optional[List[QuestionOption]] = None
    default: Optional[Any] = None
    validation: Optional[ValidationRule] = None
    condition: Optional[QuestionCondition] = None
    engineering_guidance: Optional[str] = None
    dynamic_options: Optional[DynamicOptions] = None
    dynamic_content: Optional[DynamicContent] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Question":
        """Build a question (and its nested parts) from a schema dict."""
        raw_options = data.get("options")
        parsed_options = (
            [QuestionOption.from_dict(o) for o in raw_options] if raw_options else None
        )
        kwargs: Dict[str, Any] = {
            "id": data["id"],
            "category": data["category"],
            "text": data["text"],
            "question_type": data["question_type"],
            "maps_to": data["maps_to"],
            "help_text": data.get("help_text"),
            "options": parsed_options,
            "default": data.get("default"),
            "validation": ValidationRule.from_dict(data.get("validation")),
            "condition": QuestionCondition.from_dict(data.get("condition")),
            "engineering_guidance": data.get("engineering_guidance"),
            "dynamic_options": DynamicOptions.from_dict(data.get("dynamic_options")),
            "dynamic_content": DynamicContent.from_dict(data.get("dynamic_content")),
        }
        return cls(**kwargs)
@dataclass
class QuestionCategory:
    """Category of related questions."""
    id: str
    name: str
    phase: str
    order: int
    always_ask: bool = True
    condition: Optional[QuestionCondition] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "QuestionCategory":
        """Build a category from its schema dict."""
        return cls(
            data["id"],
            data["name"],
            data["phase"],
            data["order"],
            always_ask=data.get("always_ask", True),
            condition=QuestionCondition.from_dict(data.get("condition")),
        )
class QuestionEngine:
    """
    Manages question definitions and flow logic.

    Handles:
    - Loading questions from JSON schema
    - Evaluating conditions to determine next question
    - Populating dynamic options from introspection
    - Answer parsing and validation
    """
    def __init__(self, schema_path: Optional[Path] = None):
        """
        Initialize question engine.

        Args:
            schema_path: Path to question schema JSON. If None, uses default.

        Raises:
            FileNotFoundError: if the schema file does not exist.
        """
        if schema_path is None:
            # Default schema ships alongside this module.
            schema_path = Path(__file__).parent / "schemas" / "interview_questions.json"
        self.schema_path = schema_path
        self.schema: Dict[str, Any] = {}
        self.categories: List[QuestionCategory] = []
        self.questions: Dict[str, Question] = {}  # question id -> Question
        self.questions_by_category: Dict[str, List[Question]] = {}
        self._load_schema()
    def _load_schema(self) -> None:
        """Load question schema from JSON file and index its contents."""
        if not self.schema_path.exists():
            raise FileNotFoundError(f"Question schema not found: {self.schema_path}")
        with open(self.schema_path, "r", encoding="utf-8") as f:
            self.schema = json.load(f)
        # Parse categories (sorted by their declared "order" field)
        self.categories = [
            QuestionCategory.from_dict(c) for c in self.schema.get("categories", [])
        ]
        self.categories.sort(key=lambda c: c.order)
        # Parse questions, preserving schema order within each category
        for q_data in self.schema.get("questions", []):
            question = Question.from_dict(q_data)
            self.questions[question.id] = question
            # Organize by category
            if question.category not in self.questions_by_category:
                self.questions_by_category[question.category] = []
            self.questions_by_category[question.category].append(question)
    def get_all_questions(self) -> List[Question]:
        """Get all questions in category order (conditions not evaluated)."""
        result = []
        for category in self.categories:
            if category.id in self.questions_by_category:
                result.extend(self.questions_by_category[category.id])
        return result
    def get_question(self, question_id: str) -> Optional[Question]:
        """Get a specific question by ID."""
        return self.questions.get(question_id)
    def get_next_question(
        self,
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> Optional[Question]:
        """
        Determine the next question based on state and conditions.

        Args:
            state: Current interview state
            introspection: Introspection results from model

        Returns:
            Next question to ask, or None if interview is complete
        """
        answered_ids = {q["question_id"] for q in state.questions_answered}
        # Go through categories in order
        for category in self.categories:
            # Check if category should be asked
            if not self._should_ask_category(category, state, introspection):
                continue
            # Get questions in this category
            category_questions = self.questions_by_category.get(category.id, [])
            for question in category_questions:
                # Skip if already answered
                if question.id in answered_ids:
                    continue
                # Check if question condition is met
                if self._should_ask_question(question, state, introspection):
                    # Populate dynamic options if needed
                    return self._prepare_question(question, state, introspection)
        # No more questions
        return None
    def _should_ask_category(
        self,
        category: QuestionCategory,
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> bool:
        """Check if a category should be asked.

        NOTE: ``always_ask`` takes precedence over any condition; a
        condition-less, non-always_ask category also defaults to True.
        """
        if category.always_ask:
            return True
        if category.condition:
            return self.evaluate_condition(category.condition, state, introspection)
        return True
    def _should_ask_question(
        self,
        question: Question,
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> bool:
        """Check if a question should be asked (no condition means yes)."""
        if question.condition is None:
            return True
        return self.evaluate_condition(question.condition, state, introspection)
    def evaluate_condition(
        self,
        condition: QuestionCondition,
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> bool:
        """
        Evaluate if a condition is met.

        Args:
            condition: Condition to evaluate
            state: Current interview state
            introspection: Introspection results

        Returns:
            True if condition is met. Unknown condition types evaluate to
            True (permissive) rather than raising.
        """
        cond_type = condition.type
        if cond_type == "answered":
            return self._get_nested_value(state.answers, condition.field) is not None
        elif cond_type == "equals":
            actual = self._get_nested_value(state.answers, condition.field)
            return actual == condition.value
        elif cond_type == "contains":
            actual = self._get_nested_value(state.answers, condition.field)
            if isinstance(actual, list):
                return condition.value in actual
            return False
        elif cond_type == "greater_than":
            actual = self._get_nested_value(state.answers, condition.field)
            if actual is not None and isinstance(actual, (int, float)):
                return actual > condition.value
            return False
        elif cond_type == "less_than":
            actual = self._get_nested_value(state.answers, condition.field)
            if actual is not None and isinstance(actual, (int, float)):
                return actual < condition.value
            return False
        elif cond_type == "exists":
            actual = self._get_nested_value(state.answers, condition.field)
            return actual is not None
        elif cond_type == "introspection_has":
            # Top-level key presence only; no dotted-path lookup here.
            return condition.field in introspection
        elif cond_type == "complexity_is":
            # Value may be a single level or a list of acceptable levels.
            expected = condition.value
            if isinstance(expected, list):
                return state.complexity in expected
            return state.complexity == expected
        elif cond_type == "and":
            if condition.conditions:
                return all(
                    self.evaluate_condition(c, state, introspection)
                    for c in condition.conditions
                )
            return True
        elif cond_type == "or":
            if condition.conditions:
                return any(
                    self.evaluate_condition(c, state, introspection)
                    for c in condition.conditions
                )
            return False
        elif cond_type == "not":
            if condition.condition:
                return not self.evaluate_condition(condition.condition, state, introspection)
            return True
        else:
            # Unknown condition type
            return True
    def _get_nested_value(self, data: Dict[str, Any], path: str) -> Any:
        """
        Get a value from nested dict using dot notation.

        Supports array indexing: "objectives[0].goal"

        Returns None for an empty path, a missing key, an out-of-range
        index, or a non-container encountered mid-path.
        """
        if not path:
            return None
        # Split on '.', '[' and ']' so "a.b[0].c" -> ["a", "b", "0", "c"]
        parts = re.split(r'\.|\[|\]', path)
        parts = [p for p in parts if p]  # Remove empty strings
        current = data
        for part in parts:
            if current is None:
                return None
            if isinstance(current, dict):
                current = current.get(part)
            elif isinstance(current, list):
                try:
                    idx = int(part)
                    if 0 <= idx < len(current):
                        current = current[idx]
                    else:
                        return None
                except ValueError:
                    return None
            else:
                return None
        return current
    def _prepare_question(
        self,
        question: Question,
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> Question:
        """
        Prepare a question for presentation.

        Populates dynamic options and content. Returns a deep copy so the
        cached schema Question is never mutated.
        """
        # Create a copy to avoid mutating the original
        import copy
        prepared = copy.deepcopy(question)
        # Populate dynamic options
        if prepared.dynamic_options:
            prepared.options = self._populate_dynamic_options(
                prepared.dynamic_options, state, introspection
            )
        return prepared
    def _populate_dynamic_options(
        self,
        dynamic: DynamicOptions,
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> List[QuestionOption]:
        """Populate dynamic options from introspection data.

        Only the "expressions" source type is implemented; any other
        ``dynamic.type`` yields an empty option list.
        """
        options = []
        if dynamic.type == "expressions":
            # Get expressions from introspection
            expressions = introspection.get("expressions", [])
            # Apply filter if specified
            if dynamic.filter == "design_variable_heuristics":
                expressions = self._filter_design_variables(expressions)
            elif dynamic.filter == "exclude_selected_dvs":
                # Hide parameters the user has already chosen as design variables.
                selected = [dv.get("parameter") for dv in state.answers.get("design_variables", [])]
                expressions = [e for e in expressions if e.get("name") not in selected]
            # Convert to options
            for expr in expressions:
                name = expr.get("name", "")
                value = expr.get("value", 0)
                options.append(QuestionOption(
                    value=name,
                    label=f"{name} (current: {value})",
                    # Show the formula only when it differs from the plain value.
                    description=expr.get("formula") if expr.get("formula") != str(value) else None,
                ))
        return options
    def _filter_design_variables(self, expressions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Filter expressions to likely design variables using heuristics.

        Excluded: mesh/count/material/derived names, non-numeric values,
        and computed formulas. Remaining entries are scored by name pattern
        (high=2, medium=1) and returned best-first.
        """
        # High confidence patterns
        high_patterns = [
            r"thickness", r"width", r"height", r"diameter", r"radius",
            r"length", r"depth", r"angle", r"fillet", r"chamfer",
            r"rib_\w+", r"wall_\w+", r"flange_\w+"
        ]
        # Medium confidence patterns
        medium_patterns = [
            r"dim_\w+", r"size_\w+", r"param_\w+", r"p\d+", r"var_\w+"
        ]
        # Exclusion patterns
        exclude_patterns = [
            r"mesh_\w+", r"count_\w+", r"num_\w+", r"material\w*",
            r"derived_\w+", r"calc_\w+", r"_result$", r"_output$"
        ]
        def matches_any(name: str, patterns: List[str]) -> bool:
            # re.search (not match): pattern may occur anywhere in the name.
            return any(re.search(p, name.lower()) for p in patterns)
        # Score and filter
        scored = []
        for expr in expressions:
            name = expr.get("name", "")
            # Skip exclusions
            if matches_any(name, exclude_patterns):
                continue
            # Skip if not a simple numeric value
            value = expr.get("value")
            if not isinstance(value, (int, float)):
                continue
            # Skip if it's a formula (computed value)
            formula = expr.get("formula", "")
            if formula and formula != str(value):
                continue
            # Score
            score = 0
            if matches_any(name, high_patterns):
                score = 2
            elif matches_any(name, medium_patterns):
                score = 1
            # NOTE: len(name) > 2 admits most named scalars even with score 0,
            # so unmatched-but-named parameters still appear (last).
            if score > 0 or len(name) > 2:  # Include if named or matches pattern
                scored.append((score, expr))
        # Sort by score descending
        scored.sort(key=lambda x: -x[0])
        return [expr for _, expr in scored]
    def validate_answer(
        self,
        answer: Any,
        question: Question
    ) -> tuple[bool, Optional[str]]:
        """
        Validate an answer against question rules.

        Returns:
            Tuple of (is_valid, error_message); error_message is None when valid.
        """
        if question.validation is None:
            return True, None
        validation = question.validation
        # Required check
        if validation.required:
            if answer is None or answer == "" or answer == []:
                return False, "This field is required"
        # Skip further validation if empty and not required
        if answer is None or answer == "":
            return True, None
        # Text length validation
        # NOTE: truthiness check means a limit of 0 is treated as "unset".
        if question.question_type == "text":
            if validation.min_length and len(str(answer)) < validation.min_length:
                return False, f"Answer must be at least {validation.min_length} characters"
            if validation.max_length and len(str(answer)) > validation.max_length:
                return False, f"Answer must be at most {validation.max_length} characters"
        # Numeric validation
        if question.question_type == "numeric":
            try:
                num = float(answer)
                if validation.min is not None and num < validation.min:
                    return False, f"Value must be at least {validation.min}"
                if validation.max is not None and num > validation.max:
                    return False, f"Value must be at most {validation.max}"
            except (ValueError, TypeError):
                return False, "Please enter a valid number"
        # Multi-choice validation
        if question.question_type in ["multi_choice", "parameter_select"]:
            if isinstance(answer, list):
                if validation.min_selections and len(answer) < validation.min_selections:
                    return False, f"Please select at least {validation.min_selections} option(s)"
                if validation.max_selections and len(answer) > validation.max_selections:
                    return False, f"Please select at most {validation.max_selections} option(s)"
        # Pattern validation
        # NOTE: re.match anchors at the start only, not the full string.
        if validation.pattern:
            if not re.match(validation.pattern, str(answer)):
                return False, "Answer does not match required format"
        return True, None
    def parse_answer(
        self,
        raw_answer: str,
        question: Question
    ) -> Any:
        """
        Parse a raw answer string into the appropriate type.

        Args:
            raw_answer: Raw string answer from user
            question: Question being answered

        Returns:
            Parsed answer value (type depends on question_type); None when
            a numeric/confirm/bounds answer cannot be understood.
        """
        answer = raw_answer.strip()
        if question.question_type == "text":
            return answer
        elif question.question_type == "numeric":
            # Extract number, handling units
            match = re.search(r"[-+]?\d*\.?\d+", answer)
            if match:
                return float(match.group())
            return None
        elif question.question_type == "confirm":
            lower = answer.lower()
            if lower in ["yes", "y", "true", "1", "ok", "sure", "confirm", "correct"]:
                return True
            elif lower in ["no", "n", "false", "0", "cancel", "incorrect"]:
                return False
            return None
        elif question.question_type == "choice":
            # Try matching by number
            if answer.isdigit():
                idx = int(answer) - 1
                if question.options and 0 <= idx < len(question.options):
                    return question.options[idx].value
            # Try matching by value or label
            if question.options:
                for opt in question.options:
                    if answer.lower() == str(opt.value).lower():
                        return opt.value
                    if answer.lower() == opt.label.lower():
                        return opt.value
                    # Fuzzy match (substring of the label)
                    if answer.lower() in opt.label.lower():
                        return opt.value
            # No match: fall back to the raw text.
            return answer
        elif question.question_type == "multi_choice":
            # Parse comma/and separated values
            parts = re.split(r"[,&]|\band\b", answer)
            values = []
            for part in parts:
                part = part.strip()
                if not part:
                    continue
                # Try matching by number
                if part.isdigit():
                    idx = int(part) - 1
                    if question.options and 0 <= idx < len(question.options):
                        values.append(question.options[idx].value)
                    continue
                # Try matching by value or label
                if question.options:
                    for opt in question.options:
                        if part.lower() == str(opt.value).lower():
                            values.append(opt.value)
                            break
                        if part.lower() == opt.label.lower():
                            values.append(opt.value)
                            break
                        if part.lower() in opt.label.lower():
                            values.append(opt.value)
                            break
            # Nothing matched: keep the raw answer as a single-item list.
            return values if values else [answer]
        elif question.question_type == "parameter_select":
            # Similar to multi_choice but for parameters
            parts = re.split(r"[,&]|\band\b", answer)
            return [p.strip() for p in parts if p.strip()]
        elif question.question_type == "bounds":
            # Parse bounds like "2-10" or "2 to 10" or "min 2, max 10"
            bounds = {}
            # Try "min to max" format
            match = re.search(r"(\d+\.?\d*)\s*(?:to|-)\s*(\d+\.?\d*)", answer)
            if match:
                bounds["min"] = float(match.group(1))
                bounds["max"] = float(match.group(2))
                return bounds
            # Try "min X, max Y" format
            min_match = re.search(r"min[:\s]+(\d+\.?\d*)", answer.lower())
            max_match = re.search(r"max[:\s]+(\d+\.?\d*)", answer.lower())
            if min_match:
                bounds["min"] = float(min_match.group(1))
            if max_match:
                bounds["max"] = float(max_match.group(1))
            return bounds if bounds else None
        return answer
# Import InterviewState here to avoid circular imports
from .interview_state import InterviewState

View File

@@ -0,0 +1,213 @@
{
"version": "1.0",
"description": "Common optimization setup anti-patterns and their detection",
"patterns": [
{
"id": "mass_no_constraint",
"name": "Mass Minimization Without Constraints",
"description": "Minimizing mass without any structural constraints will result in zero-thickness (or zero-size) designs that are physically impossible",
"severity": "error",
"condition": {
"type": "and",
"conditions": [
{
"type": "or",
"conditions": [
{"type": "contains", "field": "objectives", "value": "minimize_mass"},
{"type": "contains", "field": "objectives", "value": "minimize_weight"}
]
},
{"type": "empty", "field": "constraints"}
]
},
"fix_suggestion": "Add at least one constraint: maximum stress, maximum displacement, or minimum frequency",
"auto_fix": null
},
{
"id": "modal_single_solution",
"name": "Modal Analysis with Single Solution Step",
"description": "When both static and modal analysis are needed, using only a single solution may miss computing one type of result",
"severity": "error",
"condition": {
"type": "and",
"conditions": [
{"type": "contains", "field": "analysis_types", "value": "modal"},
{"type": "contains", "field": "analysis_types", "value": "static"},
{"type": "equals", "field": "solve_all_solutions", "value": false}
]
},
"fix_suggestion": "Enable 'solve all solutions' to ensure both static and modal results are computed",
"auto_fix": {
"field": "solve_all_solutions",
"value": true
}
},
{
"id": "bounds_too_wide",
"name": "Design Variable Bounds Too Wide",
"description": "When bounds span more than 10x the range (max/min > 10), optimization may struggle to converge efficiently",
"severity": "warning",
"condition": {
"type": "any_of",
"field": "design_variables",
"check": {
"type": "ratio_greater_than",
"field": ["max_value", "min_value"],
"value": 10
}
},
"fix_suggestion": "Consider narrowing bounds based on engineering knowledge. Very wide bounds increase the search space exponentially.",
"auto_fix": null
},
{
"id": "stress_over_yield",
"name": "Stress Limit Exceeds Material Yield",
"description": "The specified stress constraint exceeds the material yield stress, which could allow plastic deformation",
"severity": "warning",
"condition": {
"type": "and",
"conditions": [
{"type": "exists", "field": "constraints.max_stress"},
{"type": "exists", "field": "introspection.material"},
{
"type": "greater_than",
"field": "constraints.max_stress",
"compare_to": "material.yield_stress_mpa"
}
]
},
"fix_suggestion": "The stress limit should typically be the yield stress divided by a safety factor (1.5-2.0 for structural applications)",
"auto_fix": null
},
{
"id": "conflicting_objectives",
"name": "Typically Conflicting Objectives",
"description": "The selected objectives are typically in conflict. This is not an error, but expect a trade-off Pareto front rather than a single optimal solution.",
"severity": "info",
"condition": {
"type": "or",
"conditions": [
{
"type": "and",
"conditions": [
{"type": "contains", "field": "objectives", "value": "minimize_mass"},
{"type": "contains", "field": "objectives", "value": "minimize_displacement"}
]
},
{
"type": "and",
"conditions": [
{"type": "contains", "field": "objectives", "value": "minimize_mass"},
{"type": "contains", "field": "objectives", "value": "maximize_frequency"}
]
}
]
},
"fix_suggestion": "Consider which objective is more important, or proceed with multi-objective optimization to explore trade-offs",
"auto_fix": null
},
{
"id": "too_many_objectives",
"name": "Too Many Objectives",
"description": "More than 3 objectives makes interpretation difficult and may not improve the optimization",
"severity": "warning",
"condition": {
"type": "count_greater_than",
"field": "objectives",
"value": 3
},
"fix_suggestion": "Consider reducing to 2-3 primary objectives. Additional goals can often be handled as constraints.",
"auto_fix": null
},
{
"id": "missing_stress_constraint",
"name": "Missing Stress Constraint",
"description": "Static analysis without a stress constraint may result in designs that fail structurally",
"severity": "warning",
"condition": {
"type": "and",
"conditions": [
{"type": "contains", "field": "analysis_types", "value": "static"},
{"type": "not_exists", "field": "constraints.max_stress"},
{
"type": "not",
"condition": {"type": "contains", "field": "objectives", "value": "minimize_stress"}
}
]
},
"fix_suggestion": "Add a stress constraint based on material yield stress and appropriate safety factor",
"auto_fix": null
},
{
"id": "too_few_trials",
"name": "Insufficient Trials for Design Space",
"description": "The number of trials may be too low for the number of design variables to adequately explore the design space",
"severity": "warning",
"condition": {
"type": "less_than",
"field": "n_trials",
"compare_to": {
"type": "multiply",
"field": "design_variable_count",
"value": 15
}
},
"fix_suggestion": "Rule of thumb: use at least 10-20 trials per design variable. Consider increasing trials.",
"auto_fix": null
},
{
"id": "infeasible_baseline",
"name": "Baseline Violates Constraints",
"description": "The nominal design already violates one or more constraints. The optimizer starts in the infeasible region.",
"severity": "warning",
"condition": {
"type": "exists",
"field": "baseline_violations"
},
"fix_suggestion": "Consider relaxing constraints or modifying the baseline design to start from a feasible point",
"auto_fix": null
},
{
"id": "no_design_variables",
"name": "No Design Variables Selected",
"description": "At least one design variable must be selected for optimization",
"severity": "error",
"condition": {
"type": "empty",
"field": "design_variables"
},
"fix_suggestion": "Select one or more parameters to vary during optimization",
"auto_fix": null
},
{
"id": "thermal_no_temperature",
"name": "Thermal Analysis Without Temperature Gradient",
"description": "Thermal analysis typically requires a temperature boundary condition or thermal load",
"severity": "warning",
"condition": {
"type": "and",
"conditions": [
{"type": "contains", "field": "analysis_types", "value": "thermal"},
{"type": "not_exists", "field": "introspection.thermal_bc"}
]
},
"fix_suggestion": "Verify thermal boundary conditions are defined in the simulation",
"auto_fix": null
},
{
"id": "single_dv_many_trials",
"name": "Single Variable with Many Trials",
"description": "For single-variable optimization, many trials may be inefficient. Consider using gradient-based methods.",
"severity": "info",
"condition": {
"type": "and",
"conditions": [
{"type": "count_equals", "field": "design_variables", "value": 1},
{"type": "greater_than", "field": "n_trials", "value": 50}
]
},
"fix_suggestion": "For single-variable problems, L-BFGS-B or golden section search may converge faster than sampling-based optimization",
"auto_fix": null
}
]
}

View File

@@ -0,0 +1,466 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"version": "1.0",
"description": "Interview questions for Atomizer study creation",
"categories": [
{
"id": "problem_definition",
"name": "Problem Definition",
"phase": "problem_definition",
"order": 1,
"always_ask": true
},
{
"id": "objectives",
"name": "Optimization Objectives",
"phase": "objectives",
"order": 2,
"always_ask": true
},
{
"id": "constraints",
"name": "Constraints & Limits",
"phase": "constraints",
"order": 3,
"always_ask": true
},
{
"id": "design_variables",
"name": "Design Variables",
"phase": "design_variables",
"order": 4,
"always_ask": true
},
{
"id": "physics_config",
"name": "Physics Configuration",
"phase": "design_variables",
"order": 5,
"condition": {
"type": "complexity_is",
"value": ["moderate", "complex"]
}
},
{
"id": "optimization_settings",
"name": "Optimization Settings",
"phase": "validation",
"order": 6,
"condition": {
"type": "complexity_is",
"value": ["moderate", "complex"]
}
},
{
"id": "validation",
"name": "Validation",
"phase": "validation",
"order": 7,
"always_ask": false
}
],
"questions": [
{
"id": "pd_01",
"category": "problem_definition",
"text": "What engineering problem are you trying to solve with this optimization?",
"help_text": "Describe the goal in engineering terms. For example: 'Reduce the weight of a bracket while maintaining structural integrity' or 'Tune the natural frequency to avoid resonance'.",
"question_type": "text",
"options": null,
"default": null,
"validation": {
"required": true,
"min_length": 10
},
"condition": null,
"maps_to": "problem_description",
"engineering_guidance": "A clear problem statement helps ensure the optimization setup matches your actual goals."
},
{
"id": "pd_02",
"category": "problem_definition",
"text": "What is the physical context of this component?",
"help_text": "Describe how this part is used. For example: 'Mounting bracket for an aircraft wing' or 'Support structure for a telescope mirror'.",
"question_type": "text",
"options": null,
"default": null,
"validation": {
"required": false
},
"condition": {
"type": "complexity_is",
"value": ["moderate", "complex"]
},
"maps_to": "physical_context",
"engineering_guidance": "Understanding the physical context helps validate constraint choices."
},
{
"id": "pd_03",
"category": "problem_definition",
"text": "What type of analysis does your model use?",
"help_text": "Select all analysis types that are set up in your simulation.",
"question_type": "multi_choice",
"options": [
{"value": "static", "label": "Static structural analysis"},
{"value": "modal", "label": "Modal/frequency analysis"},
{"value": "thermal", "label": "Thermal analysis"},
{"value": "coupled_thermal_structural", "label": "Coupled thermal-structural"},
{"value": "buckling", "label": "Buckling analysis"},
{"value": "nonlinear", "label": "Nonlinear analysis"}
],
"default": ["static"],
"validation": {
"required": true,
"min_selections": 1
},
"condition": null,
"maps_to": "analysis_types",
"engineering_guidance": "The analysis type determines which extractors and solution strategies are available."
},
{
"id": "obj_01",
"category": "objectives",
"text": "What is your primary optimization goal?",
"help_text": "Choose the main thing you want to optimize for.",
"question_type": "choice",
"options": [
{"value": "minimize_mass", "label": "Minimize mass/weight"},
{"value": "minimize_displacement", "label": "Minimize displacement (maximize stiffness)"},
{"value": "maximize_frequency", "label": "Maximize natural frequency"},
{"value": "minimize_stress", "label": "Minimize peak stress"},
{"value": "target_frequency", "label": "Target a specific frequency"},
{"value": "minimize_wavefront_error", "label": "Minimize wavefront error (optical)"},
{"value": "custom", "label": "Custom objective (I'll specify)"}
],
"default": null,
"validation": {
"required": true
},
"condition": null,
"maps_to": "objectives[0].goal",
"engineering_guidance": "Mass minimization requires at least one constraint (stress, displacement, or frequency) to avoid degenerating to zero-thickness designs."
},
{
"id": "obj_02",
"category": "objectives",
"text": "Do you have any secondary objectives?",
"help_text": "Select additional objectives if this is a multi-objective optimization. Leave empty for single-objective.",
"question_type": "multi_choice",
"options": [
{"value": "minimize_mass", "label": "Minimize mass/weight"},
{"value": "minimize_displacement", "label": "Minimize displacement"},
{"value": "maximize_frequency", "label": "Maximize frequency"},
{"value": "minimize_stress", "label": "Minimize stress"},
{"value": "none", "label": "No secondary objectives (single-objective)"}
],
"default": ["none"],
"validation": {
"required": true
},
"condition": null,
"maps_to": "objectives_secondary",
"engineering_guidance": "Multi-objective optimization produces a Pareto front of trade-off solutions. More than 3 objectives can make interpretation difficult."
},
{
"id": "obj_03",
"category": "objectives",
"text": "I've selected the following extractors for your objectives. Does this look correct?",
"help_text": "The extractor is the code that reads the physics results from the simulation. I've automatically selected based on your goals.",
"question_type": "confirm",
"options": null,
"default": true,
"validation": {
"required": true
},
"condition": null,
"maps_to": "extractors_confirmed",
"engineering_guidance": null,
"dynamic_content": {
"type": "extractor_summary",
"source": "inferred_config.extractors"
}
},
{
"id": "con_01",
"category": "constraints",
"text": "What is the maximum allowable stress?",
"help_text": "Enter the stress limit in MPa. This is typically based on material yield stress with a safety factor.",
"question_type": "numeric",
"options": null,
"default": null,
"validation": {
"required": true,
"min": 1,
"max": 10000,
"units": "MPa"
},
"condition": {
"type": "or",
"conditions": [
{"type": "contains", "field": "analysis_types", "value": "static"},
{"type": "equals", "field": "objectives[0].goal", "value": "minimize_mass"}
]
},
"maps_to": "constraints.max_stress",
"engineering_guidance": "For aluminum 6061-T6, yield stress is 276 MPa. A safety factor of 1.5 gives ~180 MPa limit."
},
{
"id": "con_02",
"category": "constraints",
"text": "What is the maximum allowable displacement?",
"help_text": "Enter the displacement limit. Include units (mm or in).",
"question_type": "numeric",
"options": null,
"default": null,
"validation": {
"required": false,
"min": 0,
"units": "mm"
},
"condition": {
"type": "or",
"conditions": [
{"type": "contains", "field": "analysis_types", "value": "static"},
{"type": "equals", "field": "objectives[0].goal", "value": "minimize_mass"}
]
},
"maps_to": "constraints.max_displacement",
"engineering_guidance": "Displacement limits often come from functional requirements - clearance, alignment, etc."
},
{
"id": "con_03",
"category": "constraints",
"text": "What is the minimum acceptable natural frequency?",
"help_text": "Enter the frequency limit in Hz.",
"question_type": "numeric",
"options": null,
"default": null,
"validation": {
"required": true,
"min": 0.1,
"units": "Hz"
},
"condition": {
"type": "contains",
"field": "analysis_types",
"value": "modal"
},
"maps_to": "constraints.min_frequency",
"engineering_guidance": "Typically set to avoid resonance with known excitation frequencies (motors, vibration sources)."
},
{
"id": "con_04",
"category": "constraints",
"text": "Do you have a mass budget (maximum allowed mass)?",
"help_text": "Enter the mass limit in kg, or skip if not applicable.",
"question_type": "numeric",
"options": null,
"default": null,
"validation": {
"required": false,
"min": 0,
"units": "kg"
},
"condition": {
"type": "not",
"condition": {
"type": "equals",
"field": "objectives[0].goal",
"value": "minimize_mass"
}
},
"maps_to": "constraints.max_mass",
"engineering_guidance": "A mass budget is often required when mass is not the primary objective."
},
{
"id": "con_05",
"category": "constraints",
"text": "How should constraints be handled?",
"help_text": "Hard constraints reject any design that violates them. Soft constraints allow violations but penalize the objective.",
"question_type": "choice",
"options": [
{"value": "hard", "label": "Hard constraints (reject violations)"},
{"value": "soft", "label": "Soft constraints (penalize violations)"},
{"value": "mixed", "label": "Mixed (I'll specify per constraint)"}
],
"default": "hard",
"validation": {
"required": true
},
"condition": null,
"maps_to": "constraint_handling",
"engineering_guidance": "Hard constraints are more conservative. Soft constraints allow exploration but may produce infeasible final designs."
},
{
"id": "dv_01",
"category": "design_variables",
"text": "Which parameters should be varied during optimization?",
"help_text": "Select from the detected expressions in your model, or type custom names.",
"question_type": "parameter_select",
"options": null,
"default": null,
"validation": {
"required": true,
"min_selections": 1,
"max_selections": 20
},
"condition": null,
"maps_to": "design_variables",
"engineering_guidance": "More design variables = larger search space. 3-6 is typical for efficient optimization.",
"dynamic_options": {
"type": "expressions",
"source": "introspection.expressions",
"filter": "design_variable_heuristics"
}
},
{
"id": "dv_02",
"category": "design_variables",
"text": "Please confirm or adjust the bounds for each design variable.",
"help_text": "For each parameter, verify the min and max values are appropriate.",
"question_type": "bounds",
"options": null,
"default": null,
"validation": {
"required": true
},
"condition": null,
"maps_to": "design_variable_bounds",
"engineering_guidance": "Bounds should be physically meaningful. Too wide (>10x range) may slow convergence.",
"dynamic_content": {
"type": "bounds_table",
"source": "answers.design_variables"
}
},
{
"id": "dv_03",
"category": "design_variables",
"text": "Are there any parameters that should remain fixed (not optimized)?",
"help_text": "Select parameters that should keep their current values.",
"question_type": "parameter_select",
"options": null,
"default": null,
"validation": {
"required": false
},
"condition": {
"type": "complexity_is",
"value": ["complex"]
},
"maps_to": "fixed_parameters",
"engineering_guidance": "Fix parameters that have regulatory or interface constraints.",
"dynamic_options": {
"type": "expressions",
"source": "introspection.expressions",
"filter": "exclude_selected_dvs"
}
},
{
"id": "phys_01",
"category": "physics_config",
"text": "What element type does your mesh use for stress extraction?",
"help_text": "This affects which stress extractor is used.",
"question_type": "choice",
"options": [
{"value": "solid", "label": "Solid elements (CTETRA, CHEXA, CPENTA)"},
{"value": "shell", "label": "Shell elements (CQUAD4, CTRIA3)"},
{"value": "beam", "label": "Beam elements (CBAR, CBEAM)"},
{"value": "mixed", "label": "Mixed element types"},
{"value": "auto", "label": "Auto-detect from model"}
],
"default": "auto",
"validation": {
"required": true
},
"condition": {
"type": "or",
"conditions": [
{"type": "equals", "field": "objectives[0].goal", "value": "minimize_stress"},
{"type": "exists", "field": "constraints.max_stress"}
]
},
"maps_to": "element_type",
"engineering_guidance": null
},
{
"id": "phys_02",
"category": "physics_config",
"text": "Your model has multiple solution steps. Should all solutions be evaluated?",
"help_text": "Some models have static + modal, or multiple load cases.",
"question_type": "confirm",
"options": null,
"default": true,
"validation": {
"required": true
},
"condition": {
"type": "introspection_has",
"field": "multiple_solutions"
},
"maps_to": "solve_all_solutions",
"engineering_guidance": "If you have both static and modal analysis, both should typically be solved to get all required outputs."
},
{
"id": "opt_01",
"category": "optimization_settings",
"text": "How many trials should be run?",
"help_text": "More trials = better exploration but longer runtime.",
"question_type": "choice",
"options": [
          {"value": 50, "label": "50 trials (quick exploration)"},
{"value": 100, "label": "100 trials (standard)"},
{"value": 200, "label": "200 trials (thorough)"},
{"value": 500, "label": "500 trials (comprehensive)"},
{"value": "custom", "label": "Custom number"}
],
"default": 100,
"validation": {
"required": true
},
"condition": {
"type": "complexity_is",
"value": ["moderate", "complex"]
},
"maps_to": "n_trials",
"engineering_guidance": "Rule of thumb: 10-20 trials per design variable minimum. Complex multi-objective needs more."
},
{
"id": "opt_02",
"category": "optimization_settings",
"text": "Would you like to enable neural acceleration?",
"help_text": "Neural surrogates can speed up optimization by reducing FEA calls. Requires initial training trials.",
"question_type": "confirm",
"options": null,
"default": false,
"validation": {
"required": true
},
"condition": {
"type": "and",
"conditions": [
{"type": "greater_than", "field": "n_trials", "value": 100},
{"type": "complexity_is", "value": ["moderate", "complex"]}
]
},
"maps_to": "use_neural_acceleration",
"engineering_guidance": "Neural acceleration is most effective for expensive simulations (>30 sec/eval) with 100+ trials."
},
{
"id": "val_01",
"category": "validation",
"text": "Would you like to run a baseline validation before starting?",
"help_text": "This runs a single FEA solve to verify extractors work correctly with nominal parameters.",
"question_type": "confirm",
"options": null,
"default": true,
"validation": {
"required": true
},
"condition": null,
"maps_to": "run_baseline_validation",
"engineering_guidance": "Highly recommended. Catches configuration errors before wasting optimization time."
}
]
}

View File

@@ -0,0 +1,262 @@
{
"version": "1.0",
"description": "Common engineering materials database for validation and guidance",
"materials": [
{
"id": "al_6061_t6",
"names": ["aluminum 6061-t6", "al6061-t6", "6061-t6", "al 6061", "6061 aluminum", "aa6061-t6"],
"category": "aluminum",
"properties": {
"density_kg_m3": 2700,
"yield_stress_mpa": 276,
"ultimate_stress_mpa": 310,
"elastic_modulus_gpa": 68.9,
"shear_modulus_gpa": 26,
"poisson_ratio": 0.33,
"fatigue_limit_mpa": 96,
"thermal_conductivity_w_mk": 167,
"cte_per_k": 23.6e-6
},
"notes": "Common aerospace aluminum alloy. Good machinability, corrosion resistance.",
"recommended_safety_factors": {
"static": 1.5,
"fatigue": 3.0,
"aerospace": 2.0
}
},
{
"id": "al_2024_t3",
"names": ["aluminum 2024-t3", "al2024-t3", "2024-t3", "al 2024", "2024 aluminum"],
"category": "aluminum",
"properties": {
"density_kg_m3": 2780,
"yield_stress_mpa": 345,
"ultimate_stress_mpa": 483,
"elastic_modulus_gpa": 73.1,
"shear_modulus_gpa": 28,
"poisson_ratio": 0.33,
"fatigue_limit_mpa": 138,
"thermal_conductivity_w_mk": 121,
"cte_per_k": 23.2e-6
},
"notes": "High-strength aerospace aluminum. Excellent fatigue resistance.",
"recommended_safety_factors": {
"static": 1.5,
"fatigue": 2.5,
"aerospace": 2.0
}
},
{
"id": "al_7075_t6",
"names": ["aluminum 7075-t6", "al7075-t6", "7075-t6", "al 7075", "7075 aluminum"],
"category": "aluminum",
"properties": {
"density_kg_m3": 2810,
"yield_stress_mpa": 503,
"ultimate_stress_mpa": 572,
"elastic_modulus_gpa": 71.7,
"shear_modulus_gpa": 26.9,
"poisson_ratio": 0.33,
"fatigue_limit_mpa": 159,
"thermal_conductivity_w_mk": 130,
"cte_per_k": 23.4e-6
},
"notes": "Very high strength aluminum. Used in aircraft structures.",
"recommended_safety_factors": {
"static": 1.5,
"fatigue": 2.5,
"aerospace": 2.0
}
},
{
"id": "steel_304",
"names": ["stainless steel 304", "ss304", "304 stainless", "304ss", "aisi 304"],
"category": "steel",
"properties": {
"density_kg_m3": 8000,
"yield_stress_mpa": 215,
"ultimate_stress_mpa": 505,
"elastic_modulus_gpa": 193,
"shear_modulus_gpa": 77,
"poisson_ratio": 0.29,
"fatigue_limit_mpa": 240,
"thermal_conductivity_w_mk": 16.2,
"cte_per_k": 17.3e-6
},
"notes": "Austenitic stainless steel. Excellent corrosion resistance.",
"recommended_safety_factors": {
"static": 1.5,
"fatigue": 2.5
}
},
{
"id": "steel_316",
"names": ["stainless steel 316", "ss316", "316 stainless", "316ss", "aisi 316"],
"category": "steel",
"properties": {
"density_kg_m3": 8000,
"yield_stress_mpa": 290,
"ultimate_stress_mpa": 580,
"elastic_modulus_gpa": 193,
"shear_modulus_gpa": 77,
"poisson_ratio": 0.29,
"fatigue_limit_mpa": 260,
"thermal_conductivity_w_mk": 16.3,
"cte_per_k": 16e-6
},
"notes": "Marine grade stainless steel. Superior corrosion resistance to 304.",
"recommended_safety_factors": {
"static": 1.5,
"fatigue": 2.5
}
},
{
"id": "steel_4340",
"names": ["steel 4340", "4340 steel", "aisi 4340", "4340"],
"category": "steel",
"properties": {
"density_kg_m3": 7850,
"yield_stress_mpa": 862,
"ultimate_stress_mpa": 1034,
"elastic_modulus_gpa": 205,
"shear_modulus_gpa": 80,
"poisson_ratio": 0.29,
"fatigue_limit_mpa": 480,
"thermal_conductivity_w_mk": 44.5,
"cte_per_k": 12.3e-6
},
"notes": "High strength alloy steel. Heat treatable.",
"recommended_safety_factors": {
"static": 1.5,
"fatigue": 2.5
}
},
{
"id": "steel_a36",
"names": ["steel a36", "a36 steel", "astm a36", "a36", "structural steel"],
"category": "steel",
"properties": {
"density_kg_m3": 7850,
"yield_stress_mpa": 250,
"ultimate_stress_mpa": 400,
"elastic_modulus_gpa": 200,
"shear_modulus_gpa": 79,
"poisson_ratio": 0.26,
"fatigue_limit_mpa": 160,
"thermal_conductivity_w_mk": 51.9,
"cte_per_k": 11.7e-6
},
"notes": "Common structural steel. Low cost, good weldability.",
"recommended_safety_factors": {
"static": 1.67,
"fatigue": 3.0
}
},
{
"id": "ti_6al_4v",
"names": ["titanium 6al-4v", "ti-6al-4v", "ti64", "ti 6-4", "grade 5 titanium"],
"category": "titanium",
"properties": {
"density_kg_m3": 4430,
"yield_stress_mpa": 880,
"ultimate_stress_mpa": 950,
"elastic_modulus_gpa": 113.8,
"shear_modulus_gpa": 44,
"poisson_ratio": 0.342,
"fatigue_limit_mpa": 500,
"thermal_conductivity_w_mk": 6.7,
"cte_per_k": 8.6e-6
},
"notes": "Common aerospace titanium alloy. Excellent strength-to-weight ratio.",
"recommended_safety_factors": {
"static": 1.5,
"fatigue": 2.5,
"aerospace": 2.0
}
},
{
"id": "ti_cp_grade2",
"names": ["titanium grade 2", "cp titanium", "commercially pure titanium", "ti grade 2"],
"category": "titanium",
"properties": {
"density_kg_m3": 4510,
"yield_stress_mpa": 275,
"ultimate_stress_mpa": 345,
"elastic_modulus_gpa": 105,
"shear_modulus_gpa": 40,
"poisson_ratio": 0.37,
"fatigue_limit_mpa": 160,
"thermal_conductivity_w_mk": 16.4,
"cte_per_k": 8.4e-6
},
"notes": "Commercially pure titanium. Good corrosion resistance, formability.",
"recommended_safety_factors": {
"static": 1.5,
"fatigue": 2.5
}
},
{
"id": "inconel_718",
"names": ["inconel 718", "in718", "alloy 718", "nickel 718"],
"category": "nickel_alloy",
"properties": {
"density_kg_m3": 8190,
"yield_stress_mpa": 1100,
"ultimate_stress_mpa": 1375,
"elastic_modulus_gpa": 200,
"shear_modulus_gpa": 77,
"poisson_ratio": 0.29,
"fatigue_limit_mpa": 600,
"thermal_conductivity_w_mk": 11.4,
"cte_per_k": 13e-6
},
"notes": "Nickel superalloy. Excellent high-temperature properties.",
"recommended_safety_factors": {
"static": 1.5,
"fatigue": 2.5
}
},
{
"id": "zerodur",
"names": ["zerodur", "schott zerodur", "zerodur glass ceramic"],
"category": "glass_ceramic",
"properties": {
"density_kg_m3": 2530,
"yield_stress_mpa": null,
"ultimate_stress_mpa": 50,
"elastic_modulus_gpa": 90.3,
"shear_modulus_gpa": 36.3,
"poisson_ratio": 0.24,
"fatigue_limit_mpa": null,
"thermal_conductivity_w_mk": 1.46,
"cte_per_k": 0.05e-6
},
"notes": "Ultra-low expansion glass ceramic for optics. Brittle - tensile stress limit only.",
"recommended_safety_factors": {
"static": 4.0,
"optical": 8.0
}
},
{
"id": "cfrp_unidirectional",
"names": ["carbon fiber", "cfrp", "carbon fiber reinforced polymer", "cfrp ud"],
"category": "composite",
"properties": {
"density_kg_m3": 1600,
"yield_stress_mpa": null,
"ultimate_stress_mpa": 1500,
"elastic_modulus_gpa": 135,
"shear_modulus_gpa": 5,
"poisson_ratio": 0.3,
"fatigue_limit_mpa": 600,
"thermal_conductivity_w_mk": 5,
"cte_per_k": -0.5e-6
},
"notes": "Unidirectional carbon fiber. Properties in fiber direction. Highly anisotropic.",
"recommended_safety_factors": {
"static": 2.0,
"fatigue": 3.0
}
}
]
}

View File

@@ -0,0 +1,558 @@
"""
Study Blueprint
Data structures for the study blueprint - the validated configuration
ready for study generation.
"""
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Any, Optional
import json
@dataclass
class DesignVariable:
    """One optimization parameter: its current value, allowed range, and metadata."""
    parameter: str
    current_value: float
    min_value: float
    max_value: float
    units: Optional[str] = None
    is_integer: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Serialize every field into a plain dictionary."""
        return asdict(self)

    def to_config_format(self) -> Dict[str, Any]:
        """Render this variable in the optimization_config.json schema."""
        bounds = [self.min_value, self.max_value]
        unit_label = "" if not self.units else self.units
        return {
            "expression_name": self.parameter,
            "bounds": bounds,
            "units": unit_label,
            "is_integer": self.is_integer,
        }
@dataclass
class Objective:
    """An optimization objective tied to a result extractor."""
    name: str
    goal: str  # one of: minimize, maximize, target
    extractor: str  # extractor ID, e.g. "E1" or "E4"
    extractor_name: Optional[str] = None
    extractor_params: Optional[Dict[str, Any]] = None
    weight: float = 1.0
    target_value: Optional[float] = None  # only meaningful for "target" goals

    def to_dict(self) -> Dict[str, Any]:
        """Serialize every field into a plain dictionary."""
        return asdict(self)

    def to_config_format(self) -> Dict[str, Any]:
        """Render this objective in the optimization_config.json schema.

        Optional keys (extractor_params, target) are emitted only when set.
        """
        entry: Dict[str, Any] = {
            "name": self.name,
            "type": self.goal,
            "extractor": self.extractor,
            "weight": self.weight,
        }
        if self.extractor_params:
            entry["extractor_params"] = self.extractor_params
        if self.target_value is not None:
            entry["target"] = self.target_value
        return entry
@dataclass
class Constraint:
    """A limit on an extracted response (e.g. max stress, min frequency)."""
    name: str
    constraint_type: str  # "max" or "min"
    threshold: float
    extractor: str  # extractor ID, e.g. "E2"
    extractor_name: Optional[str] = None
    extractor_params: Optional[Dict[str, Any]] = None
    is_hard: bool = True
    penalty_weight: float = 1000.0  # applied only when the constraint is soft

    def to_dict(self) -> Dict[str, Any]:
        """Serialize every field into a plain dictionary."""
        return asdict(self)

    def to_config_format(self) -> Dict[str, Any]:
        """Render this constraint in the optimization_config.json schema.

        extractor_params and penalty_weight are emitted only when relevant.
        """
        entry: Dict[str, Any] = {
            "name": self.name,
            "type": self.constraint_type,
            "threshold": self.threshold,
            "extractor": self.extractor,
            "hard": self.is_hard,
        }
        if self.extractor_params:
            entry["extractor_params"] = self.extractor_params
        if not self.is_hard:
            entry["penalty_weight"] = self.penalty_weight
        return entry
@dataclass
class StudyBlueprint:
    """
    Complete study blueprint ready for generation.

    This is the validated configuration that will be used to create
    the study files (optimization_config.json, run_optimization.py, etc.)
    """
    # Study metadata
    study_name: str
    study_description: str = ""
    interview_session_id: str = ""  # ties the blueprint back to the interview that produced it
    # Model paths
    model_path: str = ""
    sim_path: str = ""
    fem_path: str = ""
    # Design space
    design_variables: List[DesignVariable] = field(default_factory=list)
    # Optimization goals
    objectives: List[Objective] = field(default_factory=list)
    constraints: List[Constraint] = field(default_factory=list)
    # Optimization settings
    protocol: str = "protocol_10_single"  # or "protocol_11_multi"
    n_trials: int = 100
    sampler: str = "TPE"
    use_neural_acceleration: bool = False
    # Solver settings
    solver_config: Dict[str, Any] = field(default_factory=dict)
    solve_all_solutions: bool = True
    # Extractors configuration
    extractors_config: Dict[str, Any] = field(default_factory=dict)
    # Validation
    warnings_acknowledged: List[str] = field(default_factory=list)
    baseline_validated: bool = False
    baseline_results: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary; nested dataclasses are serialized via their own to_dict."""
        return {
            "study_name": self.study_name,
            "study_description": self.study_description,
            "interview_session_id": self.interview_session_id,
            "model_path": self.model_path,
            "sim_path": self.sim_path,
            "fem_path": self.fem_path,
            "design_variables": [dv.to_dict() for dv in self.design_variables],
            "objectives": [obj.to_dict() for obj in self.objectives],
            "constraints": [con.to_dict() for con in self.constraints],
            "protocol": self.protocol,
            "n_trials": self.n_trials,
            "sampler": self.sampler,
            "use_neural_acceleration": self.use_neural_acceleration,
            "solver_config": self.solver_config,
            "solve_all_solutions": self.solve_all_solutions,
            "extractors_config": self.extractors_config,
            "warnings_acknowledged": self.warnings_acknowledged,
            "baseline_validated": self.baseline_validated,
            "baseline_results": self.baseline_results,
        }

    def to_json(self) -> str:
        """Serialize to a pretty-printed (indent=2) JSON string."""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "StudyBlueprint":
        """Create from dictionary (inverse of to_dict); missing keys fall back to field defaults."""
        # Rehydrate nested dataclasses first.
        # NOTE(review): DesignVariable(**dv) will raise TypeError on unknown keys —
        # assumes the input dict was produced by to_dict; confirm for external data.
        design_variables = [
            DesignVariable(**dv) for dv in data.get("design_variables", [])
        ]
        objectives = [
            Objective(**obj) for obj in data.get("objectives", [])
        ]
        constraints = [
            Constraint(**con) for con in data.get("constraints", [])
        ]
        return cls(
            study_name=data.get("study_name", ""),
            study_description=data.get("study_description", ""),
            interview_session_id=data.get("interview_session_id", ""),
            model_path=data.get("model_path", ""),
            sim_path=data.get("sim_path", ""),
            fem_path=data.get("fem_path", ""),
            design_variables=design_variables,
            objectives=objectives,
            constraints=constraints,
            protocol=data.get("protocol", "protocol_10_single"),
            n_trials=data.get("n_trials", 100),
            sampler=data.get("sampler", "TPE"),
            use_neural_acceleration=data.get("use_neural_acceleration", False),
            solver_config=data.get("solver_config", {}),
            solve_all_solutions=data.get("solve_all_solutions", True),
            extractors_config=data.get("extractors_config", {}),
            warnings_acknowledged=data.get("warnings_acknowledged", []),
            baseline_validated=data.get("baseline_validated", False),
            baseline_results=data.get("baseline_results"),
        )

    def to_config_json(self) -> Dict[str, Any]:
        """
        Convert to optimization_config.json format.

        This is the format expected by the optimization runner.
        Nested items are rendered via each dataclass's to_config_format().
        """
        config = {
            "study_name": self.study_name,
            "description": self.study_description,
            "version": "2.0",
            "model": {
                "part_file": self.model_path,
                "sim_file": self.sim_path,
                "fem_file": self.fem_path,
            },
            "design_variables": [
                dv.to_config_format() for dv in self.design_variables
            ],
            "objectives": [
                obj.to_config_format() for obj in self.objectives
            ],
            "constraints": [
                con.to_config_format() for con in self.constraints
            ],
            "optimization": {
                "n_trials": self.n_trials,
                "sampler": self.sampler,
                "protocol": self.protocol,
                "neural_acceleration": self.use_neural_acceleration,
            },
            "solver": {
                "solve_all": self.solve_all_solutions,
                # Any solver_config keys override/extend the defaults above.
                **self.solver_config,
            },
            "extractors": self.extractors_config,
            # Underscore-prefixed: provenance info for traceability, not consumed by the runner's schema.
            "_metadata": {
                "interview_session_id": self.interview_session_id,
                "warnings_acknowledged": self.warnings_acknowledged,
                "baseline_validated": self.baseline_validated,
            }
        }
        return config

    def to_markdown(self) -> str:
        """Generate human-readable markdown summary (tables for DVs, objectives, constraints)."""
        lines = []
        lines.append(f"# Study Blueprint: {self.study_name}")
        lines.append("")
        if self.study_description:
            lines.append(f"**Description**: {self.study_description}")
            lines.append("")
        # Design Variables
        lines.append(f"## Design Variables ({len(self.design_variables)})")
        lines.append("")
        lines.append("| Parameter | Current | Min | Max | Units |")
        lines.append("|-----------|---------|-----|-----|-------|")
        for dv in self.design_variables:
            lines.append(f"| {dv.parameter} | {dv.current_value} | {dv.min_value} | {dv.max_value} | {dv.units or '-'} |")
        lines.append("")
        # Objectives
        lines.append(f"## Objectives ({len(self.objectives)})")
        lines.append("")
        lines.append("| Name | Goal | Extractor | Weight |")
        lines.append("|------|------|-----------|--------|")
        for obj in self.objectives:
            lines.append(f"| {obj.name} | {obj.goal} | {obj.extractor} | {obj.weight} |")
        lines.append("")
        # Constraints (section omitted entirely when there are none)
        if self.constraints:
            lines.append(f"## Constraints ({len(self.constraints)})")
            lines.append("")
            lines.append("| Name | Type | Threshold | Extractor | Hard? |")
            lines.append("|------|------|-----------|-----------|-------|")
            for con in self.constraints:
                # Render the constraint type as a comparison operator for readability.
                op = "<=" if con.constraint_type == "max" else ">="
                lines.append(f"| {con.name} | {op} | {con.threshold} | {con.extractor} | {'Yes' if con.is_hard else 'No'} |")
            lines.append("")
        # Settings
        lines.append("## Optimization Settings")
        lines.append("")
        lines.append(f"- **Protocol**: {self.protocol}")
        lines.append(f"- **Trials**: {self.n_trials}")
        lines.append(f"- **Sampler**: {self.sampler}")
        lines.append(f"- **Neural Acceleration**: {'Enabled' if self.use_neural_acceleration else 'Disabled'}")
        lines.append("")
        # Validation
        lines.append("## Validation")
        lines.append("")
        lines.append(f"- **Baseline Validated**: {'Yes' if self.baseline_validated else 'No'}")
        if self.warnings_acknowledged:
            lines.append(f"- **Warnings Acknowledged**: {len(self.warnings_acknowledged)}")
            for w in self.warnings_acknowledged:
                lines.append(f"  - {w}")
        lines.append("")
        return "\n".join(lines)

    def validate(self) -> List[str]:
        """
        Validate blueprint completeness.

        Checks that a name, at least one design variable, and at least one
        objective are present, and that every design variable has min < max.

        Returns:
            List of validation errors (empty if valid)
        """
        errors = []
        if not self.study_name:
            errors.append("Study name is required")
        if not self.design_variables:
            errors.append("At least one design variable is required")
        if not self.objectives:
            errors.append("At least one objective is required")
        for dv in self.design_variables:
            if dv.min_value >= dv.max_value:
                errors.append(f"Invalid bounds for {dv.parameter}: min >= max")
        return errors

    def is_multi_objective(self) -> bool:
        """Check if this is a multi-objective study (more than one objective)."""
        return len(self.objectives) > 1

    def get_objective_count(self) -> int:
        """Get number of objectives."""
        return len(self.objectives)

    def get_constraint_count(self) -> int:
        """Get number of constraints."""
        return len(self.constraints)

    def get_design_variable_count(self) -> int:
        """Get number of design variables."""
        return len(self.design_variables)
class BlueprintBuilder:
    """
    Helper class for building StudyBlueprint from interview state.

    Translates the free-form answers collected during an interview into
    the validated dataclasses (DesignVariable, Objective, Constraint)
    that make up a StudyBlueprint.
    """

    # Declarative table of supported constraint answers:
    # (answer key, constraint_type, keyword for the extractor mapper).
    # Adding a new constraint kind only requires a new row here.
    _CONSTRAINT_SPECS = [
        ("max_stress", "max", "stress"),
        ("max_displacement", "max", "displacement"),
        ("min_frequency", "min", "frequency"),
        ("max_mass", "max", "mass"),
    ]

    def __init__(self):
        """Initialize builder with the shared interview intelligence layer."""
        from .interview_intelligence import InterviewIntelligence
        self.intelligence = InterviewIntelligence()

    def from_interview_state(
        self,
        state: "InterviewState",
        introspection: Optional[Dict[str, Any]] = None
    ) -> StudyBlueprint:
        """
        Build StudyBlueprint from completed interview state.

        Args:
            state: Completed interview state
            introspection: Optional introspection results; falls back to
                the introspection stored on the state when omitted.

        Returns:
            StudyBlueprint ready for generation
        """
        answers = state.answers
        intro = introspection or state.introspection

        design_variables = self._build_design_variables(answers, intro)
        objectives = self._build_objectives(answers, intro)
        constraints = self._build_constraints(answers, intro)

        # Multi-objective studies use the dedicated multi protocol.
        protocol = "protocol_11_multi" if len(objectives) > 1 else "protocol_10_single"

        # Get settings; "custom" is a UI placeholder, not a real count.
        n_trials = answers.get("n_trials", 100)
        if n_trials == "custom":
            n_trials = 100  # Default

        blueprint = StudyBlueprint(
            study_name=state.study_name,
            study_description=answers.get("problem_description", ""),
            interview_session_id=state.session_id,
            model_path=intro.get("part_file", ""),
            sim_path=intro.get("sim_file", ""),
            fem_path=intro.get("fem_file", ""),
            design_variables=design_variables,
            objectives=objectives,
            constraints=constraints,
            protocol=protocol,
            n_trials=int(n_trials) if isinstance(n_trials, (int, float)) else 100,
            sampler=self.intelligence.suggest_sampler(len(objectives), len(design_variables)),
            use_neural_acceleration=answers.get("use_neural_acceleration", False),
            solve_all_solutions=answers.get("solve_all_solutions", True),
            warnings_acknowledged=state.warnings_acknowledged,
            baseline_validated=answers.get("run_baseline_validation", False),
        )
        return blueprint

    def _build_design_variables(
        self, answers: Dict[str, Any], intro: Dict[str, Any]
    ) -> List[DesignVariable]:
        """Build design variables from answers (dicts or bare parameter names)."""
        design_variables = []
        for dv_data in answers.get("design_variables", []):
            if isinstance(dv_data, dict):
                design_variables.append(DesignVariable(
                    parameter=dv_data.get("parameter", ""),
                    current_value=dv_data.get("current_value", 0),
                    min_value=dv_data.get("min_value", 0),
                    max_value=dv_data.get("max_value", 1),
                    units=dv_data.get("units"),
                    is_integer=dv_data.get("is_integer", False),
                ))
            elif isinstance(dv_data, str):
                # Just a parameter name - look up in introspection
                expr = self._find_expression(dv_data, intro.get("expressions", []))
                if expr:
                    value = expr.get("value", 0)
                    # Default bounds: +/-50% around the current value, with the
                    # ordering flipped for negative values so min < max.
                    # NOTE(review): value == 0 yields min == max == 0, which
                    # later fails validate() - confirm whether zero-valued
                    # expressions should get absolute default bounds instead.
                    design_variables.append(DesignVariable(
                        parameter=dv_data,
                        current_value=value,
                        min_value=value * 0.5 if value > 0 else value * 1.5,
                        max_value=value * 1.5 if value > 0 else value * 0.5,
                    ))
        return design_variables

    def _build_objectives(
        self, answers: Dict[str, Any], intro: Dict[str, Any]
    ) -> List[Objective]:
        """Build the primary objective plus any secondary objectives."""
        objectives = []
        primary_goal = answers.get("objectives", [{}])
        if isinstance(primary_goal, list) and primary_goal:
            primary = primary_goal[0] if isinstance(primary_goal[0], dict) else {"goal": primary_goal[0]}
        else:
            primary = {"goal": str(primary_goal)}
        # Map to extractor
        extractor_sel = self.intelligence.extractor_mapper.map_goal_to_extractor(
            primary.get("goal", ""),
            intro
        )
        objectives.append(Objective(
            name=primary.get("name", "primary_objective"),
            goal=self._normalize_goal(primary.get("goal", "")),
            extractor=extractor_sel.extractor_id,
            extractor_name=extractor_sel.extractor_name,
            extractor_params=extractor_sel.params,
            weight=primary.get("weight", 1.0),
        ))
        # Add secondary objectives
        for sec_goal in answers.get("objectives_secondary", []):
            if sec_goal == "none" or not sec_goal:
                continue
            sec_sel = self.intelligence.extractor_mapper.map_goal_to_extractor(
                sec_goal, intro
            )
            objectives.append(Objective(
                name=f"secondary_{sec_goal}",
                goal=self._normalize_goal(sec_goal),
                extractor=sec_sel.extractor_id,
                extractor_name=sec_sel.extractor_name,
                extractor_params=sec_sel.params,
                weight=0.5,  # Default lower weight for secondary
            ))
        return objectives

    def _build_constraints(
        self, answers: Dict[str, Any], intro: Dict[str, Any]
    ) -> List[Constraint]:
        """Build constraints from the interview's constraint answers."""
        constraints = []
        constraint_answers = answers.get("constraints", {})
        constraint_handling = answers.get("constraint_handling", "hard")
        is_hard = constraint_handling != "soft"
        for key, constraint_type, keyword in self._CONSTRAINT_SPECS:
            threshold = constraint_answers.get(key)
            if not threshold:
                continue
            sel = self.intelligence.extractor_mapper.map_constraint_to_extractor(keyword, intro)
            constraints.append(Constraint(
                name=key,
                constraint_type=constraint_type,
                threshold=threshold,
                extractor=sel.extractor_id,
                extractor_name=sel.extractor_name,
                # Fix: extractor_params was previously omitted for max_mass
                # while the other three constraints supplied it.
                extractor_params=sel.params,
                is_hard=is_hard,
            ))
        return constraints

    def _find_expression(self, name: str, expressions: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Find an introspected expression by exact name, or None."""
        for expr in expressions:
            if expr.get("name") == name:
                return expr
        return None

    def _normalize_goal(self, goal: str) -> str:
        """Normalize a free-form goal string to minimize/maximize/target.

        Unrecognized goals are returned unchanged so downstream validation
        can flag them.
        """
        goal_lower = goal.lower()
        if "minimize" in goal_lower or "reduce" in goal_lower:
            return "minimize"
        elif "maximize" in goal_lower or "increase" in goal_lower:
            return "maximize"
        elif "target" in goal_lower:
            return "target"
        else:
            return goal
# Import for type hints
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .interview_state import InterviewState

View File

@@ -0,0 +1,589 @@
"""
Study Interview Engine
Main orchestrator for the interview process.
Coordinates question flow, state management, validation, and blueprint generation.
"""
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional, Literal
import uuid
from .interview_state import (
InterviewState,
InterviewPhase,
InterviewStateManager,
AnsweredQuestion,
LogEntry,
)
from .question_engine import QuestionEngine, Question
from .interview_presenter import InterviewPresenter, ClaudePresenter
from .engineering_validator import EngineeringValidator, ValidationResult, AntiPattern
from .interview_intelligence import InterviewIntelligence
from .study_blueprint import StudyBlueprint, BlueprintBuilder
@dataclass
class InterviewSession:
    """Represents an active interview session.

    Returned by StudyInterviewEngine.start_interview() for both new and
    resumed interviews.
    """
    session_id: str  # unique id for this interview run (UUID for new sessions)
    study_name: str  # human-readable study name
    study_path: Path  # directory where interview state is persisted
    started_at: datetime  # when the session began (original start time when resumed)
    current_phase: InterviewPhase  # phase the interview is currently in
    introspection: Dict[str, Any]  # model introspection results (may be empty)
    is_complete: bool = False  # True once the interview has finished
    is_resumed: bool = False  # True when restored from previously saved state
@dataclass
class NextAction:
    """What should happen after processing an answer.

    action_type is the discriminator telling the caller which other
    fields are meaningful.
    """
    action_type: Literal["ask_question", "show_summary", "validate", "generate", "error", "confirm_warning"]
    question: Optional[Question] = None  # next question when action_type == "ask_question" (also set on re-ask after an invalid answer)
    message: Optional[str] = None  # rendered text to show the user
    warnings: List[str] = field(default_factory=list)  # warnings not yet acknowledged by the user
    blueprint: Optional[StudyBlueprint] = None  # populated when action_type == "show_summary"
    anti_patterns: List[AntiPattern] = field(default_factory=list)  # blocking issues when action_type == "confirm_warning"
class StudyInterviewEngine:
    """
    Main orchestrator for study interviews.

    Manages the complete interview lifecycle:
    1. Start or resume interview
    2. Present questions via presenter
    3. Process answers with validation
    4. Generate blueprint for review
    5. Handle modifications
    6. Coordinate study generation
    """

    def __init__(
        self,
        study_path: Path,
        presenter: Optional[InterviewPresenter] = None
    ):
        """
        Initialize interview engine.

        Args:
            study_path: Path to the study directory
            presenter: Presentation layer (defaults to ClaudePresenter)
        """
        self.study_path = Path(study_path)
        self.presenter = presenter or ClaudePresenter()
        self.state_manager = InterviewStateManager(self.study_path)
        self.question_engine = QuestionEngine()
        self.validator = EngineeringValidator()
        self.intelligence = InterviewIntelligence()
        self.blueprint_builder = BlueprintBuilder()
        # Current state
        self.state: Optional[InterviewState] = None
        self.introspection: Dict[str, Any] = {}
        self.current_question: Optional[Question] = None
        self.session: Optional[InterviewSession] = None
        # Estimated questions (for progress); refined once complexity is
        # known - see _update_complexity().
        self.estimated_total_questions = 12

    def start_interview(
        self,
        study_name: str,
        model_path: Optional[Path] = None,
        introspection: Optional[Dict[str, Any]] = None
    ) -> InterviewSession:
        """
        Start a new interview or resume existing one.

        An incomplete persisted interview for this study path takes
        precedence: it is resumed and the arguments to this call (other
        than study_path) are ignored.

        Args:
            study_name: Name for the study
            model_path: Path to the NX model (optional)
            introspection: Pre-computed introspection results (optional)

        Returns:
            InterviewSession representing the active interview
        """
        # Check for existing state
        existing_state = self.state_manager.load_state()
        if existing_state and not existing_state.is_complete():
            # Resume existing interview
            self.state = existing_state
            self.introspection = existing_state.introspection
            self.session = InterviewSession(
                session_id=existing_state.session_id,
                study_name=existing_state.study_name,
                study_path=self.study_path,
                started_at=datetime.fromisoformat(existing_state.started_at),
                current_phase=existing_state.get_phase(),
                introspection=self.introspection,
                is_resumed=True,
            )
            return self.session
        # Start new interview
        self.state = InterviewState(
            session_id=str(uuid.uuid4()),
            study_name=study_name,
            study_path=str(self.study_path),
            current_phase=InterviewPhase.INTROSPECTION.value,
        )
        # Store introspection if provided
        if introspection:
            self.introspection = introspection
            self.state.introspection = introspection
            # Move to problem definition if introspection already done
            self.state.set_phase(InterviewPhase.PROBLEM_DEFINITION)
        # Save initial state
        self.state_manager.save_state(self.state)
        self.session = InterviewSession(
            session_id=self.state.session_id,
            study_name=study_name,
            study_path=self.study_path,
            started_at=datetime.now(),
            current_phase=self.state.get_phase(),
            introspection=self.introspection,
        )
        return self.session

    def get_first_question(self) -> NextAction:
        """
        Get the first question to ask.

        Returns:
            NextAction with the first question, or an error action when
            the interview has not been started.
        """
        if self.state is None:
            return NextAction(
                action_type="error",
                message="Interview not started. Call start_interview() first."
            )
        # Get next question
        next_q = self.question_engine.get_next_question(self.state, self.introspection)
        if next_q is None:
            # No questions - should not happen at start
            return NextAction(
                action_type="error",
                message="No questions available."
            )
        self.current_question = next_q
        return NextAction(
            action_type="ask_question",
            question=next_q,
            message=self.presenter.present_question(
                next_q,
                question_number=self.state.current_question_count() + 1,
                total_questions=self.estimated_total_questions,
                category_name=self._get_category_name(next_q.category),
            )
        )

    def process_answer(self, answer: str) -> NextAction:
        """
        Process user answer and determine next action.

        Args:
            answer: User's answer (natural language)

        Returns:
            NextAction indicating what to do next (next question, blocking
            warnings to confirm, final summary, or an error re-asking the
            same question)
        """
        if self.state is None or self.current_question is None:
            return NextAction(
                action_type="error",
                message="No active question. Call get_first_question() or get_next_question()."
            )
        question = self.current_question
        # 1. Parse answer based on question type
        parsed = self.presenter.parse_response(answer, question)
        # 2. Validate answer
        is_valid, error_msg = self.question_engine.validate_answer(parsed, question)
        if not is_valid:
            return NextAction(
                action_type="error",
                message=f"Invalid answer: {error_msg}",
                question=question,  # Re-ask same question
            )
        # 3. Store answer
        self._store_answer(question, answer, parsed)
        # 4. Update phase if needed
        self._update_phase(question)
        # 5. Update complexity after initial questions
        if question.category == "problem_definition":
            self._update_complexity()
        # 6. Check for warnings/anti-patterns
        anti_patterns = self.validator.detect_anti_patterns(self.state, self.introspection)
        new_warnings = [ap.description for ap in anti_patterns if ap.severity in ["error", "warning"]]
        # Filter to only new warnings
        existing_warnings = set(self.state.warnings)
        for w in new_warnings:
            if w not in existing_warnings:
                self.state.add_warning(w)
        # 7. Check if we should show anti-pattern warnings
        blocking_patterns = [ap for ap in anti_patterns if ap.severity == "error" and not ap.acknowledged]
        if blocking_patterns:
            return NextAction(
                action_type="confirm_warning",
                message=self._format_anti_pattern_warnings(blocking_patterns),
                anti_patterns=blocking_patterns,
            )
        # 8. Get next question
        next_q = self.question_engine.get_next_question(self.state, self.introspection)
        if next_q is None:
            # Interview complete - generate blueprint
            return self._finalize_interview()
        self.current_question = next_q
        return NextAction(
            action_type="ask_question",
            question=next_q,
            message=self.presenter.present_question(
                next_q,
                question_number=self.state.current_question_count() + 1,
                total_questions=self.estimated_total_questions,
                category_name=self._get_category_name(next_q.category),
            ),
            warnings=[w for w in self.state.warnings if w not in self.state.warnings_acknowledged],
        )

    def acknowledge_warnings(self, acknowledged: bool = True) -> NextAction:
        """
        Acknowledge current warnings and continue.

        Args:
            acknowledged: Whether user acknowledged warnings

        Returns:
            NextAction (continue or abort)
        """
        # Fix: guard against a missing state, consistent with the other
        # public entry points (previously dereferenced self.state blindly).
        if self.state is None:
            return NextAction(
                action_type="error",
                message="Interview not started. Call start_interview() first."
            )
        if not acknowledged:
            return NextAction(
                action_type="error",
                message="Interview paused. Please fix the issues and restart, or acknowledge warnings to proceed."
            )
        # Mark all current warnings as acknowledged
        for w in self.state.warnings:
            self.state.acknowledge_warning(w)
        # Continue to next question
        next_q = self.question_engine.get_next_question(self.state, self.introspection)
        if next_q is None:
            return self._finalize_interview()
        self.current_question = next_q
        return NextAction(
            action_type="ask_question",
            question=next_q,
            message=self.presenter.present_question(
                next_q,
                question_number=self.state.current_question_count() + 1,
                total_questions=self.estimated_total_questions,
                category_name=self._get_category_name(next_q.category),
            )
        )

    def generate_blueprint(self) -> StudyBlueprint:
        """
        Generate study blueprint from interview state.

        Returns:
            StudyBlueprint ready for generation

        Raises:
            ValueError: If no interview state is available.
        """
        if self.state is None:
            raise ValueError("No interview state available")
        blueprint = self.blueprint_builder.from_interview_state(
            self.state,
            self.introspection
        )
        # Store in state so the blueprint survives restarts
        self.state.blueprint = blueprint.to_dict()
        self.state_manager.save_state(self.state)
        return blueprint

    def modify_blueprint(self, changes: Dict[str, Any]) -> StudyBlueprint:
        """
        Apply what-if modifications to the blueprint.

        Args:
            changes: Dictionary of changes to apply. Supported keys:
                "n_trials", "sampler" (constraint add/remove are accepted
                but not yet implemented).

        Returns:
            Modified StudyBlueprint

        Raises:
            ValueError: If no blueprint exists or the modifications fail
                validation.
        """
        if self.state is None or self.state.blueprint is None:
            raise ValueError("No blueprint available to modify")
        blueprint = StudyBlueprint.from_dict(self.state.blueprint)
        # Apply changes
        for key, value in changes.items():
            if key == "n_trials":
                blueprint.n_trials = int(value)
            elif key == "sampler":
                blueprint.sampler = value
            elif key == "add_constraint":
                # TODO: Handle adding constraints
                pass
            elif key == "remove_constraint":
                # TODO: Handle removing constraints
                pass
            # Add more modification types as needed
        # Re-validate
        validation_errors = blueprint.validate()
        if validation_errors:
            raise ValueError(f"Invalid modifications: {validation_errors}")
        # Update state
        self.state.blueprint = blueprint.to_dict()
        self.state_manager.save_state(self.state)
        return blueprint

    def confirm_blueprint(self) -> bool:
        """
        Confirm blueprint and mark interview as complete.

        Returns:
            True if successful, False when no interview is active.
        """
        if self.state is None:
            return False
        self.state.set_phase(InterviewPhase.COMPLETE)
        self.state_manager.save_state(self.state)
        # Finalize log
        self.state_manager.finalize_log(self.state)
        return True

    def get_current_state(self) -> Optional[InterviewState]:
        """Get current interview state (None when no interview is active)."""
        return self.state

    def get_progress(self) -> str:
        """Get formatted progress string."""
        if self.state is None:
            return "No active interview"
        return self.presenter.show_progress(
            self.state.current_question_count(),
            self.estimated_total_questions,
            self._get_phase_name(self.state.current_phase)
        )

    def reset_interview(self) -> None:
        """Reset interview and start fresh."""
        self.state_manager.delete_state()
        self.state = None
        # Fix: clear stale introspection so a fresh interview does not
        # inherit the previous session's model analysis.
        self.introspection = {}
        self.current_question = None
        self.session = None

    # Private methods

    def _store_answer(self, question: Question, raw: str, parsed: Any) -> None:
        """Record an answer in state, map it to its answer field, and log it."""
        # Create answered question record
        answered = AnsweredQuestion(
            question_id=question.id,
            answered_at=datetime.now().isoformat(),
            raw_response=raw,
            parsed_value=parsed,
        )
        self.state.add_answered_question(answered)
        # Map to answer field
        self._map_answer_to_field(question.maps_to, parsed)
        # Create log entry
        log_entry = LogEntry(
            timestamp=datetime.now(),
            question_id=question.id,
            question_text=question.text,
            answer_raw=raw,
            answer_parsed=parsed,
        )
        self.state_manager.append_log(log_entry)
        self.state_manager.save_state(self.state)

    def _map_answer_to_field(self, maps_to: str, value: Any) -> None:
        """Map parsed value to the appropriate answer field.

        Supports three addressing forms: array element attributes
        ("objectives[0].goal"), nested dicts ("constraints.max_stress"),
        and simple top-level fields.
        """
        if not maps_to:
            return
        # Handle array indexing: "objectives[0].goal"
        if "[" in maps_to:
            import re
            match = re.match(r"(\w+)\[(\d+)\]\.(\w+)", maps_to)
            if match:
                # Fix: renamed 'field' -> 'attr' to stop shadowing the
                # dataclasses.field import used elsewhere in this module.
                array_name, idx, attr = match.groups()
                idx = int(idx)
                # Ensure array exists
                if array_name not in self.state.answers:
                    self.state.answers[array_name] = []
                # Ensure element exists (pad with empty dicts up to idx)
                while len(self.state.answers[array_name]) <= idx:
                    self.state.answers[array_name].append({})
                self.state.answers[array_name][idx][attr] = value
            return
        # Handle nested fields: "constraints.max_stress"
        if "." in maps_to:
            parts = maps_to.split(".")
            current = self.state.answers
            for part in parts[:-1]:
                if part not in current:
                    current[part] = {}
                current = current[part]
            current[parts[-1]] = value
            return
        # Simple field
        self.state.set_answer(maps_to, value)

    def _update_phase(self, question: Question) -> None:
        """Update interview phase based on question category."""
        category_to_phase = {
            "problem_definition": InterviewPhase.PROBLEM_DEFINITION,
            "objectives": InterviewPhase.OBJECTIVES,
            "constraints": InterviewPhase.CONSTRAINTS,
            "design_variables": InterviewPhase.DESIGN_VARIABLES,
            "physics_config": InterviewPhase.DESIGN_VARIABLES,
            "optimization_settings": InterviewPhase.VALIDATION,
            "validation": InterviewPhase.VALIDATION,
        }
        new_phase = category_to_phase.get(question.category)
        if new_phase and new_phase != self.state.get_phase():
            self.state.set_phase(new_phase)

    def _update_complexity(self) -> None:
        """Update complexity estimate after initial questions."""
        complexity = self.intelligence.determine_complexity(self.state, self.introspection)
        self.state.complexity = complexity
        # Adjust estimated questions for progress display
        if complexity == "simple":
            self.estimated_total_questions = 8
        elif complexity == "moderate":
            self.estimated_total_questions = 12
        else:
            self.estimated_total_questions = 16

    def _finalize_interview(self) -> NextAction:
        """Finalize interview: move to review phase and show the summary."""
        self.state.set_phase(InterviewPhase.REVIEW)
        blueprint = self.generate_blueprint()
        return NextAction(
            action_type="show_summary",
            message=self.presenter.show_summary(blueprint),
            blueprint=blueprint,
        )

    def _format_anti_pattern_warnings(self, patterns: List[AntiPattern]) -> str:
        """Format anti-pattern warnings for display, ending with a yes/no prompt."""
        lines = ["**Issues Detected:**", ""]
        for ap in patterns:
            severity_icon = "X" if ap.severity == "error" else "!"
            lines.append(f"[{severity_icon}] **{ap.name}**")
            lines.append(f"  {ap.description}")
            if ap.fix_suggestion:
                lines.append(f"  *Suggestion*: {ap.fix_suggestion}")
            lines.append("")
        lines.append("Would you like to proceed anyway? Type **yes** to continue or **no** to go back and fix.")
        return "\n".join(lines)

    def _get_category_name(self, category: str) -> str:
        """Get human-readable category name (title-cased fallback for unknowns)."""
        names = {
            "problem_definition": "Problem Definition",
            "objectives": "Optimization Goals",
            "constraints": "Constraints",
            "design_variables": "Design Variables",
            "physics_config": "Physics Configuration",
            "optimization_settings": "Optimization Settings",
            "validation": "Validation",
        }
        return names.get(category, category.replace("_", " ").title())

    def _get_phase_name(self, phase: str) -> str:
        """Get human-readable phase name (title-cased fallback for unknowns)."""
        names = {
            "introspection": "Model Analysis",
            "problem_definition": "Problem Definition",
            "objectives": "Setting Objectives",
            "constraints": "Defining Constraints",
            "design_variables": "Selecting Variables",
            "validation": "Validation",
            "review": "Review & Confirm",
            "complete": "Complete",
        }
        return names.get(phase, phase.replace("_", " ").title())
# Convenience function for quick interview
def run_interview(
    study_path: Path,
    study_name: str,
    introspection: Optional[Dict[str, Any]] = None
) -> StudyInterviewEngine:
    """Create an engine and immediately start (or resume) an interview.

    Args:
        study_path: Path to study directory
        study_name: Study name
        introspection: Optional introspection results

    Returns:
        Configured StudyInterviewEngine ready for use
    """
    interview = StudyInterviewEngine(study_path)
    interview.start_interview(study_name, introspection=introspection)
    return interview