# Source: Atomizer/optimization_engine/interview/question_engine.py (748 lines, 24 KiB, Python)

"""
Question Engine
This module manages question definitions, conditions, and dynamic options.
It handles:
- Loading question schemas from JSON
- Evaluating conditional logic
- Populating dynamic options from introspection
- Question ordering and flow control
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Any, Optional, Literal, Union
import json
import re
@dataclass
class ValidationRule:
    """Constraints that an answer to a question must satisfy.

    Every constraint is optional; a field left at its default value
    imposes no restriction.
    """

    # Whether an empty answer is rejected.
    required: bool = False
    # Length limits for an answer.
    min_length: Optional[int] = None
    max_length: Optional[int] = None
    # Numeric range limits.
    min: Optional[float] = None
    max: Optional[float] = None
    # Limits on the number of selected items.
    min_selections: Optional[int] = None
    max_selections: Optional[int] = None
    # Regular expression the answer has to match.
    pattern: Optional[str] = None
    # Unit label attached to the answer.
    units: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["ValidationRule"]:
        """Build a rule from its schema dictionary.

        Args:
            data: Mapping of rule settings, or None.

        Returns:
            None when ``data`` is None; otherwise a rule whose missing keys
            fall back to the field defaults.
        """
        if data is None:
            return None

        # All of these default to None when the key is absent.
        unset_by_default = (
            "min_length",
            "max_length",
            "min",
            "max",
            "min_selections",
            "max_selections",
            "pattern",
            "units",
        )
        kwargs: Dict[str, Any] = {key: data.get(key) for key in unset_by_default}
        kwargs["required"] = data.get("required", False)
        return cls(**kwargs)
@dataclass
class QuestionOption:
    """One selectable entry of a choice/multi_choice question."""

    # Value recorded when this option is picked.
    value: Any
    # Text shown for this option.
    label: str
    # Optional longer explanation.
    description: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "QuestionOption":
        """Build an option from its schema dictionary.

        Args:
            data: Mapping with mandatory "value" and "label" keys and an
                optional "description" key.

        Returns:
            The corresponding option.

        Raises:
            KeyError: If "value" or "label" is missing.
        """
        value = data["value"]
        label = data["label"]
        return cls(value, label, data.get("description"))
@dataclass
class QuestionCondition:
    """
    Conditional logic for when to ask a question.

    Supported ``type`` values:
    - answered: field has been answered
    - equals: field equals value
    - contains: array field contains value
    - greater_than: numeric comparison
    - less_than: numeric comparison
    - exists: field exists and is not None
    - introspection_has: introspection data has field
    - complexity_is: complexity level matches
    - and/or/not: logical operators
    """

    type: str
    field: Optional[str] = None
    value: Optional[Any] = None
    condition: Optional["QuestionCondition"] = None  # Operand of 'not'
    conditions: Optional[List["QuestionCondition"]] = None  # Operands of 'and'/'or'

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["QuestionCondition"]:
        """Build a (possibly nested) condition tree from a schema dictionary.

        Args:
            data: Mapping with a mandatory "type" key and optional "field",
                "value", "condition" and "conditions" keys, or None.

        Returns:
            None when ``data`` is None, otherwise the parsed condition.

        Raises:
            KeyError: If "type" is missing at any nesting level.
        """
        if data is None:
            return None

        # Read the mandatory key first so a missing "type" is reported
        # before any nested parsing starts.
        kind = data["type"]
        target = data.get("field")
        expected = data.get("value")

        # Single nested operand (used by 'not'); only set when the key exists.
        negated = cls.from_dict(data["condition"]) if "condition" in data else None

        # List of nested operands (used by 'and'/'or'); only set when the key exists.
        operands = None
        if "conditions" in data:
            operands = [cls.from_dict(item) for item in data["conditions"]]

        return cls(
            type=kind,
            field=target,
            value=expected,
            condition=negated,
            conditions=operands,
        )
@dataclass
class DynamicOptions:
    """Describes how a question's options are generated at runtime."""

    # Kind of dynamic data to turn into options.
    type: str
    # Where the data comes from.
    source: str
    # Optional name of a filter to apply to the data.
    filter: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["DynamicOptions"]:
        """Build the configuration from its schema dictionary.

        Args:
            data: Mapping with mandatory "type" and "source" keys and an
                optional "filter" key, or None.

        Returns:
            None when ``data`` is None, otherwise the parsed configuration.

        Raises:
            KeyError: If "type" or "source" is missing.
        """
        if data is not None:
            return cls(data["type"], data["source"], data.get("filter"))
        return None
