# NOTE: the following lines are repository-viewer residue, not part of the module.
# Files
# 589 lines
# 19 KiB
# Python
# Raw Permalink Normal View History

"""
Interview Presenter
Abstract presentation layer for different UI modes.
Handles:
- Formatting questions for display
- Parsing user responses
- Showing summaries and warnings
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Optional, List, Dict
import re
from .question_engine import Question, QuestionOption
@dataclass
class PresentedQuestion:
    """Display-ready record for a single interview question."""

    # Identifier of the question this record was built from
    question_id: str
    # Question text after presenter formatting
    formatted_text: str
    # Number of this question within the interview
    question_number: int
    # Total number of questions in the interview
    total_questions: int
    # Name of the category the question belongs to
    category_name: str
class InterviewPresenter(ABC):
    """
    Contract that every interview front end implements.

    Concrete presenters own the UI-specific rendering:
    - ClaudePresenter: Markdown for Claude conversation
    - DashboardPresenter: WebSocket events for React UI (future)
    - CLIPresenter: Interactive terminal prompts (future)
    """

    @abstractmethod
    def present_question(
        self,
        question: Question,
        question_number: int,
        total_questions: int,
        category_name: str,
        dynamic_content: Optional[str] = None
    ) -> str:
        """
        Render a single question for the user.

        Args:
            question: Question to present
            question_number: Current question number
            total_questions: Estimated total questions
            category_name: Name of the question category
            dynamic_content: Dynamic content to inject (e.g., extractor summary)

        Returns:
            Formatted question string
        """

    @abstractmethod
    def parse_response(self, response: str, question: Question) -> Any:
        """
        Turn the user's raw reply into a structured answer.

        Args:
            response: Raw user response
            question: Question being answered

        Returns:
            Parsed answer value
        """

    @abstractmethod
    def show_summary(self, blueprint: "StudyBlueprint") -> str:
        """
        Render the interview summary/blueprint.

        Args:
            blueprint: Generated study blueprint

        Returns:
            Formatted summary string
        """

    @abstractmethod
    def show_warning(self, warning: str, severity: str = "warning") -> str:
        """
        Render a warning message.

        Args:
            warning: Warning message
            severity: "error", "warning", or "info"

        Returns:
            Formatted warning string
        """

    @abstractmethod
    def show_progress(self, current: int, total: int, phase: str) -> str:
        """
        Render a progress indicator.

        Args:
            current: Current question number
            total: Estimated total questions
            phase: Current phase name

        Returns:
            Formatted progress string
        """
class ClaudePresenter(InterviewPresenter):
    """
    Presenter for Claude conversation mode (VS Code, Web).

    Formats questions, summaries, warnings and progress as markdown for
    natural conversation flow with Claude, and parses free-form replies
    back into structured answer values.
    """

    # Signed decimal, optionally in scientific notation: "-2.5", ".5", "1e-3".
    _NUMBER = r'[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?'
    _NUMBER_RE = re.compile(_NUMBER)
    # "2 to 10", "2-10", "-5 to 5" (hyphen or en dash accepted as separator).
    _RANGE_RE = re.compile(
        r'(' + _NUMBER + r')\s*(?:to|-|\u2013)\s*(' + _NUMBER + r')', re.I
    )
    # "min 2", "min: 2", "minimum = 2" and the matching "max" forms.
    _MIN_RE = re.compile(r'min(?:imum)?[:=\s]+(' + _NUMBER + r')', re.I)
    _MAX_RE = re.compile(r'max(?:imum)?[:=\s]+(' + _NUMBER + r')', re.I)
    # Separators between items of a list-style answer.
    _LIST_SPLIT_RE = re.compile(r'[,&]|\band\b', re.I)

    # Replies that are accepted as a whole-string yes/no.
    _YES_EXACT = frozenset({
        "yes", "y", "true", "1", "ok", "sure", "yep", "yeah",
        "correct", "confirmed", "confirm", "affirmative",
    })
    _NO_EXACT = frozenset({
        "no", "n", "false", "0", "nope", "nah", "cancel",
        "incorrect", "negative",
    })
    # Whole words that signal intent inside a longer sentence.
    _YES_WORDS = frozenset({
        "yes", "yeah", "yep", "ok", "okay", "sure", "correct",
        "confirm", "confirmed", "affirmative", "true",
    })
    _NO_WORDS = frozenset({
        "no", "nope", "nah", "not", "dont", "incorrect",
        "cancel", "negative", "false",
    })

    # Fixed input prompts per question type ("numeric" is built dynamically).
    _INPUT_PROMPTS = {
        "text": "Please describe:",
        "choice": "Type your choice (number or description):",
        "multi_choice": "Type your choices (numbers or descriptions, comma-separated):",
        "confirm": "Type **yes** or **no**:",
        "parameter_select": "Type parameter names (comma-separated) or select by number:",
        "bounds": "Enter bounds (e.g., '2 to 10' or 'min 2, max 10'):",
    }

    def present_question(
        self,
        question: Question,
        question_number: int,
        total_questions: int,
        category_name: str,
        dynamic_content: Optional[str] = None
    ) -> str:
        """
        Format a question as markdown for Claude to present.

        Args:
            question: Question to present
            question_number: Current question number
            total_questions: Estimated total questions
            category_name: Name of the question category
            dynamic_content: Extra content inserted after the question text

        Returns:
            Markdown string: header, question text, optional dynamic content,
            numbered options (choice types only), help text, tip, default
            hint and a type-specific input prompt.
        """
        # Header with progress, then the main question text
        lines = [
            f"### Question {question_number} of ~{total_questions}: {category_name}",
            "",
            question.text,
            "",
        ]

        # Dynamic content if provided
        if dynamic_content:
            lines.extend([dynamic_content, ""])

        # Options for choice questions
        if question.options and question.question_type in ("choice", "multi_choice"):
            for i, opt in enumerate(question.options, 1):
                desc = f" - {opt.description}" if opt.description else ""
                lines.append(f"{i}. **{opt.label}**{desc}")
            lines.append("")

        # Help text
        if question.help_text:
            lines.extend([f"> {question.help_text}", ""])

        # Engineering guidance
        if question.engineering_guidance:
            lines.extend([f"> **Tip**: {question.engineering_guidance}", ""])

        # Default value hint (an empty list counts as "no default")
        if question.default is not None and question.default != []:
            if isinstance(question.default, list):
                default_str = ", ".join(str(d) for d in question.default)
            else:
                default_str = str(question.default)
            lines.extend([f"*Default: {default_str}*", ""])

        # Input prompt based on type; unknown types get no prompt
        if question.question_type == "numeric":
            units = question.validation.units if question.validation else ""
            unit_hint = f" ({units})" if units else ""
            lines.append(f"Enter value{unit_hint}:")
        else:
            prompt = self._INPUT_PROMPTS.get(question.question_type)
            if prompt is not None:
                lines.append(prompt)

        return "\n".join(lines)

    def parse_response(self, response: str, question: Question) -> Any:
        """
        Parse a natural language response into a structured answer.

        Args:
            response: Raw user response (surrounding whitespace is ignored)
            question: Question being answered; its question_type selects
                the parser

        Returns:
            Parsed value; the stripped text itself for "text" questions and
            for unrecognised question types.
        """
        response = response.strip()
        kind = question.question_type

        if kind == "numeric":
            return self._parse_numeric(response, question)
        if kind == "confirm":
            return self._parse_confirm(response)
        if kind == "choice":
            return self._parse_choice(response, question)
        if kind == "multi_choice":
            return self._parse_multi_choice(response, question)
        if kind == "parameter_select":
            return self._parse_parameter_select(response, question)
        if kind == "bounds":
            return self._parse_bounds(response)
        return response

    def _parse_numeric(self, response: str, question: Question) -> Optional[float]:
        """
        Extract the first number from the response.

        Trailing units ("12.5 mm", "3 GPa") need no special handling because
        only the leading numeric token is read. Scientific notation such as
        "1e-3" is read as a whole rather than truncated at the "e".

        Returns:
            The number as float, or None when the response contains none.
        """
        match = self._NUMBER_RE.search(response)
        return float(match.group()) if match else None

    def _parse_confirm(self, response: str) -> Optional[bool]:
        """
        Parse a yes/no confirmation.

        Whole-string answers ("yes", "n", "0", ...) are checked first. For
        longer sentences the reply is split into words and the earliest
        affirmative or negative word decides, so "not correct" and
        "no, that's not ok" are negative while "yes, no problem" is positive.
        Matching whole words (instead of substrings) keeps "know" or
        "broken" from being read as "no" / "ok".

        Returns:
            True, False, or None when no intent could be detected.
        """
        # Normalise typographic apostrophes so "don't" is recognised either way
        lower = response.lower().strip().replace("\u2019", "'")

        if lower in self._YES_EXACT:
            return True
        if lower in self._NO_EXACT:
            return False

        for token in re.findall(r"[a-z0-9']+", lower):
            word = token.strip("'")
            # Contractions such as "don't", "isn't", "doesn't" negate
            if word.endswith("n't") or word in self._NO_WORDS:
                return False
            if word in self._YES_WORDS:
                return True
        return None

    @staticmethod
    def _match_option(text: str, options: List[Any]) -> Optional[Any]:
        """
        Resolve text to one of the given options.

        Tried in order: 1-based option number, exact value, exact label,
        then label substring. Exact matches are checked across all options
        before any substring match so a fuzzy hit on an earlier option
        cannot shadow an exact hit on a later one.

        Returns:
            The matching option object, or None when nothing matches
            (including empty text, which would otherwise be a substring of
            every label).
        """
        if not text or not options:
            return None

        # isdecimal() rather than isdigit(): the latter accepts characters
        # such as superscript digits on which int() raises ValueError.
        if text.isdecimal():
            index = int(text) - 1
            if 0 <= index < len(options):
                return options[index]

        needle = text.lower()
        for opt in options:
            if needle == str(opt.value).lower():
                return opt
        for opt in options:
            if needle == opt.label.lower():
                return opt
        for opt in options:
            if needle in opt.label.lower():
                return opt
        return None

    def _parse_choice(self, response: str, question: Question) -> Any:
        """
        Parse a single choice response.

        Returns:
            The matched option's value, or the response unchanged when the
            question has no options or nothing matches (custom value). An
            empty response is returned as-is rather than selecting an option.
        """
        if not question.options:
            return response

        option = self._match_option(response, question.options)
        if option is None:
            # Return as-is for custom values
            return response
        return option.value

    def _parse_multi_choice(self, response: str, question: Question) -> List[Any]:
        """
        Parse a multiple choice response.

        The response is split on commas, "&" and the word "and". Each piece
        is first matched as a whole, so multi-word labels survive; only if
        that fails is it broken into whitespace-separated tokens, which
        keeps inputs like "1 3" working. Tokens that match no option are
        kept as custom values.

        Returns:
            Selected values in input order without duplicates.
        """
        options = question.options or []
        values: List[Any] = []

        for chunk in self._LIST_SPLIT_RE.split(response):
            chunk = chunk.strip()
            if not chunk:
                continue

            option = self._match_option(chunk, options)
            if option is not None:
                if option.value not in values:
                    values.append(option.value)
                continue

            for token in chunk.split():
                option = self._match_option(token, options)
                value = token if option is None else option.value
                if value not in values:
                    values.append(value)

        return values

    def _parse_parameter_select(self, response: str, question: Question) -> List[str]:
        """
        Parse a parameter selection response.

        The response is split on commas, "&" and the word "and". A piece
        that is a valid 1-based option number is replaced by that option's
        value; anything else is taken as a parameter name.

        Returns:
            Selected parameters in input order without duplicates.
        """
        options = question.options or []
        parameters: List[str] = []

        for part in self._LIST_SPLIT_RE.split(response):
            part = part.strip()
            if not part:
                continue

            selected = part
            if options and part.isdecimal():
                idx = int(part) - 1
                if 0 <= idx < len(options):
                    selected = options[idx].value

            if selected not in parameters:
                parameters.append(selected)

        return parameters

    def _parse_bounds(self, response: str) -> Optional[Dict[str, float]]:
        """
        Parse a bounds specification.

        Accepted forms: "2 to 10" / "2-10" (both bounds), or "min 2",
        "max: 10", "min 2, max 10" (either or both). Signed and decimal
        numbers are supported, e.g. "-5 to 5".

        Returns:
            Dict with "min" and/or "max" keys, or None when no bound was
            found. The values are returned as given; min is not checked
            against max.
        """
        # Try "min to max" format
        span = self._RANGE_RE.search(response)
        if span:
            return {"min": float(span.group(1)), "max": float(span.group(2))}

        # Try "min: X, max: Y" format
        bounds: Dict[str, float] = {}
        min_match = self._MIN_RE.search(response)
        if min_match:
            bounds["min"] = float(min_match.group(1))
        max_match = self._MAX_RE.search(response)
        if max_match:
            bounds["max"] = float(max_match.group(1))

        return bounds if bounds else None

    def show_summary(self, blueprint: "StudyBlueprint") -> str:
        """
        Format the interview summary/blueprint as markdown.

        Args:
            blueprint: Generated study blueprint

        Returns:
            Markdown with design-variable and objective tables, optional
            constraints and acknowledged warnings, the settings list and a
            closing confirmation prompt.
        """
        lines = [f"## Study Blueprint: {blueprint.study_name}", ""]

        # Description
        if blueprint.study_description:
            lines.extend([f"**Description**: {blueprint.study_description}", ""])

        # Design Variables
        lines.extend([
            f"### Design Variables ({len(blueprint.design_variables)})",
            "",
            "| Parameter | Current | Min | Max | Units |",
            "|-----------|---------|-----|-----|-------|",
        ])
        for dv in blueprint.design_variables:
            lines.append(
                f"| {dv.parameter} | {dv.current_value} | {dv.min_value} | {dv.max_value} | {dv.units or '-'} |"
            )
        lines.append("")

        # Objectives
        lines.extend([
            f"### Objectives ({len(blueprint.objectives)})",
            "",
            "| Goal | Extractor | Parameters |",
            "|------|-----------|------------|",
        ])
        for obj in blueprint.objectives:
            params = ", ".join(f"{k}={v}" for k, v in (obj.extractor_params or {}).items()) or "-"
            lines.append(f"| {obj.goal} | {obj.extractor} | {params} |")
        lines.append("")

        # Constraints
        if blueprint.constraints:
            # NOTE(review): the first column is headed "Type" but shows
            # con.name; the header text is left unchanged here - confirm
            # which one is intended.
            lines.extend([
                f"### Constraints ({len(blueprint.constraints)})",
                "",
                "| Type | Threshold | Extractor |",
                "|------|-----------|-----------|",
            ])
            for con in blueprint.constraints:
                op = "<=" if con.constraint_type == "max" else ">="
                lines.append(f"| {con.name} | {op} {con.threshold} | {con.extractor} |")
            lines.append("")

        # Settings
        lines.extend([
            "### Settings",
            "",
            f"- **Protocol**: {blueprint.protocol}",
            f"- **Trials**: {blueprint.n_trials}",
            f"- **Sampler**: {blueprint.sampler}",
            "",
        ])

        # Warnings
        if blueprint.warnings_acknowledged:
            lines.extend(["### Acknowledged Warnings", ""])
            for warning in blueprint.warnings_acknowledged:
                lines.append(f"- {warning}")
            lines.append("")

        lines.extend([
            "---",
            "",
            "Does this look correct? Reply **yes** to generate the study, or describe what to change.",
        ])

        return "\n".join(lines)

    def show_warning(self, warning: str, severity: str = "warning") -> str:
        """
        Format a warning message as markdown.

        Args:
            warning: Warning message
            severity: "error", "warning", or "info"; any other value is
                rendered as a note with the "!" icon

        Returns:
            Formatted warning string
        """
        icons = {
            "error": "X",
            "warning": "!",
            "info": "i"
        }
        icon = icons.get(severity, "!")

        if severity == "error":
            return f"\n**[{icon}] ERROR**: {warning}\n"
        if severity == "warning":
            return f"\n**[{icon}] Warning**: {warning}\n"
        return f"\n*[{icon}] Note*: {warning}\n"

    def show_progress(self, current: int, total: int, phase: str) -> str:
        """
        Format a textual progress bar.

        Args:
            current: Current question number
            total: Estimated total questions
            phase: Current phase name

        Returns:
            Markdown line with a 20-character bar and a percentage. Because
            total is only an estimate, current is clamped to [0, total] so
            the bar never overflows and the percentage stays within 0-100.
        """
        bar_length = 20
        if total > 0:
            done = min(max(current, 0), total)
            percentage = int((done / total) * 100)
            filled = int(bar_length * done / total)
        else:
            percentage = 0
            filled = 0

        bar = "=" * filled + "-" * (bar_length - filled)
        return f"**Progress**: [{bar}] {percentage}% - {phase}"
class DashboardPresenter(InterviewPresenter):
    """
    Presenter for dashboard UI mode (future).

    Emits WebSocket events for React UI to render. For now every method
    returns the event as a JSON string of the form
    {"type": ..., "data": ...} instead of emitting it.
    """

    def present_question(
        self,
        question: Question,
        question_number: int,
        total_questions: int,
        category_name: str,
        dynamic_content: Optional[str] = None
    ) -> str:
        """
        Build the "question" event for the dashboard.

        Args:
            question: Question to present
            question_number: Current question number
            total_questions: Estimated total questions
            category_name: Name of the question category
            dynamic_content: Dynamic content passed through to the UI

        Returns:
            JSON string describing the question
        """
        # This would emit an event to the dashboard
        # For now, return JSON representation
        import json

        options = [
            {"value": o.value, "label": o.label}
            for o in (question.options or [])
        ]
        return json.dumps({
            "type": "question",
            "data": {
                "question_id": question.id,
                "question_number": question_number,
                "total_questions": total_questions,
                "category": category_name,
                "text": question.text,
                "question_type": question.question_type,
                "options": options,
                "help_text": question.help_text,
                "default": question.default,
                "dynamic_content": dynamic_content,
            }
        })

    def parse_response(self, response: str, question: Question) -> Any:
        """
        Parse a response coming from the dashboard.

        A JSON object yields its "value" entry (or the raw response when the
        key is absent). Anything else - plain text, or valid JSON that is
        not an object such as "5", "true" or "[1, 2]" - is handed to the
        Claude text parser, since only an object can carry a "value" key.

        Args:
            response: Raw response string
            question: Question being answered

        Returns:
            Parsed answer value
        """
        import json

        try:
            payload = json.loads(response)
        except json.JSONDecodeError:
            payload = None

        if isinstance(payload, dict):
            return payload.get("value", response)

        # Fall back to Claude parser
        return ClaudePresenter().parse_response(response, question)

    def show_summary(self, blueprint: "StudyBlueprint") -> str:
        """
        Build the "summary" event for the dashboard.

        Uses blueprint.to_dict() when available, otherwise str(blueprint).
        """
        import json

        data = blueprint.to_dict() if hasattr(blueprint, 'to_dict') else str(blueprint)
        return json.dumps({
            "type": "summary",
            "data": data
        })

    def show_warning(self, warning: str, severity: str = "warning") -> str:
        """Build the "warning" event for the dashboard."""
        import json

        return json.dumps({
            "type": "warning",
            "data": {"message": warning, "severity": severity}
        })

    def show_progress(self, current: int, total: int, phase: str) -> str:
        """Build the "progress" event for the dashboard."""
        import json

        return json.dumps({
            "type": "progress",
            "data": {"current": current, "total": total, "phase": phase}
        })
class CLIPresenter(InterviewPresenter):
    """
    Presenter for CLI wizard mode (future).

    Interactive terminal prompts using Rich/Questionary. Parsing and the
    summary are delegated to ClaudePresenter.
    """

    def present_question(
        self,
        question: Question,
        question_number: int,
        total_questions: int,
        category_name: str,
        dynamic_content: Optional[str] = None
    ) -> str:
        """Render the question as plain text for a terminal."""
        # Counter line, rule, then the question itself
        out = [
            f"\n[{question_number}/{total_questions}] {category_name}",
            "-" * 50,
            question.text,
        ]

        if question.options:
            out.extend(
                f" {number}. {option.label}"
                for number, option in enumerate(question.options, 1)
            )

        if question.help_text:
            out.append(f"\nHint: {question.help_text}")

        out.append("")
        return "\n".join(out)

    def parse_response(self, response: str, question: Question) -> Any:
        """Parse a CLI response by delegating to the Claude parser."""
        return ClaudePresenter().parse_response(response, question)

    def show_summary(self, blueprint: "StudyBlueprint") -> str:
        """Render the summary by delegating to the Claude presenter."""
        return ClaudePresenter().show_summary(blueprint)

    def show_warning(self, warning: str, severity: str = "warning") -> str:
        """Render a warning prefixed with a bracketed severity tag."""
        tags = {"error": "[ERROR]", "warning": "[WARN]", "info": "[INFO]"}
        tag = tags.get(severity, '[WARN]')
        return f"\n{tag} {warning}\n"

    def show_progress(self, current: int, total: int, phase: str) -> str:
        """Render progress as "current/total (phase)"."""
        return f"Progress: {current}/{total} ({phase})"
# Import for type hints only: the annotations in this module refer to
# "StudyBlueprint" as a string, so the class is not needed at runtime.
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .study_blueprint import StudyBlueprint