""" Question Engine This module manages question definitions, conditions, and dynamic options. It handles: - Loading question schemas from JSON - Evaluating conditional logic - Populating dynamic options from introspection - Question ordering and flow control """ from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Any, Optional, Literal, Union import json import re @dataclass class ValidationRule: """Validation rule for a question answer.""" required: bool = False min_length: Optional[int] = None max_length: Optional[int] = None min: Optional[float] = None max: Optional[float] = None min_selections: Optional[int] = None max_selections: Optional[int] = None pattern: Optional[str] = None units: Optional[str] = None @classmethod def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["ValidationRule"]: """Create from dictionary.""" if data is None: return None return cls( required=data.get("required", False), min_length=data.get("min_length"), max_length=data.get("max_length"), min=data.get("min"), max=data.get("max"), min_selections=data.get("min_selections"), max_selections=data.get("max_selections"), pattern=data.get("pattern"), units=data.get("units"), ) @dataclass class QuestionOption: """Option for choice/multi_choice questions.""" value: Any label: str description: Optional[str] = None @classmethod def from_dict(cls, data: Dict[str, Any]) -> "QuestionOption": """Create from dictionary.""" return cls( value=data["value"], label=data["label"], description=data.get("description"), ) @dataclass class QuestionCondition: """ Conditional logic for when to ask a question. Supports: - answered: field has been answered - equals: field equals value - contains: array field contains value - greater_than: numeric comparison - less_than: numeric comparison - exists: field exists and is not None - introspection_has: introspection data has field - complexity_is: complexity level matches - and/or/not: logical operators """ type: str field: Optional[str] = None value: Optional[Any] = None condition: Optional["QuestionCondition"] = None # For 'not' conditions: Optional[List["QuestionCondition"]] = None # For 'and'/'or' @classmethod def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["QuestionCondition"]: """Create from dictionary.""" if data is None: return None condition = cls( type=data["type"], field=data.get("field"), value=data.get("value"), ) # Handle nested 'not' condition if "condition" in data: condition.condition = cls.from_dict(data["condition"]) # Handle nested 'and'/'or' conditions if "conditions" in data: condition.conditions = [ cls.from_dict(c) for c in data["conditions"] ] return condition @dataclass class DynamicOptions: """Configuration for dynamic option population.""" type: str source: str filter: Optional[str] = None @classmethod def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["DynamicOptions"]: """Create from dictionary.""" if data is None: return None return cls( type=data["type"], source=data["source"], filter=data.get("filter"), ) @dataclass class DynamicContent: """Configuration for dynamic content in question text.""" type: str source: str @classmethod def from_dict(cls, data: Optional[Dict[str, Any]]) -> Optional["DynamicContent"]: """Create from dictionary.""" if data is None: return None return cls( type=data["type"], source=data["source"], ) @dataclass class Question: """Represents a single interview question.""" id: str category: str text: str question_type: Literal["text", "choice", "multi_choice", "numeric", "confirm", "parameter_select", "bounds"] maps_to: str help_text: Optional[str] = None options: Optional[List[QuestionOption]] = None default: Optional[Any] = None validation: Optional[ValidationRule] = None condition: Optional[QuestionCondition] = None engineering_guidance: Optional[str] = None dynamic_options: Optional[DynamicOptions] = None dynamic_content: Optional[DynamicContent] = None @classmethod def from_dict(cls, data: Dict[str, Any]) -> "Question": """Create from dictionary.""" options = None if data.get("options"): options = [QuestionOption.from_dict(o) for o in data["options"]] return cls( id=data["id"], category=data["category"], text=data["text"], question_type=data["question_type"], maps_to=data["maps_to"], help_text=data.get("help_text"), options=options, default=data.get("default"), validation=ValidationRule.from_dict(data.get("validation")), condition=QuestionCondition.from_dict(data.get("condition")), engineering_guidance=data.get("engineering_guidance"), dynamic_options=DynamicOptions.from_dict(data.get("dynamic_options")), dynamic_content=DynamicContent.from_dict(data.get("dynamic_content")), ) @dataclass class QuestionCategory: """Category of related questions.""" id: str name: str phase: str order: int always_ask: bool = True condition: Optional[QuestionCondition] = None @classmethod def from_dict(cls, data: Dict[str, Any]) -> "QuestionCategory": """Create from dictionary.""" return cls( id=data["id"], name=data["name"], phase=data["phase"], order=data["order"], always_ask=data.get("always_ask", True), condition=QuestionCondition.from_dict(data.get("condition")), ) class QuestionEngine: """ Manages question definitions and flow logic. Handles: - Loading questions from JSON schema - Evaluating conditions to determine next question - Populating dynamic options from introspection - Answer parsing and validation """ def __init__(self, schema_path: Optional[Path] = None): """ Initialize question engine. Args: schema_path: Path to question schema JSON. If None, uses default. """ if schema_path is None: schema_path = Path(__file__).parent / "schemas" / "interview_questions.json" self.schema_path = schema_path self.schema: Dict[str, Any] = {} self.categories: List[QuestionCategory] = [] self.questions: Dict[str, Question] = {} self.questions_by_category: Dict[str, List[Question]] = {} self._load_schema() def _load_schema(self) -> None: """Load question schema from JSON file.""" if not self.schema_path.exists(): raise FileNotFoundError(f"Question schema not found: {self.schema_path}") with open(self.schema_path, "r", encoding="utf-8") as f: self.schema = json.load(f) # Parse categories self.categories = [ QuestionCategory.from_dict(c) for c in self.schema.get("categories", []) ] self.categories.sort(key=lambda c: c.order) # Parse questions for q_data in self.schema.get("questions", []): question = Question.from_dict(q_data) self.questions[question.id] = question # Organize by category if question.category not in self.questions_by_category: self.questions_by_category[question.category] = [] self.questions_by_category[question.category].append(question) def get_all_questions(self) -> List[Question]: """Get all questions in order.""" result = [] for category in self.categories: if category.id in self.questions_by_category: result.extend(self.questions_by_category[category.id]) return result def get_question(self, question_id: str) -> Optional[Question]: """Get a specific question by ID.""" return self.questions.get(question_id) def get_next_question( self, state: "InterviewState", introspection: Dict[str, Any] ) -> Optional[Question]: """ Determine the next question based on state and conditions. Args: state: Current interview state introspection: Introspection results from model Returns: Next question to ask, or None if interview is complete """ answered_ids = {q["question_id"] for q in state.questions_answered} # Go through categories in order for category in self.categories: # Check if category should be asked if not self._should_ask_category(category, state, introspection): continue # Get questions in this category category_questions = self.questions_by_category.get(category.id, []) for question in category_questions: # Skip if already answered if question.id in answered_ids: continue # Check if question condition is met if self._should_ask_question(question, state, introspection): # Populate dynamic options if needed return self._prepare_question(question, state, introspection) # No more questions return None def _should_ask_category( self, category: QuestionCategory, state: "InterviewState", introspection: Dict[str, Any] ) -> bool: """Check if a category should be asked.""" if category.always_ask: return True if category.condition: return self.evaluate_condition(category.condition, state, introspection) return True def _should_ask_question( self, question: Question, state: "InterviewState", introspection: Dict[str, Any] ) -> bool: """Check if a question should be asked.""" if question.condition is None: return True return self.evaluate_condition(question.condition, state, introspection) def evaluate_condition( self, condition: QuestionCondition, state: "InterviewState", introspection: Dict[str, Any] ) -> bool: """ Evaluate if a condition is met. Args: condition: Condition to evaluate state: Current interview state introspection: Introspection results Returns: True if condition is met """ cond_type = condition.type if cond_type == "answered": return self._get_nested_value(state.answers, condition.field) is not None elif cond_type == "equals": actual = self._get_nested_value(state.answers, condition.field) return actual == condition.value elif cond_type == "contains": actual = self._get_nested_value(state.answers, condition.field) if isinstance(actual, list): return condition.value in actual return False elif cond_type == "greater_than": actual = self._get_nested_value(state.answers, condition.field) if actual is not None and isinstance(actual, (int, float)): return actual > condition.value return False elif cond_type == "less_than": actual = self._get_nested_value(state.answers, condition.field) if actual is not None and isinstance(actual, (int, float)): return actual < condition.value return False elif cond_type == "exists": actual = self._get_nested_value(state.answers, condition.field) return actual is not None elif cond_type == "introspection_has": return condition.field in introspection elif cond_type == "complexity_is": expected = condition.value if isinstance(expected, list): return state.complexity in expected return state.complexity == expected elif cond_type == "and": if condition.conditions: return all( self.evaluate_condition(c, state, introspection) for c in condition.conditions ) return True elif cond_type == "or": if condition.conditions: return any( self.evaluate_condition(c, state, introspection) for c in condition.conditions ) return False elif cond_type == "not": if condition.condition: return not self.evaluate_condition(condition.condition, state, introspection) return True else: # Unknown condition type return True def _get_nested_value(self, data: Dict[str, Any], path: str) -> Any: """ Get a value from nested dict using dot notation. Supports array indexing: "objectives[0].goal" """ if not path: return None parts = re.split(r'\.|\[|\]', path) parts = [p for p in parts if p] # Remove empty strings current = data for part in parts: if current is None: return None if isinstance(current, dict): current = current.get(part) elif isinstance(current, list): try: idx = int(part) if 0 <= idx < len(current): current = current[idx] else: return None except ValueError: return None else: return None return current def _prepare_question( self, question: Question, state: "InterviewState", introspection: Dict[str, Any] ) -> Question: """ Prepare a question for presentation. Populates dynamic options and content. """ # Create a copy to avoid mutating the original import copy prepared = copy.deepcopy(question) # Populate dynamic options if prepared.dynamic_options: prepared.options = self._populate_dynamic_options( prepared.dynamic_options, state, introspection ) return prepared def _populate_dynamic_options( self, dynamic: DynamicOptions, state: "InterviewState", introspection: Dict[str, Any] ) -> List[QuestionOption]: """Populate dynamic options from introspection data.""" options = [] if dynamic.type == "expressions": # Get expressions from introspection expressions = introspection.get("expressions", []) # Apply filter if specified if dynamic.filter == "design_variable_heuristics": expressions = self._filter_design_variables(expressions) elif dynamic.filter == "exclude_selected_dvs": selected = [dv.get("parameter") for dv in state.answers.get("design_variables", [])] expressions = [e for e in expressions if e.get("name") not in selected] # Convert to options for expr in expressions: name = expr.get("name", "") value = expr.get("value", 0) options.append(QuestionOption( value=name, label=f"{name} (current: {value})", description=expr.get("formula") if expr.get("formula") != str(value) else None, )) return options def _filter_design_variables(self, expressions: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Filter expressions to likely design variables using heuristics.""" # High confidence patterns high_patterns = [ r"thickness", r"width", r"height", r"diameter", r"radius", r"length", r"depth", r"angle", r"fillet", r"chamfer", r"rib_\w+", r"wall_\w+", r"flange_\w+" ] # Medium confidence patterns medium_patterns = [ r"dim_\w+", r"size_\w+", r"param_\w+", r"p\d+", r"var_\w+" ] # Exclusion patterns exclude_patterns = [ r"mesh_\w+", r"count_\w+", r"num_\w+", r"material\w*", r"derived_\w+", r"calc_\w+", r"_result$", r"_output$" ] def matches_any(name: str, patterns: List[str]) -> bool: return any(re.search(p, name.lower()) for p in patterns) # Score and filter scored = [] for expr in expressions: name = expr.get("name", "") # Skip exclusions if matches_any(name, exclude_patterns): continue # Skip if not a simple numeric value value = expr.get("value") if not isinstance(value, (int, float)): continue # Skip if it's a formula (computed value) formula = expr.get("formula", "") if formula and formula != str(value): continue # Score score = 0 if matches_any(name, high_patterns): score = 2 elif matches_any(name, medium_patterns): score = 1 if score > 0 or len(name) > 2: # Include if named or matches pattern scored.append((score, expr)) # Sort by score descending scored.sort(key=lambda x: -x[0]) return [expr for _, expr in scored] def validate_answer( self, answer: Any, question: Question ) -> tuple[bool, Optional[str]]: """ Validate an answer against question rules. Returns: Tuple of (is_valid, error_message) """ if question.validation is None: return True, None validation = question.validation # Required check if validation.required: if answer is None or answer == "" or answer == []: return False, "This field is required" # Skip further validation if empty and not required if answer is None or answer == "": return True, None # Text length validation if question.question_type == "text": if validation.min_length and len(str(answer)) < validation.min_length: return False, f"Answer must be at least {validation.min_length} characters" if validation.max_length and len(str(answer)) > validation.max_length: return False, f"Answer must be at most {validation.max_length} characters" # Numeric validation if question.question_type == "numeric": try: num = float(answer) if validation.min is not None and num < validation.min: return False, f"Value must be at least {validation.min}" if validation.max is not None and num > validation.max: return False, f"Value must be at most {validation.max}" except (ValueError, TypeError): return False, "Please enter a valid number" # Multi-choice validation if question.question_type in ["multi_choice", "parameter_select"]: if isinstance(answer, list): if validation.min_selections and len(answer) < validation.min_selections: return False, f"Please select at least {validation.min_selections} option(s)" if validation.max_selections and len(answer) > validation.max_selections: return False, f"Please select at most {validation.max_selections} option(s)" # Pattern validation if validation.pattern: if not re.match(validation.pattern, str(answer)): return False, "Answer does not match required format" return True, None def parse_answer( self, raw_answer: str, question: Question ) -> Any: """ Parse a raw answer string into the appropriate type. Args: raw_answer: Raw string answer from user question: Question being answered Returns: Parsed answer value """ answer = raw_answer.strip() if question.question_type == "text": return answer elif question.question_type == "numeric": # Extract number, handling units match = re.search(r"[-+]?\d*\.?\d+", answer) if match: return float(match.group()) return None elif question.question_type == "confirm": lower = answer.lower() if lower in ["yes", "y", "true", "1", "ok", "sure", "confirm", "correct"]: return True elif lower in ["no", "n", "false", "0", "cancel", "incorrect"]: return False return None elif question.question_type == "choice": # Try matching by number if answer.isdigit(): idx = int(answer) - 1 if question.options and 0 <= idx < len(question.options): return question.options[idx].value # Try matching by value or label if question.options: for opt in question.options: if answer.lower() == str(opt.value).lower(): return opt.value if answer.lower() == opt.label.lower(): return opt.value # Fuzzy match if answer.lower() in opt.label.lower(): return opt.value return answer elif question.question_type == "multi_choice": # Parse comma/and separated values parts = re.split(r"[,&]|\band\b", answer) values = [] for part in parts: part = part.strip() if not part: continue # Try matching by number if part.isdigit(): idx = int(part) - 1 if question.options and 0 <= idx < len(question.options): values.append(question.options[idx].value) continue # Try matching by value or label if question.options: for opt in question.options: if part.lower() == str(opt.value).lower(): values.append(opt.value) break if part.lower() == opt.label.lower(): values.append(opt.value) break if part.lower() in opt.label.lower(): values.append(opt.value) break return values if values else [answer] elif question.question_type == "parameter_select": # Similar to multi_choice but for parameters parts = re.split(r"[,&]|\band\b", answer) return [p.strip() for p in parts if p.strip()] elif question.question_type == "bounds": # Parse bounds like "2-10" or "2 to 10" or "min 2, max 10" bounds = {} # Try "min to max" format match = re.search(r"(\d+\.?\d*)\s*(?:to|-)\s*(\d+\.?\d*)", answer) if match: bounds["min"] = float(match.group(1)) bounds["max"] = float(match.group(2)) return bounds # Try "min X, max Y" format min_match = re.search(r"min[:\s]+(\d+\.?\d*)", answer.lower()) max_match = re.search(r"max[:\s]+(\d+\.?\d*)", answer.lower()) if min_match: bounds["min"] = float(min_match.group(1)) if max_match: bounds["max"] = float(max_match.group(1)) return bounds if bounds else None return answer # Import InterviewState here to avoid circular imports from .interview_state import InterviewState