""" Problem Analyzer - Analyze test results and generate fix plans using Gemini. Handles: - Root cause analysis from test failures - Pattern detection across failures - Fix plan generation - Priority assessment """ import asyncio import json import logging from dataclasses import dataclass, field from datetime import datetime from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) @dataclass class Issue: """A detected issue from test results.""" id: str description: str severity: str = "medium" # "critical", "high", "medium", "low" category: str = "unknown" affected_files: List[str] = field(default_factory=list) test_ids: List[str] = field(default_factory=list) root_cause: Optional[str] = None @dataclass class FixPlan: """Plan for fixing an issue.""" issue_id: str approach: str steps: List[Dict] = field(default_factory=list) estimated_effort: str = "medium" rollback_steps: List[str] = field(default_factory=list) @dataclass class AnalysisReport: """Complete analysis report.""" timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) issues_found: bool = False issues: List[Issue] = field(default_factory=list) fix_plans: Dict[str, FixPlan] = field(default_factory=dict) patterns: List[Dict] = field(default_factory=list) recommendations: List[str] = field(default_factory=list) class ProblemAnalyzer: """ Gemini-powered analysis of test failures and improvement opportunities. Capabilities: - Deep analysis of test results - Root cause identification - Pattern detection across failures - Fix plan generation with priority """ def __init__(self, gemini_planner: Optional[Any] = None): """ Initialize the analyzer. Args: gemini_planner: GeminiPlanner instance for API access """ self._planner = gemini_planner self._history: List[AnalysisReport] = [] @property def planner(self): """Get or create Gemini planner.""" if self._planner is None: from .planning import GeminiPlanner self._planner = GeminiPlanner() return self._planner async def analyze_test_results(self, test_report: Dict) -> Dict: """ Perform deep analysis of test results. Args: test_report: Test report from DashboardTestRunner Returns: Analysis dict with issues, fix_plans, patterns """ summary = test_report.get("summary", {}) scenarios = test_report.get("scenarios", []) # Quick return if all passed if summary.get("failed", 0) == 0: return { "issues_found": False, "issues": [], "fix_plans": {}, "patterns": [], "recommendations": ["All tests passed!"], } # Analyze failures failures = [s for s in scenarios if not s.get("passed", True)] # Use Gemini for deep analysis if available if self.planner.client != "mock": return await self._gemini_analysis(test_report, failures) else: return self._rule_based_analysis(test_report, failures) async def _gemini_analysis(self, test_report: Dict, failures: List[Dict]) -> Dict: """Use Gemini for sophisticated analysis.""" prompt = self._build_analysis_prompt(test_report, failures) try: loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: self.planner._model.generate_content(prompt) ) text = response.text # Parse JSON from response if "```json" in text: start = text.find("```json") + 7 end = text.find("```", start) json_str = text[start:end].strip() analysis = json.loads(json_str) else: analysis = self._rule_based_analysis(test_report, failures) logger.info(f"Gemini analysis found {len(analysis.get('issues', []))} issues") return analysis except Exception as e: logger.error(f"Gemini analysis failed: {e}, falling back to rule-based") return self._rule_based_analysis(test_report, failures) def _build_analysis_prompt(self, test_report: Dict, failures: List[Dict]) -> str: """Build analysis prompt for Gemini.""" return f"""## Test Failure Analysis ### Test Report Summary - Total Tests: {test_report.get("summary", {}).get("total", 0)} - Passed: {test_report.get("summary", {}).get("passed", 0)} - Failed: {test_report.get("summary", {}).get("failed", 0)} ### Failed Tests {json.dumps(failures, indent=2)} ### Analysis Required Analyze these test failures and provide: 1. **Root Cause Analysis**: What caused each failure? 2. **Pattern Detection**: Are there recurring issues? 3. **Fix Priority**: Which issues should be addressed first? 4. **Implementation Plan**: Specific code changes needed Output as JSON: ```json {{ "issues_found": true, "issues": [ {{ "id": "issue_001", "description": "What went wrong", "severity": "high|medium|low", "category": "api|ui|config|filesystem|logic", "affected_files": ["path/to/file.py"], "test_ids": ["test_001"], "root_cause": "Why it happened" }} ], "fix_plans": {{ "issue_001": {{ "issue_id": "issue_001", "approach": "How to fix it", "steps": [ {{"action": "edit", "file": "path/to/file.py", "description": "Change X to Y"}} ], "estimated_effort": "low|medium|high", "rollback_steps": ["How to undo if needed"] }} }}, "patterns": [ {{"pattern": "Common issue type", "occurrences": 3, "suggestion": "Systemic fix"}} ], "recommendations": [ "High-level improvement suggestions" ] }} ``` Focus on actionable, specific fixes that Claude Code can implement. """ def _rule_based_analysis(self, test_report: Dict, failures: List[Dict]) -> Dict: """Rule-based analysis when Gemini is not available.""" issues = [] fix_plans = {} patterns = [] # Categorize failures api_failures = [] filesystem_failures = [] browser_failures = [] cli_failures = [] for failure in failures: scenario_id = failure.get("scenario_id", "unknown") error = failure.get("error", "") details = failure.get("details", {}) # Detect issue type if "api" in scenario_id.lower() or "status_code" in details: api_failures.append(failure) elif "filesystem" in scenario_id.lower() or "exists" in details: filesystem_failures.append(failure) elif "browser" in scenario_id.lower(): browser_failures.append(failure) elif "cli" in scenario_id.lower() or "command" in details: cli_failures.append(failure) # Generate issues for API failures for i, failure in enumerate(api_failures): issue_id = f"api_issue_{i + 1}" status = failure.get("details", {}).get("status_code", "unknown") issues.append( { "id": issue_id, "description": f"API request failed with status {status}", "severity": "high" if status in [500, 503] else "medium", "category": "api", "affected_files": self._guess_api_files(failure), "test_ids": [failure.get("scenario_id")], "root_cause": failure.get("error", "Unknown API error"), } ) fix_plans[issue_id] = { "issue_id": issue_id, "approach": "Check API endpoint implementation", "steps": [ {"action": "check", "description": "Verify endpoint exists in routes"}, {"action": "test", "description": "Run endpoint manually with curl"}, ], "estimated_effort": "medium", "rollback_steps": [], } # Generate issues for filesystem failures for i, failure in enumerate(filesystem_failures): issue_id = f"fs_issue_{i + 1}" path = failure.get("details", {}).get("path", "unknown path") issues.append( { "id": issue_id, "description": f"Expected file/directory not found: {path}", "severity": "high", "category": "filesystem", "affected_files": [path], "test_ids": [failure.get("scenario_id")], "root_cause": "File was not created during implementation", } ) fix_plans[issue_id] = { "issue_id": issue_id, "approach": "Create missing file/directory", "steps": [ {"action": "create", "path": path, "description": f"Create {path}"}, ], "estimated_effort": "low", "rollback_steps": [f"Remove {path}"], } # Detect patterns if len(api_failures) > 1: patterns.append( { "pattern": "Multiple API failures", "occurrences": len(api_failures), "suggestion": "Check if backend server is running", } ) if len(filesystem_failures) > 1: patterns.append( { "pattern": "Multiple missing files", "occurrences": len(filesystem_failures), "suggestion": "Review study creation process", } ) # Generate recommendations recommendations = [] if api_failures: recommendations.append("Verify backend API is running on port 8000") if filesystem_failures: recommendations.append("Check that study directory structure is correctly created") if browser_failures: recommendations.append("Ensure frontend is running on port 3000") if cli_failures: recommendations.append("Check Python environment and script paths") return { "issues_found": len(issues) > 0, "issues": issues, "fix_plans": fix_plans, "patterns": patterns, "recommendations": recommendations, } def _guess_api_files(self, failure: Dict) -> List[str]: """Guess which API files might be affected.""" endpoint = failure.get("details", {}).get("response", {}) # Common API file patterns return [ "atomizer-dashboard/backend/api/routes/", "atomizer-dashboard/backend/api/services/", ] async def analyze_iteration_history(self, iterations: List[Dict]) -> Dict: """ Analyze patterns across multiple iterations. Args: iterations: List of IterationResult dicts Returns: Cross-iteration analysis """ recurring_issues = {} success_rate = 0 for iteration in iterations: if iteration.get("success"): success_rate += 1 # Track recurring issues analysis = iteration.get("analysis", {}) for issue in analysis.get("issues", []): issue_type = issue.get("category", "unknown") if issue_type not in recurring_issues: recurring_issues[issue_type] = 0 recurring_issues[issue_type] += 1 total = len(iterations) or 1 return { "total_iterations": len(iterations), "success_rate": success_rate / total, "recurring_issues": recurring_issues, "most_common_issue": max(recurring_issues, key=recurring_issues.get) if recurring_issues else None, "recommendation": self._generate_meta_recommendation( recurring_issues, success_rate / total ), } def _generate_meta_recommendation(self, recurring_issues: Dict, success_rate: float) -> str: """Generate high-level recommendation based on iteration history.""" if success_rate >= 0.8: return "Development cycle is healthy. Minor issues detected." elif success_rate >= 0.5: most_common = ( max(recurring_issues, key=recurring_issues.get) if recurring_issues else "unknown" ) return f"Focus on fixing {most_common} issues to improve success rate." else: return ( "Development cycle needs attention. Consider reviewing architecture or test design." ) def get_priority_queue(self, analysis: Dict) -> List[Dict]: """ Get issues sorted by priority for fixing. Args: analysis: Analysis result dict Returns: Sorted list of issues with their fix plans """ issues = analysis.get("issues", []) fix_plans = analysis.get("fix_plans", {}) # Priority order severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3} # Sort by severity sorted_issues = sorted( issues, key=lambda x: severity_order.get(x.get("severity", "medium"), 2) ) # Attach fix plans queue = [] for issue in sorted_issues: issue_id = issue.get("id") queue.append( { "issue": issue, "fix_plan": fix_plans.get(issue_id), } ) return queue