486 lines
16 KiB
Python
486 lines
16 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
DevLoop CLI - Command-line interface for closed-loop development.
|
||
|
|
|
||
|
|
Uses your CLI subscriptions:
|
||
|
|
- OpenCode CLI (Gemini) for planning and analysis
|
||
|
|
- Claude Code CLI for implementation
|
||
|
|
|
||
|
|
Usage:
|
||
|
|
python devloop_cli.py start "Create support_arm study"
|
||
|
|
python devloop_cli.py plan "Fix dashboard validation"
|
||
|
|
python devloop_cli.py implement plan.json
|
||
|
|
python devloop_cli.py test --study support_arm
|
||
|
|
python devloop_cli.py analyze test_results.json
|
||
|
|
python devloop_cli.py status
|
||
|
|
"""
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import asyncio
|
||
|
|
import json
|
||
|
|
import sys
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
# Add project root to path
|
||
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
|
|
|
||
|
|
|
||
|
|
async def start_cycle(objective: str, max_iterations: int = 5):
    """Drive one full closed-loop development cycle.

    Planning/analysis is delegated to OpenCode (Gemini) and implementation
    to Claude Code via the orchestrator; a per-iteration summary is printed
    at the end.

    Args:
        objective: Natural-language description of what the cycle should achieve.
        max_iterations: Upper bound on fix iterations within the cycle.

    Returns:
        The result dict produced by the orchestrator's run_cycle.
    """
    from optimization_engine.devloop.cli_bridge import DevLoopCLIOrchestrator

    banner = "=" * 60
    print(f"Starting DevLoop cycle: {objective}")
    print(banner)
    print("Using: OpenCode (Gemini) for planning, Claude Code for implementation")
    print(banner)

    cycle_result = await DevLoopCLIOrchestrator().run_cycle(
        objective=objective,
        max_iterations=max_iterations,
    )

    print("\n" + banner)
    print(f"Cycle complete: {cycle_result['status']}")
    print(f" Iterations: {len(cycle_result['iterations'])}")
    print(f" Duration: {cycle_result.get('duration_seconds', 0):.1f}s")

    # Per-iteration breakdown: implementation outcome plus test tallies.
    for idx, iteration in enumerate(cycle_result["iterations"], 1):
        implementation = iteration.get("implementation", {})
        tally = iteration.get("test_results", {}).get("summary", {})
        print(f"\n Iteration {idx}:")
        print(f" Implementation: {'OK' if implementation.get('success') else 'FAILED'}")
        print(f" Tests: {tally.get('passed', 0)}/{tally.get('total', 0)} passed")

    return cycle_result
|
||
|
|
|
||
|
|
|
||
|
|
async def run_plan(objective: str, context_file: str = None):
    """Run only the planning phase with Gemini via OpenCode.

    Args:
        objective: Natural-language description of what to plan.
        context_file: Optional path to a JSON file with extra planning context.

    Returns:
        The plan dict produced by OpenCode; also saved to
        <workspace>/.devloop/current_plan.json for the `implement` command.
    """
    from optimization_engine.devloop.cli_bridge import OpenCodeCLI

    print(f"Planning with Gemini (OpenCode): {objective}")
    print("-" * 60)

    # Derive the workspace from the script location instead of hard-coding a
    # user-specific absolute path (the module already treats
    # Path(__file__).parent.parent as the project root for sys.path).
    workspace = Path(__file__).resolve().parent.parent
    opencode = OpenCodeCLI(workspace)

    context = None
    if context_file:
        with open(context_file, encoding="utf-8") as f:
            context = json.load(f)

    plan = await opencode.plan(objective, context)

    print("\nPlan created:")
    print(json.dumps(plan, indent=2))

    # Persist the plan so `implement` can pick it up by default.
    plan_file = workspace / ".devloop" / "current_plan.json"
    plan_file.parent.mkdir(parents=True, exist_ok=True)
    with open(plan_file, "w", encoding="utf-8") as f:
        json.dump(plan, f, indent=2)
    print(f"\nPlan saved to: {plan_file}")

    return plan
|
||
|
|
|
||
|
|
|
||
|
|
async def run_implement(plan_file: str = None):
    """Run only the implementation phase with Claude Code.

    Args:
        plan_file: Optional path to a plan JSON file; defaults to the plan
            saved by the `plan` command (.devloop/current_plan.json).

    Returns:
        The implementation result object, or None if no plan file is found.
    """
    from optimization_engine.devloop.cli_bridge import DevLoopCLIOrchestrator

    # Derive the workspace from the script location instead of hard-coding
    # a user-specific absolute path.
    workspace = Path(__file__).resolve().parent.parent

    # Load plan: explicit file if given, otherwise the default location.
    plan_path = Path(plan_file) if plan_file else workspace / ".devloop" / "current_plan.json"

    if not plan_path.exists():
        print(f"Error: Plan file not found: {plan_path}")
        print("Run 'devloop_cli.py plan <objective>' first")
        return None

    with open(plan_path, encoding="utf-8") as f:
        plan = json.load(f)

    print(f"Implementing plan: {plan.get('objective', 'Unknown')}")
    print("-" * 60)
    print(f"Tasks: {len(plan.get('tasks', []))}")

    orchestrator = DevLoopCLIOrchestrator(workspace)
    result = await orchestrator.step_implement(plan)

    print(f"\nImplementation {'succeeded' if result.success else 'failed'}")
    print(f" Duration: {result.duration_seconds:.1f}s")
    print(f" Files modified: {len(result.files_modified)}")
    # Renamed loop variable: the original `f` shadowed the file-handle name
    # used above.
    for modified in result.files_modified:
        print(f" - {modified}")

    if result.error:
        print(f"\nError: {result.error}")

    return result
|
||
|
|
|
||
|
|
|
||
|
|
async def run_browser_tests(level: str = "quick", study_name: str = None):
    """Run browser tests using Playwright via DevLoop.

    Args:
        level: Scenario set to run ("quick", "home", "full", or "study").
        study_name: Optional study for study-specific scenarios.

    Returns:
        The results dict from the test runner; also saved to
        <workspace>/.devloop/browser_test_results.json.
    """
    from optimization_engine.devloop.test_runner import DashboardTestRunner
    from optimization_engine.devloop.browser_scenarios import get_browser_scenarios

    print(f"Running browser tests (level={level})")
    print("-" * 60)

    runner = DashboardTestRunner()
    scenarios = get_browser_scenarios(level=level, study_name=study_name)

    print(f"Scenarios: {len(scenarios)}")
    for s in scenarios:
        print(f" - {s['name']}")

    results = await runner.run_test_suite(scenarios)

    summary = results.get("summary", {})
    print(f"\nResults: {summary.get('passed', 0)}/{summary.get('total', 0)} passed")

    for scenario in results.get("scenarios", []):
        status = "PASS" if scenario.get("passed") else "FAIL"
        print(f" [{status}] {scenario.get('scenario_name')}")
        if not scenario.get("passed") and scenario.get("error"):
            print(f" Error: {scenario.get('error')}")

    # Save results. Workspace derived from the script location instead of a
    # hard-coded user path; parents=True so a fresh checkout works too.
    workspace = Path(__file__).resolve().parent.parent
    results_file = workspace / ".devloop" / "browser_test_results.json"
    results_file.parent.mkdir(parents=True, exist_ok=True)
    with open(results_file, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved to: {results_file}")

    return results
|
||
|
|
|
||
|
|
|
||
|
|
async def run_tests(
    study_name: str = None, scenarios_file: str = None, include_browser: bool = False
):
    """Run tests for a specific study or from a scenarios JSON file.

    Args:
        study_name: Study to generate filesystem-check scenarios for.
        scenarios_file: Path to a JSON file with pre-built scenarios
            (takes precedence over study_name).
        include_browser: If True, also append Playwright browser scenarios.
            (This flag was previously accepted but silently ignored.)

    Returns:
        The results dict from the test runner, or None when neither a study
        nor a scenarios file was provided.
    """
    from optimization_engine.devloop.test_runner import DashboardTestRunner

    runner = DashboardTestRunner()

    if scenarios_file:
        with open(scenarios_file, encoding="utf-8") as f:
            scenarios = json.load(f)
    elif study_name:
        print(f"Running tests for study: {study_name}")
        print("-" * 60)

        # Find the study - check both flat and nested locations.
        # (Removed the redundant function-local `from pathlib import Path`;
        # Path is already imported at module level.)
        studies_root = Path("studies")

        # Check flat structure first (studies/study_name)
        if (studies_root / study_name).exists():
            study_path = f"studies/{study_name}"
        # Then check nested _Other structure
        elif (studies_root / "_Other" / study_name).exists():
            study_path = f"studies/_Other/{study_name}"
        # Finally scan other topic folders. Guarded with is_dir(): iterdir()
        # raises if the studies root itself is missing.
        else:
            study_path = None
            if studies_root.is_dir():
                for topic_dir in studies_root.iterdir():
                    if topic_dir.is_dir() and (topic_dir / study_name).exists():
                        study_path = f"studies/{topic_dir.name}/{study_name}"
                        break
            if not study_path:
                study_path = f"studies/{study_name}"  # Default, will fail gracefully

        print(f"Study path: {study_path}")

        # Generate filesystem test scenarios for the study
        scenarios = [
            {
                "id": "test_study_dir",
                "name": f"Study directory exists: {study_name}",
                "type": "filesystem",
                "steps": [{"action": "check_exists", "path": study_path}],
                "expected_outcome": {"exists": True},
            },
            {
                "id": "test_spec",
                "name": "AtomizerSpec is valid JSON",
                "type": "filesystem",
                "steps": [
                    {
                        "action": "check_json_valid",
                        "path": f"{study_path}/atomizer_spec.json",
                    }
                ],
                "expected_outcome": {"valid_json": True},
            },
            {
                "id": "test_readme",
                "name": "README exists",
                "type": "filesystem",
                "steps": [{"action": "check_exists", "path": f"{study_path}/README.md"}],
                "expected_outcome": {"exists": True},
            },
            {
                "id": "test_run_script",
                "name": "run_optimization.py exists",
                "type": "filesystem",
                "steps": [
                    {
                        "action": "check_exists",
                        "path": f"{study_path}/run_optimization.py",
                    }
                ],
                "expected_outcome": {"exists": True},
            },
            {
                "id": "test_model_dir",
                "name": "Model directory exists",
                "type": "filesystem",
                "steps": [{"action": "check_exists", "path": f"{study_path}/1_setup/model"}],
                "expected_outcome": {"exists": True},
            },
        ]
    else:
        print("Error: Provide --study or --scenarios")
        return None

    if include_browser:
        # Wire up the previously dead parameter: append Playwright UI
        # scenarios to the suite, as the signature always implied.
        from optimization_engine.devloop.browser_scenarios import get_browser_scenarios

        scenarios.extend(get_browser_scenarios(level="study", study_name=study_name))

    results = await runner.run_test_suite(scenarios)

    summary = results.get("summary", {})
    print(f"\nResults: {summary.get('passed', 0)}/{summary.get('total', 0)} passed")

    for scenario in results.get("scenarios", []):
        status = "PASS" if scenario.get("passed") else "FAIL"
        print(f" [{status}] {scenario.get('scenario_name')}")
        if not scenario.get("passed") and scenario.get("error"):
            print(f" Error: {scenario.get('error')}")

    # Save results next to the other DevLoop artifacts. Workspace derived
    # from the script location instead of a hard-coded user path.
    workspace = Path(__file__).resolve().parent.parent
    results_file = workspace / ".devloop" / "test_results.json"
    results_file.parent.mkdir(parents=True, exist_ok=True)
    with open(results_file, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved to: {results_file}")

    return results
|
||
|
|
|
||
|
|
|
||
|
|
async def run_analyze(results_file: str = None):
    """Analyze test results with Gemini via OpenCode.

    Args:
        results_file: Optional path to a test-results JSON file; defaults to
            the file saved by the `test` command (.devloop/test_results.json).

    Returns:
        The analysis dict from OpenCode, or None if no results file is found.
    """
    from optimization_engine.devloop.cli_bridge import OpenCodeCLI

    # Derive the workspace from the script location instead of hard-coding
    # a user-specific absolute path.
    workspace = Path(__file__).resolve().parent.parent

    # Load results: explicit file if given, otherwise the default location.
    results_path = (
        Path(results_file) if results_file else workspace / ".devloop" / "test_results.json"
    )

    if not results_path.exists():
        print(f"Error: Results file not found: {results_path}")
        print("Run 'devloop_cli.py test --study <name>' first")
        return None

    with open(results_path, encoding="utf-8") as f:
        test_results = json.load(f)

    print("Analyzing test results with Gemini (OpenCode)...")
    print("-" * 60)

    opencode = OpenCodeCLI(workspace)
    analysis = await opencode.analyze(test_results)

    # Was an f-string with no placeholders.
    print("\nAnalysis complete:")
    print(f" Issues found: {analysis.get('issues_found', False)}")

    for issue in analysis.get("issues", []):
        print(f"\n Issue: {issue.get('id')}")
        print(f" Description: {issue.get('description')}")
        print(f" Severity: {issue.get('severity')}")
        print(f" Root cause: {issue.get('root_cause')}")

    for rec in analysis.get("recommendations", []):
        print(f"\n Recommendation: {rec}")

    # Save analysis; ensure the directory exists when --results pointed at a
    # file outside .devloop (the original assumed the directory was present).
    analysis_file = workspace / ".devloop" / "analysis.json"
    analysis_file.parent.mkdir(parents=True, exist_ok=True)
    with open(analysis_file, "w", encoding="utf-8") as f:
        json.dump(analysis, f, indent=2)
    print(f"\nAnalysis saved to: {analysis_file}")

    return analysis
|
||
|
|
|
||
|
|
|
||
|
|
async def show_status():
    """Show current DevLoop status.

    Reports whether a plan, test results, and an analysis exist under
    <workspace>/.devloop, and prints the expected CLI tool locations.
    Returns None; output is print-only.
    """
    # Derive the workspace from the script location instead of hard-coding
    # a user-specific absolute path.
    workspace = Path(__file__).resolve().parent.parent
    devloop_dir = workspace / ".devloop"

    print("DevLoop Status")
    print("=" * 60)

    # Artifact files produced by the plan/test/analyze commands.
    plan_file = devloop_dir / "current_plan.json"
    results_file = devloop_dir / "test_results.json"
    analysis_file = devloop_dir / "analysis.json"

    if plan_file.exists():
        with open(plan_file, encoding="utf-8") as f:
            plan = json.load(f)
        print(f"\nCurrent Plan: {plan.get('objective', 'Unknown')}")
        print(f" Tasks: {len(plan.get('tasks', []))}")
    else:
        print("\nNo current plan")

    if results_file.exists():
        with open(results_file, encoding="utf-8") as f:
            results = json.load(f)
        summary = results.get("summary", {})
        # Was an f-string with no placeholders.
        print("\nLast Test Results:")
        print(f" Passed: {summary.get('passed', 0)}/{summary.get('total', 0)}")
    else:
        print("\nNo test results")

    if analysis_file.exists():
        with open(analysis_file, encoding="utf-8") as f:
            analysis = json.load(f)
        # Was an f-string with no placeholders.
        print("\nLast Analysis:")
        print(f" Issues: {len(analysis.get('issues', []))}")
    else:
        print("\nNo analysis")

    print("\n" + "=" * 60)
    print("CLI Tools:")
    # NOTE(review): tool locations are still user-specific display strings;
    # kept verbatim since they are informational only.
    print(" - Claude Code: C:\\Users\\antoi\\.local\\bin\\claude.exe")
    print(" - OpenCode: C:\\Users\\antoi\\AppData\\Roaming\\npm\\opencode.cmd")
|
||
|
|
|
||
|
|
|
||
|
|
async def quick_support_arm():
    """Smoke-test the bundled support_arm study.

    Runs the filesystem test suite for support_arm; on success prints the
    suggested follow-up commands, otherwise hands the failing results to the
    Gemini analysis step.
    """
    divider = "=" * 60
    print("Quick DevLoop test with support_arm study")
    print(divider)

    # Exercise the study's filesystem checks via the shared test command.
    outcome = await run_tests(study_name="support_arm")

    all_passed = bool(outcome) and outcome.get("summary", {}).get("failed", 0) == 0
    if all_passed:
        print("\n" + divider)
        print("SUCCESS: support_arm study is properly configured!")
        print("\nNext steps:")
        print(
            " 1. Run optimization: cd studies/_Other/support_arm && python run_optimization.py --test"
        )
        print(" 2. Start dashboard: cd atomizer-dashboard && npm run dev")
        print(" 3. View in canvas: http://localhost:3000/canvas/support_arm")
    else:
        print("\n" + divider)
        print("Some tests failed. Running analysis...")
        await run_analyze()
|
||
|
|
|
||
|
|
|
||
|
|
def main():
    """Parse command-line arguments and dispatch to the matching command."""
    parser = argparse.ArgumentParser(
        description="DevLoop CLI - Closed-loop development using CLI subscriptions",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Run full development cycle
python devloop_cli.py start "Create new bracket study"

# Step-by-step execution
python devloop_cli.py plan "Fix dashboard validation"
python devloop_cli.py implement
python devloop_cli.py test --study support_arm
python devloop_cli.py analyze

# Browser tests (Playwright)
python devloop_cli.py browser # Quick smoke test
python devloop_cli.py browser --level full # All UI tests
python devloop_cli.py browser --study support_arm # Study-specific

# Quick test
python devloop_cli.py quick

Tools used:
- OpenCode (Gemini): Planning and analysis
- Claude Code: Implementation and fixes
- Playwright: Browser UI testing
""",
    )

    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # start: full closed-loop cycle
    sub = subparsers.add_parser("start", help="Start a full development cycle")
    sub.add_argument("objective", help="What to achieve")
    sub.add_argument("--max-iterations", type=int, default=5, help="Max fix iterations")

    # plan: planning phase only (Gemini via OpenCode)
    sub = subparsers.add_parser("plan", help="Create plan with Gemini (OpenCode)")
    sub.add_argument("objective", help="What to plan")
    sub.add_argument("--context", help="Context JSON file")

    # implement: implementation phase only (Claude Code)
    sub = subparsers.add_parser("implement", help="Implement plan with Claude Code")
    sub.add_argument("--plan", help="Plan JSON file (default: .devloop/current_plan.json)")

    # test: run scenario suite for a study or a scenarios file
    sub = subparsers.add_parser("test", help="Run tests")
    sub.add_argument("--study", help="Study name to test")
    sub.add_argument("--scenarios", help="Test scenarios JSON file")

    # analyze: interpret test results
    sub = subparsers.add_parser("analyze", help="Analyze results with Gemini (OpenCode)")
    sub.add_argument("--results", help="Test results JSON file")

    # status / quick: no extra arguments
    subparsers.add_parser("status", help="Show current DevLoop status")
    subparsers.add_parser("quick", help="Quick test with support_arm study")

    # browser: Playwright UI tests
    sub = subparsers.add_parser("browser", help="Run browser UI tests with Playwright")
    sub.add_argument(
        "--level",
        choices=["quick", "home", "full", "study"],
        default="quick",
        help="Test level: quick (smoke), home (home page), full (all), study (study-specific)",
    )
    sub.add_argument("--study", help="Study name for study-specific tests")

    args = parser.parse_args()

    # Map each command to a zero-argument coroutine factory; an unknown or
    # missing command falls through to the help text, as before.
    handlers = {
        "start": lambda: start_cycle(args.objective, args.max_iterations),
        "plan": lambda: run_plan(args.objective, args.context),
        "implement": lambda: run_implement(args.plan),
        "test": lambda: run_tests(args.study, args.scenarios),
        "analyze": lambda: run_analyze(args.results),
        "status": show_status,
        "quick": quick_support_arm,
        "browser": lambda: run_browser_tests(args.level, args.study),
    }

    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
    else:
        asyncio.run(handler())
|
||
|
|
|
||
|
|
|
||
|
|
# Script entry point: only dispatch when executed directly, not on import.
if __name__ == "__main__":
    main()
|