Files
Atomizer/tools/devloop_cli.py
Anto01 3193831340 feat: Add DevLoop automation and HTML Reports
## DevLoop - Closed-Loop Development System
- Orchestrator for plan → build → test → analyze cycle
- Gemini planning via OpenCode CLI
- Claude implementation via CLI bridge
- Playwright browser testing integration
- Test runner with API, filesystem, and browser tests
- Persistent state in .devloop/ directory
- CLI tool: tools/devloop_cli.py

Usage:
  python tools/devloop_cli.py start 'Create new feature'
  python tools/devloop_cli.py plan 'Fix bug in X'
  python tools/devloop_cli.py test --study support_arm
  python tools/devloop_cli.py browser --level full

## HTML Reports (optimization_engine/reporting/)
- Interactive Plotly-based reports
- Convergence plot, Pareto front, parallel coordinates
- Parameter importance analysis
- Self-contained HTML (offline-capable)
- Tailwind CSS styling

## Playwright E2E Tests
- Home page tests
- Test results in test-results/

## LAC Knowledge Base Updates
- Session insights (failures, workarounds, patterns)
- Optimization memory for arm support study
2026-01-24 21:18:18 -05:00

486 lines
16 KiB
Python

#!/usr/bin/env python3
"""
DevLoop CLI - Command-line interface for closed-loop development.
Uses your CLI subscriptions:
- OpenCode CLI (Gemini) for planning and analysis
- Claude Code CLI for implementation
Usage:
python devloop_cli.py start "Create support_arm study"
python devloop_cli.py plan "Fix dashboard validation"
python devloop_cli.py implement plan.json
python devloop_cli.py test --study support_arm
python devloop_cli.py analyze test_results.json
python devloop_cli.py status
"""
import argparse
import asyncio
import json
import sys
from pathlib import Path
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
async def start_cycle(objective: str, max_iterations: int = 5):
    """Run a full plan -> implement -> test -> analyze cycle via CLI tools.

    Args:
        objective: Natural-language goal for the cycle.
        max_iterations: Upper bound on fix iterations before giving up.

    Returns:
        The orchestrator's result dict (status, iterations, duration, ...).
    """
    from optimization_engine.devloop.cli_bridge import DevLoopCLIOrchestrator

    banner = "=" * 60
    print(f"Starting DevLoop cycle: {objective}")
    print(banner)
    print("Using: OpenCode (Gemini) for planning, Claude Code for implementation")
    print(banner)

    orchestrator = DevLoopCLIOrchestrator()
    result = await orchestrator.run_cycle(objective=objective, max_iterations=max_iterations)

    print("\n" + banner)
    print(f"Cycle complete: {result['status']}")
    print(f" Iterations: {len(result['iterations'])}")
    print(f" Duration: {result.get('duration_seconds', 0):.1f}s")

    # Per-iteration breakdown: implementation status plus test pass counts.
    for idx, outcome in enumerate(result["iterations"], 1):
        impl = outcome.get("implementation", {})
        counts = outcome.get("test_results", {}).get("summary", {})
        print(f"\n Iteration {idx}:")
        print(f" Implementation: {'OK' if impl.get('success') else 'FAILED'}")
        print(f" Tests: {counts.get('passed', 0)}/{counts.get('total', 0)} passed")
    return result
async def run_plan(objective: str, context_file: str = None):
    """Run only the planning phase with Gemini via OpenCode.

    Args:
        objective: What to plan.
        context_file: Optional path to a JSON file with extra planning context.

    Returns:
        The plan dict produced by OpenCode (also saved to
        .devloop/current_plan.json for the `implement` command).
    """
    from optimization_engine.devloop.cli_bridge import OpenCodeCLI

    print(f"Planning with Gemini (OpenCode): {objective}")
    print("-" * 60)

    # Derive the workspace from this file's location (tools/ -> project root),
    # matching the module's sys.path bootstrap, instead of a hard-coded
    # per-user path — the CLI now works on any checkout.
    workspace = Path(__file__).resolve().parent.parent
    opencode = OpenCodeCLI(workspace)

    context = None
    if context_file:
        with open(context_file, encoding="utf-8") as f:
            context = json.load(f)

    plan = await opencode.plan(objective, context)
    print("\nPlan created:")
    print(json.dumps(plan, indent=2))

    # Persist the plan so a later `implement` invocation can pick it up.
    plan_file = workspace / ".devloop" / "current_plan.json"
    plan_file.parent.mkdir(exist_ok=True)
    with open(plan_file, "w", encoding="utf-8") as f:
        json.dump(plan, f, indent=2)
    print(f"\nPlan saved to: {plan_file}")
    return plan
