#!/usr/bin/env python3 """ DevLoop CLI - Command-line interface for closed-loop development. Uses your CLI subscriptions: - OpenCode CLI (Gemini) for planning and analysis - Claude Code CLI for implementation Usage: python devloop_cli.py start "Create support_arm study" python devloop_cli.py plan "Fix dashboard validation" python devloop_cli.py implement plan.json python devloop_cli.py test --study support_arm python devloop_cli.py analyze test_results.json python devloop_cli.py status """ import argparse import asyncio import json import sys from pathlib import Path # Add project root to path sys.path.insert(0, str(Path(__file__).parent.parent)) async def start_cycle(objective: str, max_iterations: int = 5): """Start a development cycle using CLI tools.""" from optimization_engine.devloop.cli_bridge import DevLoopCLIOrchestrator print(f"Starting DevLoop cycle: {objective}") print("=" * 60) print("Using: OpenCode (Gemini) for planning, Claude Code for implementation") print("=" * 60) orchestrator = DevLoopCLIOrchestrator() result = await orchestrator.run_cycle( objective=objective, max_iterations=max_iterations, ) print("\n" + "=" * 60) print(f"Cycle complete: {result['status']}") print(f" Iterations: {len(result['iterations'])}") print(f" Duration: {result.get('duration_seconds', 0):.1f}s") for i, iter_result in enumerate(result["iterations"], 1): impl = iter_result.get("implementation", {}) tests = iter_result.get("test_results", {}).get("summary", {}) print(f"\n Iteration {i}:") print(f" Implementation: {'OK' if impl.get('success') else 'FAILED'}") print(f" Tests: {tests.get('passed', 0)}/{tests.get('total', 0)} passed") return result async def run_plan(objective: str, context_file: str = None): """Run only the planning phase with Gemini via OpenCode.""" from optimization_engine.devloop.cli_bridge import OpenCodeCLI print(f"Planning with Gemini (OpenCode): {objective}") print("-" * 60) workspace = Path("C:/Users/antoi/Atomizer") opencode = OpenCodeCLI(workspace) context = None if context_file: with open(context_file) as f: context = json.load(f) plan = await opencode.plan(objective, context) print("\nPlan created:") print(json.dumps(plan, indent=2)) # Save plan to file plan_file = workspace / ".devloop" / "current_plan.json" plan_file.parent.mkdir(exist_ok=True) with open(plan_file, "w") as f: json.dump(plan, f, indent=2) print(f"\nPlan saved to: {plan_file}") return plan async def run_implement(plan_file: str = None): """Run only the implementation phase with Claude Code.""" from optimization_engine.devloop.cli_bridge import DevLoopCLIOrchestrator workspace = Path("C:/Users/antoi/Atomizer") # Load plan if plan_file: plan_path = Path(plan_file) else: plan_path = workspace / ".devloop" / "current_plan.json" if not plan_path.exists(): print(f"Error: Plan file not found: {plan_path}") print("Run 'devloop_cli.py plan ' first") return None with open(plan_path) as f: plan = json.load(f) print(f"Implementing plan: {plan.get('objective', 'Unknown')}") print("-" * 60) print(f"Tasks: {len(plan.get('tasks', []))}") orchestrator = DevLoopCLIOrchestrator(workspace) result = await orchestrator.step_implement(plan) print(f"\nImplementation {'succeeded' if result.success else 'failed'}") print(f" Duration: {result.duration_seconds:.1f}s") print(f" Files modified: {len(result.files_modified)}") for f in result.files_modified: print(f" - {f}") if result.error: print(f"\nError: {result.error}") return result async def run_browser_tests(level: str = "quick", study_name: str = None): """Run browser tests using Playwright via DevLoop.""" from optimization_engine.devloop.test_runner import DashboardTestRunner from optimization_engine.devloop.browser_scenarios import get_browser_scenarios print(f"Running browser tests (level={level})") print("-" * 60) runner = DashboardTestRunner() scenarios = get_browser_scenarios(level=level, study_name=study_name) print(f"Scenarios: {len(scenarios)}") for s in scenarios: print(f" - {s['name']}") results = await runner.run_test_suite(scenarios) summary = results.get("summary", {}) print(f"\nResults: {summary.get('passed', 0)}/{summary.get('total', 0)} passed") for scenario in results.get("scenarios", []): status = "PASS" if scenario.get("passed") else "FAIL" print(f" [{status}] {scenario.get('scenario_name')}") if not scenario.get("passed") and scenario.get("error"): print(f" Error: {scenario.get('error')}") # Save results workspace = Path("C:/Users/antoi/Atomizer") results_file = workspace / ".devloop" / "browser_test_results.json" results_file.parent.mkdir(exist_ok=True) with open(results_file, "w") as f: json.dump(results, f, indent=2) print(f"\nResults saved to: {results_file}") return results async def run_tests( study_name: str = None, scenarios_file: str = None, include_browser: bool = False ): """Run tests for a specific study or from scenarios file.""" from optimization_engine.devloop.test_runner import DashboardTestRunner runner = DashboardTestRunner() if scenarios_file: with open(scenarios_file) as f: scenarios = json.load(f) elif study_name: print(f"Running tests for study: {study_name}") print("-" * 60) # Find the study - check both flat and nested locations from pathlib import Path studies_root = Path("studies") # Check flat structure first (studies/study_name) if (studies_root / study_name).exists(): study_path = f"studies/{study_name}" # Then check nested _Other structure elif (studies_root / "_Other" / study_name).exists(): study_path = f"studies/_Other/{study_name}" # Check other topic folders else: study_path = None for topic_dir in studies_root.iterdir(): if topic_dir.is_dir() and (topic_dir / study_name).exists(): study_path = f"studies/{topic_dir.name}/{study_name}" break if not study_path: study_path = f"studies/{study_name}" # Default, will fail gracefully print(f"Study path: {study_path}") # Generate test scenarios for the study scenarios = [ { "id": "test_study_dir", "name": f"Study directory exists: {study_name}", "type": "filesystem", "steps": [{"action": "check_exists", "path": study_path}], "expected_outcome": {"exists": True}, }, { "id": "test_spec", "name": "AtomizerSpec is valid JSON", "type": "filesystem", "steps": [ { "action": "check_json_valid", "path": f"{study_path}/atomizer_spec.json", } ], "expected_outcome": {"valid_json": True}, }, { "id": "test_readme", "name": "README exists", "type": "filesystem", "steps": [{"action": "check_exists", "path": f"{study_path}/README.md"}], "expected_outcome": {"exists": True}, }, { "id": "test_run_script", "name": "run_optimization.py exists", "type": "filesystem", "steps": [ { "action": "check_exists", "path": f"{study_path}/run_optimization.py", } ], "expected_outcome": {"exists": True}, }, { "id": "test_model_dir", "name": "Model directory exists", "type": "filesystem", "steps": [{"action": "check_exists", "path": f"{study_path}/1_setup/model"}], "expected_outcome": {"exists": True}, }, ] else: print("Error: Provide --study or --scenarios") return None results = await runner.run_test_suite(scenarios) summary = results.get("summary", {}) print(f"\nResults: {summary.get('passed', 0)}/{summary.get('total', 0)} passed") for scenario in results.get("scenarios", []): status = "PASS" if scenario.get("passed") else "FAIL" print(f" [{status}] {scenario.get('scenario_name')}") if not scenario.get("passed") and scenario.get("error"): print(f" Error: {scenario.get('error')}") # Save results workspace = Path("C:/Users/antoi/Atomizer") results_file = workspace / ".devloop" / "test_results.json" results_file.parent.mkdir(exist_ok=True) with open(results_file, "w") as f: json.dump(results, f, indent=2) print(f"\nResults saved to: {results_file}") return results async def run_analyze(results_file: str = None): """Analyze test results with Gemini via OpenCode.""" from optimization_engine.devloop.cli_bridge import OpenCodeCLI workspace = Path("C:/Users/antoi/Atomizer") # Load results if results_file: results_path = Path(results_file) else: results_path = workspace / ".devloop" / "test_results.json" if not results_path.exists(): print(f"Error: Results file not found: {results_path}") print("Run 'devloop_cli.py test --study ' first") return None with open(results_path) as f: test_results = json.load(f) print("Analyzing test results with Gemini (OpenCode)...") print("-" * 60) opencode = OpenCodeCLI(workspace) analysis = await opencode.analyze(test_results) print(f"\nAnalysis complete:") print(f" Issues found: {analysis.get('issues_found', False)}") for issue in analysis.get("issues", []): print(f"\n Issue: {issue.get('id')}") print(f" Description: {issue.get('description')}") print(f" Severity: {issue.get('severity')}") print(f" Root cause: {issue.get('root_cause')}") for rec in analysis.get("recommendations", []): print(f"\n Recommendation: {rec}") # Save analysis analysis_file = workspace / ".devloop" / "analysis.json" with open(analysis_file, "w") as f: json.dump(analysis, f, indent=2) print(f"\nAnalysis saved to: {analysis_file}") return analysis async def show_status(): """Show current DevLoop status.""" workspace = Path("C:/Users/antoi/Atomizer") devloop_dir = workspace / ".devloop" print("DevLoop Status") print("=" * 60) # Check for existing files plan_file = devloop_dir / "current_plan.json" results_file = devloop_dir / "test_results.json" analysis_file = devloop_dir / "analysis.json" if plan_file.exists(): with open(plan_file) as f: plan = json.load(f) print(f"\nCurrent Plan: {plan.get('objective', 'Unknown')}") print(f" Tasks: {len(plan.get('tasks', []))}") else: print("\nNo current plan") if results_file.exists(): with open(results_file) as f: results = json.load(f) summary = results.get("summary", {}) print(f"\nLast Test Results:") print(f" Passed: {summary.get('passed', 0)}/{summary.get('total', 0)}") else: print("\nNo test results") if analysis_file.exists(): with open(analysis_file) as f: analysis = json.load(f) print(f"\nLast Analysis:") print(f" Issues: {len(analysis.get('issues', []))}") else: print("\nNo analysis") print("\n" + "=" * 60) print("CLI Tools:") print(" - Claude Code: C:\\Users\\antoi\\.local\\bin\\claude.exe") print(" - OpenCode: C:\\Users\\antoi\\AppData\\Roaming\\npm\\opencode.cmd") async def quick_support_arm(): """Quick test with support_arm study.""" print("Quick DevLoop test with support_arm study") print("=" * 60) # Test the study results = await run_tests(study_name="support_arm") if results and results.get("summary", {}).get("failed", 0) == 0: print("\n" + "=" * 60) print("SUCCESS: support_arm study is properly configured!") print("\nNext steps:") print( " 1. Run optimization: cd studies/_Other/support_arm && python run_optimization.py --test" ) print(" 2. Start dashboard: cd atomizer-dashboard && npm run dev") print(" 3. View in canvas: http://localhost:3000/canvas/support_arm") else: print("\n" + "=" * 60) print("Some tests failed. Running analysis...") await run_analyze() def main(): parser = argparse.ArgumentParser( description="DevLoop CLI - Closed-loop development using CLI subscriptions", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Run full development cycle python devloop_cli.py start "Create new bracket study" # Step-by-step execution python devloop_cli.py plan "Fix dashboard validation" python devloop_cli.py implement python devloop_cli.py test --study support_arm python devloop_cli.py analyze # Browser tests (Playwright) python devloop_cli.py browser # Quick smoke test python devloop_cli.py browser --level full # All UI tests python devloop_cli.py browser --study support_arm # Study-specific # Quick test python devloop_cli.py quick Tools used: - OpenCode (Gemini): Planning and analysis - Claude Code: Implementation and fixes - Playwright: Browser UI testing """, ) subparsers = parser.add_subparsers(dest="command", help="Commands") # Start command - full cycle start_parser = subparsers.add_parser("start", help="Start a full development cycle") start_parser.add_argument("objective", help="What to achieve") start_parser.add_argument("--max-iterations", type=int, default=5, help="Max fix iterations") # Plan command plan_parser = subparsers.add_parser("plan", help="Create plan with Gemini (OpenCode)") plan_parser.add_argument("objective", help="What to plan") plan_parser.add_argument("--context", help="Context JSON file") # Implement command impl_parser = subparsers.add_parser("implement", help="Implement plan with Claude Code") impl_parser.add_argument("--plan", help="Plan JSON file (default: .devloop/current_plan.json)") # Test command test_parser = subparsers.add_parser("test", help="Run tests") test_parser.add_argument("--study", help="Study name to test") test_parser.add_argument("--scenarios", help="Test scenarios JSON file") # Analyze command analyze_parser = subparsers.add_parser("analyze", help="Analyze results with Gemini (OpenCode)") analyze_parser.add_argument("--results", help="Test results JSON file") # Status command subparsers.add_parser("status", help="Show current DevLoop status") # Quick command subparsers.add_parser("quick", help="Quick test with support_arm study") # Browser command browser_parser = subparsers.add_parser("browser", help="Run browser UI tests with Playwright") browser_parser.add_argument( "--level", choices=["quick", "home", "full", "study"], default="quick", help="Test level: quick (smoke), home (home page), full (all), study (study-specific)", ) browser_parser.add_argument("--study", help="Study name for study-specific tests") args = parser.parse_args() if args.command == "start": asyncio.run(start_cycle(args.objective, args.max_iterations)) elif args.command == "plan": asyncio.run(run_plan(args.objective, args.context)) elif args.command == "implement": asyncio.run(run_implement(args.plan)) elif args.command == "test": asyncio.run(run_tests(args.study, args.scenarios)) elif args.command == "analyze": asyncio.run(run_analyze(args.results)) elif args.command == "status": asyncio.run(show_status()) elif args.command == "quick": asyncio.run(quick_support_arm()) elif args.command == "browser": asyncio.run(run_browser_tests(args.level, args.study)) else: parser.print_help() if __name__ == "__main__": main()