@dataclass
class DynamicContent:
    """Describes dynamic content to be inserted into a question's text."""

    # Kind of dynamic content.
    type: str
    # Where the content comes from.
    source: str

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["DynamicContent"]:
        """Build the configuration from its schema dictionary.

        Args:
            data: Mapping with mandatory "type" and "source" keys, or None.

        Returns:
            None when ``data`` is None, otherwise the parsed configuration.

        Raises:
            KeyError: If "type" or "source" is missing.
        """
        if data is not None:
            return cls(data["type"], data["source"])
        return None
@dataclass
class Question:
    """A single interview question together with its presentation metadata."""

    id: str
    category: str
    text: str
    question_type: Literal["text", "choice", "multi_choice", "numeric", "confirm", "parameter_select", "bounds"]
    maps_to: str
    help_text: Optional[str] = None
    options: Optional[List[QuestionOption]] = None
    default: Optional[Any] = None
    validation: Optional[ValidationRule] = None
    condition: Optional[QuestionCondition] = None
    engineering_guidance: Optional[str] = None
    dynamic_options: Optional[DynamicOptions] = None
    dynamic_content: Optional[DynamicContent] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Question":
        """Build a question from its schema dictionary.

        Args:
            data: Mapping with mandatory "id", "category", "text",
                "question_type" and "maps_to" keys; every other key is
                optional.

        Returns:
            The parsed question. ``options`` stays None when the "options"
            key is missing or empty.

        Raises:
            KeyError: If a mandatory key is missing.
        """
        # Static options are parsed first; an absent or empty list yields None.
        raw_options = data.get("options")
        parsed_options = (
            [QuestionOption.from_dict(item) for item in raw_options]
            if raw_options
            else None
        )

        lookup = data.get
        return cls(
            id=data["id"],
            category=data["category"],
            text=data["text"],
            question_type=data["question_type"],
            maps_to=data["maps_to"],
            help_text=lookup("help_text"),
            options=parsed_options,
            default=lookup("default"),
            validation=ValidationRule.from_dict(lookup("validation")),
            condition=QuestionCondition.from_dict(lookup("condition")),
            engineering_guidance=lookup("engineering_guidance"),
            dynamic_options=DynamicOptions.from_dict(lookup("dynamic_options")),
            dynamic_content=DynamicContent.from_dict(lookup("dynamic_content")),
        )
@dataclass
class QuestionCategory:
    """A named group of related questions."""

    id: str
    name: str
    phase: str
    # Sort key used to order categories.
    order: int
    # When True the category is asked regardless of ``condition``.
    always_ask: bool = True
    condition: Optional[QuestionCondition] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "QuestionCategory":
        """Build a category from its schema dictionary.

        Args:
            data: Mapping with mandatory "id", "name", "phase" and "order"
                keys; "always_ask" defaults to True and "condition" to None.

        Returns:
            The parsed category.

        Raises:
            KeyError: If a mandatory key is missing.
        """
        identifier, title = data["id"], data["name"]
        phase, position = data["phase"], data["order"]
        unconditional = data.get("always_ask", True)
        gate = QuestionCondition.from_dict(data.get("condition"))
        return cls(identifier, title, phase, position, unconditional, gate)