async def run_implement(plan_file: str = None):
    """Run only the implementation phase with Claude Code.

    Args:
        plan_file: Optional path to a plan JSON file; defaults to
            .devloop/current_plan.json under the project root.

    Returns:
        The orchestrator's implementation result, or None if no plan
        file was found.
    """
    from optimization_engine.devloop.cli_bridge import DevLoopCLIOrchestrator

    # Project root derived from this file's location (tools/ -> root) instead
    # of a hard-coded per-user path, so the CLI is portable across machines.
    workspace = Path(__file__).resolve().parent.parent

    plan_path = Path(plan_file) if plan_file else workspace / ".devloop" / "current_plan.json"
    if not plan_path.exists():
        print(f"Error: Plan file not found: {plan_path}")
        print("Run 'devloop_cli.py plan <objective>' first")
        return None

    with open(plan_path, encoding="utf-8") as f:
        plan = json.load(f)

    print(f"Implementing plan: {plan.get('objective', 'Unknown')}")
    print("-" * 60)
    print(f"Tasks: {len(plan.get('tasks', []))}")

    orchestrator = DevLoopCLIOrchestrator(workspace)
    result = await orchestrator.step_implement(plan)

    print(f"\nImplementation {'succeeded' if result.success else 'failed'}")
    print(f" Duration: {result.duration_seconds:.1f}s")
    print(f" Files modified: {len(result.files_modified)}")
    # Renamed loop variable: the original reused `f`, shadowing the
    # file-handle name used just above.
    for modified in result.files_modified:
        print(f" - {modified}")
    if result.error:
        print(f"\nError: {result.error}")
    return result
