feat: add Atomizer HQ multi-agent cluster infrastructure
- 8-agent OpenClaw cluster (Manager, Tech-Lead, Secretary, Auditor, Optimizer, Study-Builder, NX-Expert, Webster) - Orchestration engine: orchestrate.py (sync delegation + handoffs) - Workflow engine: YAML-defined multi-step pipelines - Agent workspaces: SOUL.md, AGENTS.md, MEMORY.md per agent - Shared skills: delegate, orchestrate, atomizer-protocols - Capability registry (AGENTS_REGISTRY.json) - Cluster management: cluster.sh, systemd template - All secrets replaced with env var references
This commit is contained in:
437
hq/workspaces/shared/skills/orchestrate/workflow.py
Executable file
437
hq/workspaces/shared/skills/orchestrate/workflow.py
Executable file
@@ -0,0 +1,437 @@
|
||||
#!/usr/bin/env python3
|
||||
"""YAML workflow engine for Atomizer orchestration."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
# PyYAML is the engine's only third-party dependency. Fail fast with a JSON
# error payload (rather than a traceback) so callers that parse our stdout
# still receive structured output.
try:
    import yaml
except ImportError:
    print(json.dumps({"status": "error", "error": "PyYAML is required (pip install pyyaml)"}, indent=2))
    sys.exit(1)
|
||||
|
||||
# Cluster filesystem layout. Paths default to the canonical install location
# under /home/papa but can be overridden via environment variables so the
# engine is testable and relocatable without code edits.
WORKFLOWS_DIR = Path(os.environ.get("ATOMIZER_WORKFLOWS_DIR", "/home/papa/atomizer/workspaces/shared/workflows"))
ORCHESTRATE_PY = Path(os.environ.get("ATOMIZER_ORCHESTRATE_PY", "/home/papa/atomizer/workspaces/shared/skills/orchestrate/orchestrate.py"))
HANDOFF_WORKFLOWS_DIR = Path(os.environ.get("ATOMIZER_HANDOFF_WORKFLOWS_DIR", "/home/papa/atomizer/handoffs/workflows"))
|
||||
|
||||
|
||||
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 timestamp string."""
    return datetime.now(tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
def parse_inputs(items: list[str]) -> dict[str, Any]:
    """Turn repeated ``--input key=value`` CLI arguments into a dict.

    Only the first '=' splits; keys and values are whitespace-trimmed.
    Raises ValueError for any item without an '='.
    """
    result: dict[str, Any] = {}
    for raw in items:
        key, sep, value = raw.partition("=")
        if not sep:
            raise ValueError(f"Invalid --input '{raw}', expected key=value")
        result[key.strip()] = value.strip()
    return result
|
||||
|
||||
|
||||
def resolve_workflow_path(name_or_path: str) -> Path:
    """Resolve a workflow reference to an existing file.

    Accepts either a direct filesystem path or a bare workflow name, which
    is looked up in WORKFLOWS_DIR as-is and with .yaml/.yml extensions.
    Raises FileNotFoundError when nothing matches.
    """
    direct = Path(name_or_path)
    if direct.exists():
        return direct
    for candidate in (
        WORKFLOWS_DIR / name_or_path,
        WORKFLOWS_DIR / f"{name_or_path}.yaml",
        WORKFLOWS_DIR / f"{name_or_path}.yml",
    ):
        if candidate.exists():
            return candidate
    raise FileNotFoundError(f"Workflow not found: {name_or_path}")
|
||||
|
||||
|
||||
def load_workflow(path: Path) -> dict[str, Any]:
    """Read a workflow YAML file and validate its top-level structure.

    Raises ValueError unless the document is a mapping with a
    non-empty 'steps' list.
    """
    document = yaml.safe_load(path.read_text())
    if not isinstance(document, dict):
        raise ValueError("Workflow YAML must be an object")
    steps = document.get("steps")
    if not isinstance(steps, list) or not steps:
        raise ValueError("Workflow must define non-empty 'steps'")
    return document
|
||||
|
||||
|
||||
def validate_graph(steps: list[dict[str, Any]]) -> tuple[dict[str, dict[str, Any]], dict[str, set[str]], dict[str, set[str]], list[list[str]]]:
    """Validate step ids and dependencies, then compute execution layers.

    Returns (step_map, deps, reverse, layers) where layers is a
    deterministic (alphabetically sorted) topological layering of the
    DAG. Raises ValueError on missing/duplicate ids, references to
    unknown steps, or dependency cycles.
    """
    step_map: dict[str, dict[str, Any]] = {}
    deps: dict[str, set[str]] = {}
    reverse: dict[str, set[str]] = {}

    # First pass: register ids, forward edges, and empty reverse-edge sets.
    for step in steps:
        step_id = step.get("id")
        if not step_id or not isinstance(step_id, str):
            raise ValueError("Each step needs string 'id'")
        if step_id in step_map:
            raise ValueError(f"Duplicate step id: {step_id}")
        step_map[step_id] = step
        deps[step_id] = set(step.get("depends_on", []) or [])
        reverse[step_id] = set()

    # Second pass: verify every dependency exists and build reverse edges.
    for step_id, dependencies in deps.items():
        for dep in dependencies:
            if dep not in step_map:
                raise ValueError(f"Step '{step_id}' depends on unknown step '{dep}'")
            reverse[dep].add(step_id)

    # Kahn's algorithm: peel off zero-indegree nodes layer by layer;
    # leftover positive indegrees imply a cycle.
    remaining = {step_id: len(dependencies) for step_id, dependencies in deps.items()}
    frontier = sorted(step_id for step_id, count in remaining.items() if count == 0)
    layers: list[list[str]] = []
    seen = 0

    while frontier:
        layers.append(frontier)
        seen += len(frontier)
        unlocked: list[str] = []
        for step_id in frontier:
            for child in sorted(reverse[step_id]):
                remaining[child] -= 1
                if remaining[child] == 0:
                    unlocked.append(child)
        frontier = sorted(unlocked)

    if seen != len(step_map):
        cycle_nodes = [step_id for step_id, count in remaining.items() if count > 0]
        raise ValueError(f"Dependency cycle detected involving: {', '.join(cycle_nodes)}")

    return step_map, deps, reverse, layers
|
||||
|
||||
|
||||
# Matches single-brace placeholders like {inputs.env} or {step_id};
# nested/escaped braces are deliberately not matched.
_VAR_RE = re.compile(r"\{([^{}]+)\}")


def substitute(text: str, step_outputs: dict[str, Any], inputs: dict[str, Any]) -> str:
    """Expand ``{inputs.key}`` and ``{step_id}`` placeholders in *text*.

    Unknown placeholders are left verbatim. Dict/list step outputs are
    JSON-encoded; every other value is stringified.
    """
    def expand(match: re.Match[str]) -> str:
        name = match.group(1).strip()
        if name.startswith("inputs."):
            input_key = name.split(".", 1)[1]
            return str(inputs[input_key]) if input_key in inputs else match.group(0)
        if name not in step_outputs:
            return match.group(0)
        value = step_outputs[name]
        if isinstance(value, (dict, list)):
            return json.dumps(value, ensure_ascii=False)
        return str(value)

    return _VAR_RE.sub(expand, text)
|
||||
|
||||
|
||||
def approval_check(step: dict[str, Any], non_interactive: bool) -> bool:
    """Return True when a step is allowed to run past its approval gate.

    Steps without an 'approval_gate' always pass. In non-interactive
    mode the gate is skipped with a stderr warning; otherwise the
    operator is prompted on stdin for a yes/no answer.
    """
    gate = step.get("approval_gate")
    if not gate:
        return True  # no gate configured for this step
    if non_interactive:
        print(f"WARNING: non-interactive mode, skipping approval gate '{gate}' for step '{step['id']}'", file=sys.stderr)
        return True
    print(f"Approval gate required for step '{step['id']}' ({gate}). Approve? [yes/no]: ", end="", flush=True)
    reply = sys.stdin.readline().strip().lower()
    return reply in ("y", "yes")
|
||||
|
||||
|
||||
def run_orchestrate(agent: str, task: str, timeout_s: int, caller: str, workflow_run_id: str, step_id: str, retries: int) -> dict[str, Any]:
    """Delegate one step to an agent via orchestrate.py and return its JSON result.

    Always returns a dict with at least 'status'; subprocess failures
    (no output, non-JSON output, or a hang) are converted into a
    structured ``{"status": "failed", ...}`` result rather than raised.
    """
    cmd = [
        "python3", str(ORCHESTRATE_PY),
        agent,
        task,
        "--timeout", str(timeout_s),
        "--caller", caller,
        "--workflow-id", workflow_run_id,
        "--step-id", step_id,
        "--retries", str(max(1, retries)),
        "--format", "json",
    ]
    # Hard kill-switch: orchestrate.py enforces its own --timeout per attempt,
    # but if it hangs (e.g. a stuck child of its own) the workflow must not
    # block forever. Budget for all retries plus a grace period.
    hard_timeout = timeout_s * max(1, retries) + 60
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=hard_timeout)
    except subprocess.TimeoutExpired:
        return {
            "status": "failed",
            "result": None,
            "notes": f"orchestrate.py did not exit within {hard_timeout}s hard limit",
            "exitCode": -1,
        }

    out = (proc.stdout or "").strip()
    if not out:
        # Child produced nothing on stdout: surface a truncated stderr excerpt.
        return {
            "status": "failed",
            "result": None,
            "notes": f"No stdout from orchestrate.py; stderr: {(proc.stderr or '').strip()[:1000]}",
            "exitCode": proc.returncode,
        }

    try:
        data = json.loads(out)
    except json.JSONDecodeError:
        # Keep the raw output so the caller can inspect what went wrong.
        return {
            "status": "failed",
            "result": out,
            "notes": f"Non-JSON response from orchestrate.py; stderr: {(proc.stderr or '').strip()[:1000]}",
            "exitCode": proc.returncode,
        }

    # Enrich the child's JSON with exit code and (truncated) stderr.
    data["exitCode"] = proc.returncode
    if proc.stderr:
        data["stderr"] = proc.stderr.strip()[:2000]
    return data