class QuestionEngine:
    """
    Manages question definitions and flow logic.

    Handles:
    - Loading questions from JSON schema
    - Evaluating conditions to determine next question
    - Populating dynamic options from introspection
    - Answer parsing and validation
    """

    # First number in free text: optional sign, optional fraction, optional
    # exponent ("5", "-3.5", ".5", "1e-3").
    _NUMBER_RE = re.compile(r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?")
    # "<low> to <high>" / "<low>-<high>". A sign on the lower bound only counts
    # when it is not glued to a preceding word character, so "2-10" still
    # parses as 2..10 while "-5 to 5" parses as -5..5.
    _RANGE_RE = re.compile(
        r"((?:(?<![\w.])[-+])?\d+\.?\d*)\s*(?:to|-)\s*([-+]?\d+\.?\d*)",
        re.IGNORECASE,
    )
    _MIN_RE = re.compile(r"min[:\s]+([-+]?\d+\.?\d*)")
    _MAX_RE = re.compile(r"max[:\s]+([-+]?\d+\.?\d*)")
    # Separators accepted between list items: comma, ampersand, the word "and".
    _LIST_SPLIT_RE = re.compile(r"[,&]|\band\b")

    def __init__(self, schema_path: Optional[Path] = None):
        """
        Initialize question engine and load the schema immediately.

        Args:
            schema_path: Path to question schema JSON. If None, uses
                ``schemas/interview_questions.json`` next to this module.

        Raises:
            FileNotFoundError: If the schema file does not exist.
        """
        if schema_path is None:
            schema_path = Path(__file__).parent / "schemas" / "interview_questions.json"
        # Accept plain strings as well; Path(Path) is a no-op.
        self.schema_path = Path(schema_path)
        self.schema: Dict[str, Any] = {}
        self.categories: List["QuestionCategory"] = []
        self.questions: Dict[str, "Question"] = {}
        self.questions_by_category: Dict[str, List["Question"]] = {}
        self._load_schema()

    def _load_schema(self) -> None:
        """
        Load question schema from JSON file.

        Populates ``schema``, ``categories`` (sorted by ``order``),
        ``questions`` (by id) and ``questions_by_category``.

        Raises:
            FileNotFoundError: If ``schema_path`` does not exist.
        """
        if not self.schema_path.exists():
            raise FileNotFoundError(f"Question schema not found: {self.schema_path}")
        with open(self.schema_path, "r", encoding="utf-8") as f:
            self.schema = json.load(f)

        # Parse categories
        self.categories = [
            QuestionCategory.from_dict(c) for c in self.schema.get("categories", [])
        ]
        self.categories.sort(key=lambda c: c.order)

        # Start from empty containers so a repeated load never duplicates entries.
        self.questions = {}
        self.questions_by_category = {}

        # Parse questions and organize them by category
        for q_data in self.schema.get("questions", []):
            question = Question.from_dict(q_data)
            self.questions[question.id] = question
            self.questions_by_category.setdefault(question.category, []).append(question)

    def get_all_questions(self) -> List["Question"]:
        """
        Get all questions in order.

        Questions are grouped by category in category order; questions whose
        category id is not a loaded category are not included.
        """
        result = []
        for category in self.categories:
            result.extend(self.questions_by_category.get(category.id, []))
        return result

    def get_question(self, question_id: str) -> Optional["Question"]:
        """Get a specific question by ID, or None if the ID is unknown."""
        return self.questions.get(question_id)

    def get_next_question(
        self,
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> Optional["Question"]:
        """
        Determine the next question based on state and conditions.

        Args:
            state: Current interview state
            introspection: Introspection results from model

        Returns:
            Next question to ask (a prepared copy), or None if interview is complete
        """
        answered_ids = {q["question_id"] for q in state.questions_answered}

        # Go through categories in order
        for category in self.categories:
            if not self._should_ask_category(category, state, introspection):
                continue
            for question in self.questions_by_category.get(category.id, []):
                # Skip if already answered
                if question.id in answered_ids:
                    continue
                if self._should_ask_question(question, state, introspection):
                    # Populate dynamic options if needed
                    return self._prepare_question(question, state, introspection)

        # No more questions
        return None

    def _should_ask_category(
        self,
        category: "QuestionCategory",
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> bool:
        """
        Check if a category should be asked.

        ``always_ask`` takes precedence over the condition; a category with
        neither is asked.
        """
        if category.always_ask:
            return True
        if category.condition:
            return self.evaluate_condition(category.condition, state, introspection)
        return True

    def _should_ask_question(
        self,
        question: "Question",
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> bool:
        """Check if a question should be asked (unconditional questions always are)."""
        if question.condition is None:
            return True
        return self.evaluate_condition(question.condition, state, introspection)

    def evaluate_condition(
        self,
        condition: "QuestionCondition",
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> bool:
        """
        Evaluate if a condition is met.

        Args:
            condition: Condition to evaluate
            state: Current interview state
            introspection: Introspection results

        Returns:
            True if condition is met. Unknown condition types evaluate to
            True so that an unrecognized condition never hides a question.
        """
        cond_type = condition.type

        if cond_type in ("answered", "exists"):
            return self._get_nested_value(state.answers, condition.field) is not None

        elif cond_type == "equals":
            actual = self._get_nested_value(state.answers, condition.field)
            return actual == condition.value

        elif cond_type == "contains":
            actual = self._get_nested_value(state.answers, condition.field)
            if isinstance(actual, list):
                return condition.value in actual
            return False

        elif cond_type in ("greater_than", "less_than"):
            actual = self._get_nested_value(state.answers, condition.field)
            threshold = condition.value
            # Both sides must be numbers; a missing or non-numeric threshold
            # means the comparison cannot hold rather than raising TypeError.
            if not isinstance(actual, (int, float)) or not isinstance(threshold, (int, float)):
                return False
            if cond_type == "greater_than":
                return actual > threshold
            return actual < threshold

        elif cond_type == "introspection_has":
            return condition.field in introspection

        elif cond_type == "complexity_is":
            expected = condition.value
            if isinstance(expected, list):
                return state.complexity in expected
            return state.complexity == expected

        elif cond_type == "and":
            # No operands: vacuously true.
            if condition.conditions:
                return all(
                    self.evaluate_condition(c, state, introspection)
                    for c in condition.conditions
                )
            return True

        elif cond_type == "or":
            # No operands: nothing can be satisfied.
            if condition.conditions:
                return any(
                    self.evaluate_condition(c, state, introspection)
                    for c in condition.conditions
                )
            return False

        elif cond_type == "not":
            if condition.condition:
                return not self.evaluate_condition(condition.condition, state, introspection)
            return True

        else:
            # Unknown condition type
            return True

    def _get_nested_value(self, data: Dict[str, Any], path: str) -> Any:
        """
        Get a value from nested dict using dot notation.

        Supports array indexing: "objectives[0].goal"

        Returns:
            The value at ``path``, or None when the path is empty, a segment
            is missing, a list index is out of range or not an integer, or a
            non-container is reached before the path ends.
        """
        if not path:
            return None

        parts = re.split(r'\.|\[|\]', path)
        parts = [p for p in parts if p]  # Remove empty strings

        current = data
        for part in parts:
            if current is None:
                return None
            if isinstance(current, dict):
                current = current.get(part)
            elif isinstance(current, list):
                try:
                    idx = int(part)
                except ValueError:
                    return None
                if 0 <= idx < len(current):
                    current = current[idx]
                else:
                    return None
            else:
                return None
        return current

    def _prepare_question(
        self,
        question: "Question",
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> "Question":
        """
        Prepare a question for presentation.

        Returns a deep copy of ``question`` whose options are replaced by
        dynamically generated ones when ``dynamic_options`` is configured.
        """
        # Create a copy to avoid mutating the original
        import copy
        prepared = copy.deepcopy(question)

        # Populate dynamic options
        if prepared.dynamic_options:
            prepared.options = self._populate_dynamic_options(
                prepared.dynamic_options, state, introspection
            )
        return prepared

    def _populate_dynamic_options(
        self,
        dynamic: "DynamicOptions",
        state: "InterviewState",
        introspection: Dict[str, Any]
    ) -> List["QuestionOption"]:
        """
        Populate dynamic options from introspection data.

        Only the "expressions" type produces options; any other type yields
        an empty list.
        """
        options = []

        if dynamic.type == "expressions":
            # Get expressions from introspection
            expressions = introspection.get("expressions", [])

            # Apply filter if specified
            if dynamic.filter == "design_variable_heuristics":
                expressions = self._filter_design_variables(expressions)
            elif dynamic.filter == "exclude_selected_dvs":
                # `or []` also covers an explicit None stored under the key.
                chosen = state.answers.get("design_variables") or []
                selected = [dv.get("parameter") for dv in chosen]
                expressions = [e for e in expressions if e.get("name") not in selected]

            # Convert to options
            for expr in expressions:
                name = expr.get("name", "")
                value = expr.get("value", 0)
                formula = expr.get("formula")
                options.append(QuestionOption(
                    value=name,
                    label=f"{name} (current: {value})",
                    # The formula is only worth showing when it differs from the value.
                    description=formula if formula != str(value) else None,
                ))

        return options

    def _filter_design_variables(self, expressions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Filter expressions to likely design variables using heuristics.

        An expression is dropped when its name matches an exclusion pattern,
        its value is not an int/float, or it carries a formula different from
        its value. Survivors are kept when they match a name pattern or have
        a name longer than two characters, ordered by descending score
        (stable within a score).
        """
        # High confidence patterns
        high_patterns = [
            r"thickness", r"width", r"height", r"diameter", r"radius",
            r"length", r"depth", r"angle", r"fillet", r"chamfer",
            r"rib_\w+", r"wall_\w+", r"flange_\w+"
        ]
        # Medium confidence patterns
        medium_patterns = [
            r"dim_\w+", r"size_\w+", r"param_\w+", r"p\d+", r"var_\w+"
        ]
        # Exclusion patterns
        exclude_patterns = [
            r"mesh_\w+", r"count_\w+", r"num_\w+", r"material\w*",
            r"derived_\w+", r"calc_\w+", r"_result$", r"_output$"
        ]

        def matches_any(lowered: str, patterns: List[str]) -> bool:
            return any(re.search(p, lowered) for p in patterns)

        # Score and filter
        scored = []
        for expr in expressions:
            name = expr.get("name", "")
            lowered = name.lower()

            # Skip exclusions
            if matches_any(lowered, exclude_patterns):
                continue

            # Skip if not a simple numeric value
            value = expr.get("value")
            if not isinstance(value, (int, float)):
                continue

            # Skip if it's a formula (computed value)
            formula = expr.get("formula", "")
            if formula and formula != str(value):
                continue

            # Score
            score = 0
            if matches_any(lowered, high_patterns):
                score = 2
            elif matches_any(lowered, medium_patterns):
                score = 1

            if score > 0 or len(name) > 2:  # Include if named or matches pattern
                scored.append((score, expr))

        # Sort by score descending
        scored.sort(key=lambda x: -x[0])
        return [expr for _, expr in scored]

    def validate_answer(
        self,
        answer: Any,
        question: "Question"
    ) -> tuple[bool, Optional[str]]:
        """
        Validate an answer against question rules.

        Args:
            answer: Parsed answer value
            question: Question being answered

        Returns:
            Tuple of (is_valid, error_message); error_message is None when valid
        """
        if question.validation is None:
            return True, None
        validation = question.validation

        # Required check
        if validation.required:
            if answer is None or answer == "" or answer == []:
                return False, "This field is required"

        # Skip further validation if empty and not required
        if answer is None or answer == "":
            return True, None

        # Text length validation
        if question.question_type == "text":
            if validation.min_length and len(str(answer)) < validation.min_length:
                return False, f"Answer must be at least {validation.min_length} characters"
            if validation.max_length and len(str(answer)) > validation.max_length:
                return False, f"Answer must be at most {validation.max_length} characters"

        # Numeric validation
        if question.question_type == "numeric":
            try:
                num = float(answer)
            except (ValueError, TypeError):
                return False, "Please enter a valid number"
            if validation.min is not None and num < validation.min:
                return False, f"Value must be at least {validation.min}"
            if validation.max is not None and num > validation.max:
                return False, f"Value must be at most {validation.max}"

        # Multi-choice validation
        if question.question_type in ["multi_choice", "parameter_select"]:
            if isinstance(answer, list):
                if validation.min_selections and len(answer) < validation.min_selections:
                    return False, f"Please select at least {validation.min_selections} option(s)"
                if validation.max_selections and len(answer) > validation.max_selections:
                    return False, f"Please select at most {validation.max_selections} option(s)"

        # Pattern validation (anchored at the start of the answer only)
        if validation.pattern:
            if not re.match(validation.pattern, str(answer)):
                return False, "Answer does not match required format"

        return True, None

    @staticmethod
    def _match_option(text: str, options: Optional[List["QuestionOption"]]) -> Optional["QuestionOption"]:
        """
        Find the option a piece of user text refers to.

        Exact (case-insensitive) matches on value or label across ALL options
        win over substring matches, and empty text never matches, so an empty
        reply cannot silently select the first option.

        Returns:
            The matched option, or None.
        """
        if not text or not options:
            return None
        needle = text.lower()
        for opt in options:
            if needle == str(opt.value).lower() or needle == opt.label.lower():
                return opt
        # Fuzzy match: first option whose label contains the text
        for opt in options:
            if needle in opt.label.lower():
                return opt
        return None

    def parse_answer(
        self,
        raw_answer: str,
        question: "Question"
    ) -> Any:
        """
        Parse a raw answer string into the appropriate type.

        Args:
            raw_answer: Raw string answer from user
            question: Question being answered

        Returns:
            Parsed answer value:
            - text: the stripped string
            - numeric: first number found as float (units ignored), else None
            - confirm: True/False for recognized words, else None
            - choice: matched option value, else the stripped string
            - multi_choice: list of matched option values, else [stripped string]
            - parameter_select: list of non-empty items
            - bounds: dict with "min" and/or "max" floats, else None
        """
        answer = raw_answer.strip()
        qtype = question.question_type

        if qtype == "text":
            return answer

        elif qtype == "numeric":
            # Extract number, handling units
            match = self._NUMBER_RE.search(answer)
            if match:
                return float(match.group())
            return None

        elif qtype == "confirm":
            lower = answer.lower()
            if lower in ["yes", "y", "true", "1", "ok", "sure", "confirm", "correct"]:
                return True
            elif lower in ["no", "n", "false", "0", "cancel", "incorrect"]:
                return False
            return None

        elif qtype == "choice":
            # Try matching by number (1-based); out-of-range falls through to text matching
            if answer.isdigit():
                idx = int(answer) - 1
                if question.options and 0 <= idx < len(question.options):
                    return question.options[idx].value
            # Try matching by value or label
            matched = self._match_option(answer, question.options)
            return matched.value if matched is not None else answer

        elif qtype == "multi_choice":
            # Parse comma/and separated values
            values = []
            for part in self._LIST_SPLIT_RE.split(answer):
                part = part.strip()
                if not part:
                    continue
                # Try matching by number (1-based)
                if part.isdigit():
                    idx = int(part) - 1
                    if question.options and 0 <= idx < len(question.options):
                        values.append(question.options[idx].value)
                        continue
                # Try matching by value or label
                matched = self._match_option(part, question.options)
                if matched is not None:
                    values.append(matched.value)
            return values if values else [answer]

        elif qtype == "parameter_select":
            # Similar to multi_choice but for parameters
            parts = self._LIST_SPLIT_RE.split(answer)
            return [p.strip() for p in parts if p.strip()]

        elif qtype == "bounds":
            # Parse bounds like "2-10" or "2 to 10" or "min 2, max 10"
            bounds = {}

            # Try "min to max" format
            match = self._RANGE_RE.search(answer)
            if match:
                bounds["min"] = float(match.group(1))
                bounds["max"] = float(match.group(2))
                return bounds

            # Try "min X, max Y" format
            lowered = answer.lower()
            min_match = self._MIN_RE.search(lowered)
            max_match = self._MAX_RE.search(lowered)
            if min_match:
                bounds["min"] = float(min_match.group(1))
            if max_match:
                bounds["max"] = float(max_match.group(1))
            return bounds if bounds else None

        return answer
# Import InterviewState here to avoid circular imports
from .interview_state import InterviewState