async def run_browser_tests(level: str = "quick", study_name: str = None):
    """Run browser tests using Playwright via DevLoop.

    Args:
        level: Scenario set to run ("quick", "home", "full", or "study").
        study_name: Optional study name for study-specific scenarios.

    Returns:
        The test-suite results dict (also saved under .devloop/).
    """
    from optimization_engine.devloop.test_runner import DashboardTestRunner
    from optimization_engine.devloop.browser_scenarios import get_browser_scenarios

    print(f"Running browser tests (level={level})")
    print("-" * 60)

    runner = DashboardTestRunner()
    scenarios = get_browser_scenarios(level=level, study_name=study_name)
    print(f"Scenarios: {len(scenarios)}")
    for s in scenarios:
        print(f" - {s['name']}")

    results = await runner.run_test_suite(scenarios)
    summary = results.get("summary", {})
    print(f"\nResults: {summary.get('passed', 0)}/{summary.get('total', 0)} passed")
    for scenario in results.get("scenarios", []):
        status = "PASS" if scenario.get("passed") else "FAIL"
        print(f" [{status}] {scenario.get('scenario_name')}")
        if not scenario.get("passed") and scenario.get("error"):
            print(f" Error: {scenario.get('error')}")

    # Save results under the project root derived from this file's location
    # (tools/ -> root) instead of a hard-coded per-user path.
    workspace = Path(__file__).resolve().parent.parent
    results_file = workspace / ".devloop" / "browser_test_results.json"
    results_file.parent.mkdir(exist_ok=True)
    with open(results_file, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved to: {results_file}")
    return results
def _find_study_path(study_name: str) -> str:
    """Locate a study directory under studies/ (flat, _Other, or topic folders)."""
    studies_root = Path("studies")
    # Flat layout: studies/<name>
    if (studies_root / study_name).exists():
        return f"studies/{study_name}"
    # Nested layout: studies/_Other/<name>
    if (studies_root / "_Other" / study_name).exists():
        return f"studies/_Other/{study_name}"
    # Any other topic folder: studies/<topic>/<name>.
    # Guard: iterdir() raises FileNotFoundError if studies/ doesn't exist.
    if studies_root.is_dir():
        for topic_dir in studies_root.iterdir():
            if topic_dir.is_dir() and (topic_dir / study_name).exists():
                return f"studies/{topic_dir.name}/{study_name}"
    # Default; downstream existence checks will fail gracefully.
    return f"studies/{study_name}"


async def run_tests(
    study_name: str = None, scenarios_file: str = None, include_browser: bool = False
):
    """Run tests for a specific study or from scenarios file.

    Args:
        study_name: Study to generate filesystem test scenarios for.
        scenarios_file: Path to a JSON file of pre-built scenarios
            (takes precedence over study_name).
        include_browser: Also append Playwright browser scenarios.
            Previously this flag was accepted but silently ignored.

    Returns:
        The test-suite results dict, or None if neither input was given.
    """
    from optimization_engine.devloop.test_runner import DashboardTestRunner

    runner = DashboardTestRunner()

    if scenarios_file:
        with open(scenarios_file, encoding="utf-8") as f:
            scenarios = json.load(f)
    elif study_name:
        print(f"Running tests for study: {study_name}")
        print("-" * 60)
        study_path = _find_study_path(study_name)
        print(f"Study path: {study_path}")
        # Generate filesystem test scenarios for the study layout.
        scenarios = [
            {
                "id": "test_study_dir",
                "name": f"Study directory exists: {study_name}",
                "type": "filesystem",
                "steps": [{"action": "check_exists", "path": study_path}],
                "expected_outcome": {"exists": True},
            },
            {
                "id": "test_spec",
                "name": "AtomizerSpec is valid JSON",
                "type": "filesystem",
                "steps": [
                    {
                        "action": "check_json_valid",
                        "path": f"{study_path}/atomizer_spec.json",
                    }
                ],
                "expected_outcome": {"valid_json": True},
            },
            {
                "id": "test_readme",
                "name": "README exists",
                "type": "filesystem",
                "steps": [{"action": "check_exists", "path": f"{study_path}/README.md"}],
                "expected_outcome": {"exists": True},
            },
            {
                "id": "test_run_script",
                "name": "run_optimization.py exists",
                "type": "filesystem",
                "steps": [
                    {
                        "action": "check_exists",
                        "path": f"{study_path}/run_optimization.py",
                    }
                ],
                "expected_outcome": {"exists": True},
            },
            {
                "id": "test_model_dir",
                "name": "Model directory exists",
                "type": "filesystem",
                "steps": [{"action": "check_exists", "path": f"{study_path}/1_setup/model"}],
                "expected_outcome": {"exists": True},
            },
        ]
    else:
        print("Error: Provide --study or --scenarios")
        return None

    if include_browser:
        # FIX: this flag used to be a no-op. Append the Playwright scenarios
        # from the same source the `browser` command uses.
        from optimization_engine.devloop.browser_scenarios import get_browser_scenarios

        scenarios = list(scenarios) + get_browser_scenarios(
            level="study" if study_name else "quick", study_name=study_name
        )

    results = await runner.run_test_suite(scenarios)
    summary = results.get("summary", {})
    print(f"\nResults: {summary.get('passed', 0)}/{summary.get('total', 0)} passed")
    for scenario in results.get("scenarios", []):
        status = "PASS" if scenario.get("passed") else "FAIL"
        print(f" [{status}] {scenario.get('scenario_name')}")
        if not scenario.get("passed") and scenario.get("error"):
            print(f" Error: {scenario.get('error')}")

    # Save results under the project root derived from this file's location
    # (tools/ -> root) instead of a hard-coded per-user path.
    workspace = Path(__file__).resolve().parent.parent
    results_file = workspace / ".devloop" / "test_results.json"
    results_file.parent.mkdir(exist_ok=True)
    with open(results_file, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved to: {results_file}")
    return results
async def run_analyze(results_file: str = None):
    """Analyze test results with Gemini via OpenCode.

    Args:
        results_file: Optional results JSON path; defaults to
            .devloop/test_results.json under the project root.

    Returns:
        The analysis dict, or None if no results file was found.
    """
    from optimization_engine.devloop.cli_bridge import OpenCodeCLI

    # Project root derived from this file's location (tools/ -> root) instead
    # of a hard-coded per-user path, so the CLI is portable across machines.
    workspace = Path(__file__).resolve().parent.parent

    results_path = (
        Path(results_file) if results_file else workspace / ".devloop" / "test_results.json"
    )
    if not results_path.exists():
        print(f"Error: Results file not found: {results_path}")
        print("Run 'devloop_cli.py test --study <name>' first")
        return None

    with open(results_path, encoding="utf-8") as f:
        test_results = json.load(f)

    print("Analyzing test results with Gemini (OpenCode)...")
    print("-" * 60)

    opencode = OpenCodeCLI(workspace)
    analysis = await opencode.analyze(test_results)

    print("\nAnalysis complete:")
    print(f" Issues found: {analysis.get('issues_found', False)}")
    for issue in analysis.get("issues", []):
        print(f"\n Issue: {issue.get('id')}")
        print(f" Description: {issue.get('description')}")
        print(f" Severity: {issue.get('severity')}")
        print(f" Root cause: {issue.get('root_cause')}")
    for rec in analysis.get("recommendations", []):
        print(f"\n Recommendation: {rec}")

    # Save analysis. FIX: create .devloop/ first — every other save site
    # mkdirs it, but this one previously assumed the directory existed and
    # would crash on a fresh checkout.
    analysis_file = workspace / ".devloop" / "analysis.json"
    analysis_file.parent.mkdir(exist_ok=True)
    with open(analysis_file, "w", encoding="utf-8") as f:
        json.dump(analysis, f, indent=2)
    print(f"\nAnalysis saved to: {analysis_file}")
    return analysis
async def show_status():
    """Print the current DevLoop state: plan, test results, and analysis
    found under the project's .devloop/ directory."""
    # Project root derived from this file's location (tools/ -> root) instead
    # of a hard-coded per-user path, so `status` works on any checkout.
    workspace = Path(__file__).resolve().parent.parent
    devloop_dir = workspace / ".devloop"

    print("DevLoop Status")
    print("=" * 60)

    # State files written by the plan/test/analyze commands.
    plan_file = devloop_dir / "current_plan.json"
    results_file = devloop_dir / "test_results.json"
    analysis_file = devloop_dir / "analysis.json"

    if plan_file.exists():
        with open(plan_file, encoding="utf-8") as f:
            plan = json.load(f)
        print(f"\nCurrent Plan: {plan.get('objective', 'Unknown')}")
        print(f" Tasks: {len(plan.get('tasks', []))}")
    else:
        print("\nNo current plan")

    if results_file.exists():
        with open(results_file, encoding="utf-8") as f:
            results = json.load(f)
        summary = results.get("summary", {})
        print("\nLast Test Results:")
        print(f" Passed: {summary.get('passed', 0)}/{summary.get('total', 0)}")
    else:
        print("\nNo test results")

    if analysis_file.exists():
        with open(analysis_file, encoding="utf-8") as f:
            analysis = json.load(f)
        print("\nLast Analysis:")
        print(f" Issues: {len(analysis.get('issues', []))}")
    else:
        print("\nNo analysis")

    print("\n" + "=" * 60)
    print("CLI Tools:")
    # NOTE(review): these installed-tool paths are informational output and
    # still machine-specific — consider reading them from config.
    print(" - Claude Code: C:\\Users\\antoi\\.local\\bin\\claude.exe")
    print(" - OpenCode: C:\\Users\\antoi\\AppData\\Roaming\\npm\\opencode.cmd")
async def quick_support_arm():
    """Smoke-test the support_arm study and print next steps.

    Runs the filesystem test suite for support_arm; if any test fails,
    kicks off an OpenCode analysis of the saved results.
    """
    divider = "=" * 60
    print("Quick DevLoop test with support_arm study")
    print(divider)

    # Exercise the study's expected file layout.
    results = await run_tests(study_name="support_arm")
    failed_count = (results or {}).get("summary", {}).get("failed", 0)

    if results and failed_count == 0:
        print("\n" + divider)
        print("SUCCESS: support_arm study is properly configured!")
        print("\nNext steps:")
        print(
            " 1. Run optimization: cd studies/_Other/support_arm && python run_optimization.py --test"
        )
        print(" 2. Start dashboard: cd atomizer-dashboard && npm run dev")
        print(" 3. View in canvas: http://localhost:3000/canvas/support_arm")
    else:
        print("\n" + divider)
        print("Some tests failed. Running analysis...")
        await run_analyze()
def main():
    """Parse CLI arguments and dispatch to the matching async command."""
    parser = argparse.ArgumentParser(
        description="DevLoop CLI - Closed-loop development using CLI subscriptions",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Run full development cycle
python devloop_cli.py start "Create new bracket study"
# Step-by-step execution
python devloop_cli.py plan "Fix dashboard validation"
python devloop_cli.py implement
python devloop_cli.py test --study support_arm
python devloop_cli.py analyze
# Browser tests (Playwright)
python devloop_cli.py browser # Quick smoke test
python devloop_cli.py browser --level full # All UI tests
python devloop_cli.py browser --study support_arm # Study-specific
# Quick test
python devloop_cli.py quick
Tools used:
- OpenCode (Gemini): Planning and analysis
- Claude Code: Implementation and fixes
- Playwright: Browser UI testing
""",
    )
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # start: full plan -> implement -> test -> analyze cycle
    start_parser = subparsers.add_parser("start", help="Start a full development cycle")
    start_parser.add_argument("objective", help="What to achieve")
    start_parser.add_argument("--max-iterations", type=int, default=5, help="Max fix iterations")

    # plan: planning phase only
    plan_parser = subparsers.add_parser("plan", help="Create plan with Gemini (OpenCode)")
    plan_parser.add_argument("objective", help="What to plan")
    plan_parser.add_argument("--context", help="Context JSON file")

    # implement: implementation phase only
    impl_parser = subparsers.add_parser("implement", help="Implement plan with Claude Code")
    impl_parser.add_argument("--plan", help="Plan JSON file (default: .devloop/current_plan.json)")

    # test: filesystem/API tests for a study
    test_parser = subparsers.add_parser("test", help="Run tests")
    test_parser.add_argument("--study", help="Study name to test")
    test_parser.add_argument("--scenarios", help="Test scenarios JSON file")

    # analyze: feed results back through Gemini
    analyze_parser = subparsers.add_parser("analyze", help="Analyze results with Gemini (OpenCode)")
    analyze_parser.add_argument("--results", help="Test results JSON file")

    # status / quick: no extra arguments
    subparsers.add_parser("status", help="Show current DevLoop status")
    subparsers.add_parser("quick", help="Quick test with support_arm study")

    # browser: Playwright UI tests
    browser_parser = subparsers.add_parser("browser", help="Run browser UI tests with Playwright")
    browser_parser.add_argument(
        "--level",
        choices=["quick", "home", "full", "study"],
        default="quick",
        help="Test level: quick (smoke), home (home page), full (all), study (study-specific)",
    )
    browser_parser.add_argument("--study", help="Study name for study-specific tests")

    args = parser.parse_args()

    # Dispatch table: command name -> zero-arg factory producing the coroutine
    # to run. Replaces the original if/elif chain; behavior is identical.
    handlers = {
        "start": lambda: start_cycle(args.objective, args.max_iterations),
        "plan": lambda: run_plan(args.objective, args.context),
        "implement": lambda: run_implement(args.plan),
        "test": lambda: run_tests(args.study, args.scenarios),
        "analyze": lambda: run_analyze(args.results),
        "status": show_status,
        "quick": quick_support_arm,
        "browser": lambda: run_browser_tests(args.level, args.study),
    }
    make_coro = handlers.get(args.command)
    if make_coro is None:
        # No (or unknown) subcommand: show usage, same as the original.
        parser.print_help()
    else:
        asyncio.run(make_coro())


if __name__ == "__main__":
    main()