|
||||
|
||||
|
||||
def validation_passed(validation_result: dict[str, Any]) -> bool:
    """Interpret a validator agent's response as pass/fail.

    A run that did not finish ('complete'/'partial') always fails.
    If the result body parses as JSON with a 'decision' field, that
    decision wins; otherwise a crude keyword scan of the free text is
    used, defaulting to pass.
    """
    if validation_result.get("status") not in {"complete", "partial"}:
        return False
    body = str(validation_result.get("result", "")).strip()
    # Prefer a structured JSON decision when the validator supplied one.
    try:
        verdict = str(json.loads(body).get("decision", "")).lower()
    except Exception:
        verdict = None
    if verdict in {"accept", "approved", "pass", "passed"}:
        return True
    if verdict in {"reject", "fail", "failed"}:
        return False
    # Fallback heuristic on the raw text; absence of negatives means pass.
    text = body.lower()
    return not ("reject" in text or "fail" in text)
|
||||
|
||||
|
||||
def execute_step(
    step: dict[str, Any],
    inputs: dict[str, Any],
    step_outputs: dict[str, Any],
    caller: str,
    workflow_run_id: str,
    remaining_timeout: int,
    non_interactive: bool,
    out_dir: Path,
    lock: threading.Lock,
) -> dict[str, Any]:
    """Run a single workflow step and persist its result to out_dir.

    Order of operations: approval gate -> placeholder substitution ->
    delegation via run_orchestrate -> optional validation pass. Every
    exit path writes a <step_id>.json result file and returns the same
    dict. Never raises for step failure; failures are encoded in the
    returned 'status'/'error' fields.
    """
    sid = step["id"]
    start = time.time()

    # Approval gate: a denied gate fails the step without invoking any agent.
    if not approval_check(step, non_interactive):
        result = {
            "step_id": sid,
            "status": "failed",
            "error": "approval_denied",
            "started_at": now_iso(),
            "finished_at": now_iso(),
            "duration_s": 0,
        }
        (out_dir / f"{sid}.json").write_text(json.dumps(result, indent=2))
        return result

    # Expand {inputs.*} / {step_id} placeholders against prior outputs.
    task = substitute(str(step.get("task", "")), step_outputs, inputs)
    agent = step.get("agent")
    if not agent:
        # A step with no target agent cannot be delegated anywhere.
        result = {
            "step_id": sid,
            "status": "failed",
            "error": "missing_agent",
            "started_at": now_iso(),
            "finished_at": now_iso(),
            "duration_s": 0,
        }
        (out_dir / f"{sid}.json").write_text(json.dumps(result, indent=2))
        return result

    # Per-step timeout is capped by whatever remains of the workflow budget.
    step_timeout = int(step.get("timeout", 300))
    timeout_s = max(1, min(step_timeout, remaining_timeout))
    retries = int(step.get("retries", 1))

    run_res = run_orchestrate(agent, task, timeout_s, caller, workflow_run_id, sid, retries)

    step_result: dict[str, Any] = {
        "step_id": sid,
        "agent": agent,
        "status": run_res.get("status", "failed"),
        "result": run_res.get("result"),
        "notes": run_res.get("notes"),
        "run": run_res,  # full raw delegation record for debugging
        "started_at": datetime.fromtimestamp(start, tz=timezone.utc).isoformat(),
        "finished_at": now_iso(),
        "duration_s": round(time.time() - start, 3),
    }

    # Optional second-agent validation: only runs when the step itself
    # finished ('complete'/'partial'); a failed validation fails the step.
    validation_cfg = step.get("validation")
    if validation_cfg and step_result["status"] in {"complete", "partial"}:
        v_agent = validation_cfg.get("agent")
        criteria = validation_cfg.get("criteria", "Validate this output for quality and correctness.")
        if v_agent:
            v_task = (
                "Validate the following workflow step output. Return a decision in JSON like "
                "{\"decision\":\"accept|reject\",\"reason\":\"...\"}.\n\n"
                f"Step ID: {sid}\n"
                f"Criteria: {criteria}\n\n"
                f"Output to validate:\n{step_result.get('result')}"
            )
            # Validator budget defaults to 180s but never exceeds the step's.
            v_timeout = int(validation_cfg.get("timeout", min(180, timeout_s)))
            validation_res = run_orchestrate(v_agent, v_task, max(1, v_timeout), caller, workflow_run_id, f"{sid}__validation", 1)
            step_result["validation"] = validation_res
            if not validation_passed(validation_res):
                step_result["status"] = "failed"
                step_result["error"] = "validation_failed"
                step_result["notes"] = f"Validation failed by {v_agent}: {validation_res.get('result') or validation_res.get('notes')}"

    # Serialize the result-file write; steps in the same layer run on a
    # thread pool and share out_dir.
    with lock:
        (out_dir / f"{sid}.json").write_text(json.dumps(step_result, indent=2))
    return step_result
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: load, plan, and execute a YAML workflow.

    Scheduling model: repeatedly collect steps whose dependencies are
    satisfied, run that batch in a thread pool, fold results into
    shared state, and loop until done, deadlocked, or past the overall
    deadline. Writes per-step JSON files plus a summary.json, prints
    the summary, and exits 0 for complete/partial, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Run YAML workflows using orchestrate.py")
    parser.add_argument("workflow")
    parser.add_argument("--input", action="append", default=[], help="key=value (repeatable)")
    parser.add_argument("--caller", default="manager")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--non-interactive", action="store_true")
    parser.add_argument("--timeout", type=int, default=1800, help="Overall workflow timeout seconds")
    args = parser.parse_args()

    wf_path = resolve_workflow_path(args.workflow)
    wf = load_workflow(wf_path)
    inputs = parse_inputs(args.input)

    steps = wf["steps"]
    # Validates ids/deps and pre-computes the layered execution plan.
    step_map, deps, reverse, layers = validate_graph(steps)

    # Unique run id; also names the per-run result directory.
    workflow_run_id = f"wf-{int(time.time())}-{uuid.uuid4().hex[:8]}"
    out_dir = HANDOFF_WORKFLOWS_DIR / workflow_run_id
    out_dir.mkdir(parents=True, exist_ok=True)

    if args.dry_run:
        # Print the execution plan without delegating anything.
        plan = {
            "status": "dry_run",
            "workflow": wf.get("name", wf_path.name),
            "workflow_file": str(wf_path),
            "workflow_run_id": workflow_run_id,
            "inputs": inputs,
            "steps": [
                {
                    "id": s["id"],
                    "agent": s.get("agent"),
                    "depends_on": s.get("depends_on", []),
                    "timeout": s.get("timeout", 300),
                    "retries": s.get("retries", 1),
                    "approval_gate": s.get("approval_gate"),
                    "has_validation": bool(s.get("validation")),
                }
                for s in steps
            ],
            "execution_layers": layers,
            "result_dir": str(out_dir),
        }
        print(json.dumps(plan, indent=2))
        return

    started = time.time()
    deadline = started + args.timeout
    lock = threading.Lock()

    # Per-step lifecycle: pending -> running -> complete/skipped/failed/aborted.
    state: dict[str, str] = {sid: "pending" for sid in step_map}
    step_results: dict[str, dict[str, Any]] = {}
    step_outputs: dict[str, Any] = {}
    overall_status = "complete"

    max_workers = max(1, min(len(step_map), (os.cpu_count() or 4)))

    while True:
        if time.time() >= deadline:
            overall_status = "timeout"
            break

        pending = [sid for sid, st in state.items() if st == "pending"]
        if not pending:
            break

        # A step is ready when every dependency finished usefully
        # (failed/aborted deps keep their dependents pending).
        ready = []
        for sid in pending:
            if all(state[d] in {"complete", "skipped"} for d in deps[sid]):
                ready.append(sid)

        if not ready:
            # deadlock due to upstream abort/fail on pending deps
            if any(st == "aborted" for st in state.values()):
                break
            overall_status = "failed"
            break

        # Run the whole ready batch in parallel; results are folded in
        # on this (main) thread, so state/step_outputs need no locking.
        futures = {}
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            for sid in ready:
                state[sid] = "running"
                remaining_timeout = int(max(1, deadline - time.time()))
                futures[pool.submit(
                    execute_step,
                    step_map[sid],
                    inputs,
                    step_outputs,
                    args.caller,
                    workflow_run_id,
                    remaining_timeout,
                    args.non_interactive,
                    out_dir,
                    lock,
                )] = sid

            for fut in as_completed(futures):
                sid = futures[fut]
                res = fut.result()
                step_results[sid] = res
                st = res.get("status", "failed")

                if st in {"complete", "partial"}:
                    state[sid] = "complete"
                    # Expose the result to later steps under the step id,
                    # and optionally under a declared 'output' alias.
                    step_outputs[sid] = res.get("result")
                    out_name = step_map[sid].get("output")
                    if out_name:
                        step_outputs[str(out_name)] = res.get("result")
                else:
                    # on_fail policy: 'skip' downgrades the run to partial,
                    # anything else aborts all still-pending steps.
                    on_fail = str(step_map[sid].get("on_fail", "abort")).lower()
                    if on_fail == "skip":
                        state[sid] = "skipped"
                        overall_status = "partial"
                    else:
                        state[sid] = "failed"
                        overall_status = "failed"
                        # abort all pending steps
                        for psid in list(state):
                            if state[psid] == "pending":
                                state[psid] = "aborted"

    finished = time.time()
    # Skipped steps on an otherwise-clean run still demote it to partial.
    if overall_status == "complete" and any(st == "skipped" for st in state.values()):
        overall_status = "partial"

    summary = {
        "status": overall_status,
        "workflow": wf.get("name", wf_path.name),
        "workflow_file": str(wf_path),
        "workflow_run_id": workflow_run_id,
        "caller": args.caller,
        "started_at": datetime.fromtimestamp(started, tz=timezone.utc).isoformat(),
        "finished_at": datetime.fromtimestamp(finished, tz=timezone.utc).isoformat(),
        "duration_s": round(finished - started, 3),
        "timeout_s": args.timeout,
        "inputs": inputs,
        "state": state,
        "results": step_results,
        "result_dir": str(out_dir),
        "notifications": wf.get("notifications", {}),
    }

    (out_dir / "summary.json").write_text(json.dumps(summary, indent=2))
    print(json.dumps(summary, indent=2))

    if overall_status in {"complete", "partial"}:
        sys.exit(0)
    sys.exit(1)
|
||||
|
||||
|
||||
# Script entry point (module is also importable without side effects).
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user