#!/usr/bin/env python3
"""
Atomizer HQ Orchestration Engine — Phase 1b
Synchronous delegation with file-based handoffs, inotify, validation, retries, error handling.

Usage:
    python3 orchestrate.py <agent> "<task>" [options]

Options:
    --wait                       Block until agent completes (default: true)
    --timeout <seconds>          Max wait time per attempt (default: 300)
    --format json|text           Output format of this script: the full JSON result,
                                 or only its "result" text (default: json)
    --context <file>             Attach context file to the task
    --no-deliver                 Don't post to Discord
    --run-id <id>                Custom run ID (default: auto-generated)
    --retries <N>                Max total attempts, including the first
                                 (default: 1 = no retry, max: 3)
    --validate                   Strictly validate handoff fields and the deliverable
                                 block (default: on)
    --no-validate                Only require the "status" and "result" fields
    --workflow-id <id>           Workflow run ID (for tracing)
    --step-id <id>               Workflow step ID (for tracing)
    --caller <agent>             Calling agent (for ACL enforcement)
    --channel-context <channel>  Include recent Discord channel history as untrusted
                                 context (requires --caller)
    --channel-messages <N>       Number of channel messages to fetch (default: 20, max: 30)

Environment:
    ATOMIZER_GATEWAY_TOKEN       Bearer token for the agent gateways; overrides the
                                 built-in fallback value.
"""

import argparse
import json
import os
import subprocess
import sys
import time
import uuid
from pathlib import Path

# ── Constants ────────────────────────────────────────────────────────────────

HANDOFF_DIR = Path("/home/papa/atomizer/handoffs")
LOG_DIR = Path("/home/papa/atomizer/logs/orchestration")
REGISTRY_PATH = Path("/home/papa/atomizer/workspaces/shared/AGENTS_REGISTRY.json")
ORCHESTRATE_DIR = Path("/home/papa/atomizer/workspaces/shared/skills/orchestrate")

# NOTE(security): a bearer token committed to source is readable by anyone with
# access to this file or its history. Supply it via ATOMIZER_GATEWAY_TOKEN
# instead; the literal fallback is kept only for backward compatibility and
# should be rotated and removed. An empty environment value falls back too.
GATEWAY_TOKEN = (
    os.environ.get("ATOMIZER_GATEWAY_TOKEN")
    or "31422bb39bc9e7a4d34f789d8a7cbc582dece8dd170dadd1"
)

# Port map (fallback if registry unavailable)
AGENT_PORTS = {
    "manager": 18800,
    "tech-lead": 18804,
    "secretary": 18808,
    "auditor": 18812,
    "optimizer": 18816,
    "study-builder": 18820,
    "nx-expert": 18824,
    "webster": 18828,
}

# Delegation ACL — who can delegate to whom
DELEGATION_ACL = {
    "manager": ["tech-lead", "auditor", "optimizer", "study-builder", "nx-expert", "webster", "secretary"],
    "tech-lead": ["webster", "nx-expert", "study-builder", "secretary"],
    "optimizer": ["webster", "study-builder", "secretary"],
    # All others: no sub-delegation allowed
}
Required handoff fields for strict validation REQUIRED_FIELDS = ["status", "result"] STRICT_FIELDS = ["schemaVersion", "status", "result", "confidence", "timestamp"] DELIVERABLE_TYPES = ["document", "code", "analysis", "recommendation", "review", "data"] # ── Helpers ────────────────────────────────────────────────────────────────── def get_agent_port(agent: str) -> int: """Resolve agent name to port, checking registry first.""" if REGISTRY_PATH.exists(): try: registry = json.loads(REGISTRY_PATH.read_text()) agent_info = registry.get("agents", {}).get(agent) if agent_info and "port" in agent_info: return agent_info["port"] except (json.JSONDecodeError, KeyError): pass port = AGENT_PORTS.get(agent) if port is None: emit_error(f"Unknown agent '{agent}'") sys.exit(1) return port def check_acl(caller: str | None, target: str) -> bool: """Check if caller is allowed to delegate to target.""" if caller is None: return True # No caller specified = no ACL enforcement if caller == target: return False # No self-delegation allowed = DELEGATION_ACL.get(caller) if allowed is None: return False # Agent not in ACL = cannot delegate return target in allowed def check_health(agent: str, port: int) -> bool: """Quick health check — can we reach the agent's gateway?""" try: result = subprocess.run( ["curl", "-sf", "-o", "/dev/null", "-w", "%{http_code}", f"http://127.0.0.1:{port}/healthz"], capture_output=True, text=True, timeout=5 ) return result.stdout.strip() in ("200", "204") except (subprocess.TimeoutExpired, Exception): return False def send_task(agent: str, port: int, task: str, run_id: str, attempt: int = 1, prev_error: str = None, context: str = None, no_deliver: bool = False) -> bool: """Send a task to the agent via /hooks/agent endpoint.""" handoff_path = HANDOFF_DIR / f"{run_id}.json" # Build retry context if this is a retry retry_note = "" if attempt > 1 and prev_error: retry_note = f"\n⚠️ RETRY (attempt {attempt}): Previous attempt failed: {prev_error}\nPlease try again 
carefully.\n" message = f"""[ORCHESTRATED TASK — run_id: {run_id}] {retry_note} IMPORTANT: Answer this task DIRECTLY. Do NOT spawn sub-agents, Codex, or background processes. Use your own knowledge and tools (web_search, web_fetch) directly. Keep your response focused and concise. {task} --- IMPORTANT: When you complete this task, write your response as a JSON file to: {handoff_path} Use this exact format: ```json {{ "schemaVersion": "1.1", "runId": "{run_id}", "agent": "{agent}", "status": "complete", "result": "", "deliverable": {{ "type": "", "title": "", "path": "", "summary": "" }}, "artifacts": [], "confidence": "high|medium|low", "notes": "", "timestamp": "" }} ``` Status values: complete | partial | blocked | failed ⚠️ The "deliverable" block is MANDATORY. Every task must produce a concrete deliverable. If your result is self-contained in "result", set deliverable.path to null and deliverable.type to "analysis" or "recommendation". Write the file BEFORE posting to Discord. The orchestrator is waiting for it.""" if context: message = f"CONTEXT:\n{context}\n\n{message}" payload = { "message": message, "name": f"orchestrate:{run_id}", "sessionKey": f"hook:orchestrate:{run_id}:{attempt}", "deliver": not no_deliver, "wakeMode": "now", "timeoutSeconds": 600, } try: result = subprocess.run( ["curl", "-sf", "-X", "POST", f"http://127.0.0.1:{port}/hooks/agent", "-H", f"Authorization: Bearer {GATEWAY_TOKEN}", "-H", "Content-Type: application/json", "-d", json.dumps(payload)], capture_output=True, text=True, timeout=15 ) return result.returncode == 0 except (subprocess.TimeoutExpired, Exception) as e: log_event(run_id, agent, "send_error", str(e), attempt=attempt) return False def wait_for_handoff(run_id: str, timeout: int) -> dict | None: """Wait for the handoff file using inotify. 
Falls back to polling.""" handoff_path = HANDOFF_DIR / f"{run_id}.json" # Check if already exists (agent was fast, or late arrival from prev attempt) if handoff_path.exists(): return read_handoff(handoff_path) try: from inotify_simple import INotify, flags inotify = INotify() watch_flags = flags.CREATE | flags.MOVED_TO | flags.CLOSE_WRITE wd = inotify.add_watch(str(HANDOFF_DIR), watch_flags) deadline = time.time() + timeout target_name = f"{run_id}.json" while time.time() < deadline: remaining = max(0.1, deadline - time.time()) events = inotify.read(timeout=int(remaining * 1000)) for event in events: if event.name == target_name: time.sleep(0.3) # Ensure file is fully written inotify.close() return read_handoff(handoff_path) # Direct check in case we missed the inotify event if handoff_path.exists(): inotify.close() return read_handoff(handoff_path) inotify.close() return None except ImportError: return poll_for_handoff(handoff_path, timeout) def poll_for_handoff(handoff_path: Path, timeout: int) -> dict | None: """Fallback polling if inotify unavailable.""" deadline = time.time() + timeout while time.time() < deadline: if handoff_path.exists(): time.sleep(0.3) return read_handoff(handoff_path) time.sleep(2) return None def read_handoff(path: Path) -> dict | None: """Read and parse a handoff file.""" try: raw = path.read_text().strip() data = json.loads(raw) return data except json.JSONDecodeError: return { "status": "malformed", "result": path.read_text()[:2000], "notes": "Invalid JSON in handoff file", "_raw": True, } except Exception as e: return { "status": "error", "result": str(e), "notes": f"Failed to read handoff file: {e}", } def validate_handoff(data: dict, strict: bool = False) -> tuple[bool, str]: """Validate handoff data. 
Returns (valid, error_message).""" if data is None: return False, "No handoff data" fields = STRICT_FIELDS if strict else REQUIRED_FIELDS missing = [f for f in fields if f not in data] if missing: return False, f"Missing fields: {', '.join(missing)}" status = data.get("status", "") if status not in ("complete", "partial", "blocked", "failed"): return False, f"Invalid status: '{status}'" if status == "failed": return False, f"Agent reported failure: {data.get('notes', 'no details')}" if status == "blocked": return False, f"Agent blocked: {data.get('notes', 'no details')}" # Deliverable enforcement (schema v1.1+) if strict and status == "complete": deliverable = data.get("deliverable") if not deliverable or not isinstance(deliverable, dict): return False, "Missing deliverable block — every completed task must include a deliverable" if not deliverable.get("type"): return False, "Deliverable missing 'type' field" if deliverable["type"] not in DELIVERABLE_TYPES: return False, f"Invalid deliverable type: '{deliverable['type']}' (valid: {', '.join(DELIVERABLE_TYPES)})" if not deliverable.get("summary"): return False, "Deliverable missing 'summary' field" return True, "" def should_retry(result: dict | None, attempt: int, max_retries: int) -> tuple[bool, str]: """Decide whether to retry based on result and attempt count.""" if attempt >= max_retries: return False, "Max retries reached" if result is None: return True, "timeout" status = result.get("status", "") if status == "malformed": return True, "malformed response" if status == "failed": return True, f"agent failed: {result.get('notes', '')}" if status == "partial" and result.get("confidence") == "low": return True, "partial with low confidence" if status == "error": return True, f"error: {result.get('notes', '')}" return False, "" def clear_handoff(run_id: str): """Remove handoff file before retry.""" handoff_path = HANDOFF_DIR / f"{run_id}.json" if handoff_path.exists(): # Rename to .prev instead of deleting (for 
debugging) handoff_path.rename(handoff_path.with_suffix(".prev.json")) def log_event(run_id: str, agent: str, event_type: str, detail: str = "", attempt: int = 1, elapsed_ms: int = 0, **extra): """Unified logging.""" LOG_DIR.mkdir(parents=True, exist_ok=True) log_file = LOG_DIR / f"{time.strftime('%Y-%m-%d')}.jsonl" entry = { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "runId": run_id, "agent": agent, "event": event_type, "detail": detail[:500], "attempt": attempt, "elapsedMs": elapsed_ms, **extra, } with open(log_file, "a") as f: f.write(json.dumps(entry) + "\n") def emit_error(msg: str): """Print error to stderr.""" print(f"ERROR: {msg}", file=sys.stderr) def get_discord_token_for_caller(caller: str) -> str | None: """Load caller bot token from instance config.""" cfg = Path(f"/home/papa/atomizer/instances/{caller}/openclaw.json") if not cfg.exists(): return None try: data = json.loads(cfg.read_text()) return data.get("channels", {}).get("discord", {}).get("token") except Exception: return None def fetch_channel_context(channel: str, messages: int, token: str) -> str | None: """Fetch formatted channel context via helper script.""" script = ORCHESTRATE_DIR / "fetch-channel-context.sh" if not script.exists(): return None try: result = subprocess.run( [str(script), channel, "--messages", str(messages), "--token", token], capture_output=True, text=True, timeout=30, check=False, ) if result.returncode != 0: emit_error(f"Channel context fetch failed: {result.stderr.strip()}") return None return result.stdout.strip() except Exception as e: emit_error(f"Channel context fetch error: {e}") return None # ── Main ───────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="Atomizer Orchestration Engine") parser.add_argument("agent", help="Target agent name") parser.add_argument("task", help="Task to delegate") parser.add_argument("--wait", action="store_true", default=True) 
parser.add_argument("--timeout", type=int, default=300, help="Timeout per attempt in seconds (default: 300)") parser.add_argument("--format", choices=["json", "text"], default="json") parser.add_argument("--context", type=str, default=None, help="Path to context file") parser.add_argument("--no-deliver", action="store_true") parser.add_argument("--run-id", type=str, default=None) parser.add_argument("--retries", type=int, default=1, help="Max attempts (default: 1, max: 3)") parser.add_argument("--validate", action="store_true", default=True, help="Strict validation of handoff fields (default: True since v1.1)") parser.add_argument("--no-validate", action="store_false", dest="validate", help="Disable strict validation") parser.add_argument("--workflow-id", type=str, default=None, help="Workflow run ID for tracing") parser.add_argument("--step-id", type=str, default=None, help="Workflow step ID for tracing") parser.add_argument("--caller", type=str, default=None, help="Calling agent for ACL enforcement") parser.add_argument("--channel-context", type=str, default=None, help="Discord channel name or ID to include as context") parser.add_argument("--channel-messages", type=int, default=20, help="Number of channel messages to fetch (default: 20, max: 30)") args = parser.parse_args() # Clamp retries max_retries = min(max(args.retries, 1), 3) # Generate run ID run_id = args.run_id or f"orch-{int(time.time())}-{uuid.uuid4().hex[:8]}" # Task text can be augmented (e.g., channel context prepend) delegated_task = args.task # ACL check if not check_acl(args.caller, args.agent): result = { "status": "error", "result": None, "notes": f"ACL denied: '{args.caller}' cannot delegate to '{args.agent}'", "agent": args.agent, "runId": run_id, } print(json.dumps(result, indent=2)) log_event(run_id, args.agent, "acl_denied", f"caller={args.caller}") sys.exit(1) # Resolve agent port port = get_agent_port(args.agent) # Health check if not check_health(args.agent, port): result = { "status": 
"error", "result": None, "notes": f"Agent '{args.agent}' unreachable at port {port}", "agent": args.agent, "runId": run_id, } print(json.dumps(result, indent=2)) log_event(run_id, args.agent, "health_failed", f"port={port}") sys.exit(1) # Load context context = None if args.context: ctx_path = Path(args.context) if ctx_path.exists(): context = ctx_path.read_text() else: emit_error(f"Context file not found: {args.context}") # Optional channel context if args.channel_context: if not args.caller: emit_error("--channel-context requires --caller so bot token can be resolved") sys.exit(1) token = get_discord_token_for_caller(args.caller) if not token: emit_error(f"Could not resolve Discord bot token for caller '{args.caller}'") sys.exit(1) channel_messages = min(max(args.channel_messages, 1), 30) ch_ctx = fetch_channel_context(args.channel_context, channel_messages, token) if not ch_ctx: emit_error(f"Failed to fetch channel context for '{args.channel_context}'") sys.exit(1) delegated_task = f"{ch_ctx}\n\n{delegated_task}" # ── Retry loop ─────────────────────────────────────────────────────── result = None prev_error = None for attempt in range(1, max_retries + 1): attempt_start = time.time() log_event(run_id, args.agent, "attempt_start", delegated_task[:200], attempt=attempt) # Idempotency check: if handoff file exists from a previous attempt, use it handoff_path = HANDOFF_DIR / f"{run_id}.json" if attempt > 1 and handoff_path.exists(): result = read_handoff(handoff_path) if result and result.get("status") in ("complete", "partial"): log_event(run_id, args.agent, "late_arrival", "Handoff file arrived between retries", attempt=attempt) break # Previous result was bad, clear it for retry clear_handoff(run_id) # Send task sent = send_task(args.agent, port, delegated_task, run_id, attempt=attempt, prev_error=prev_error, context=context, no_deliver=args.no_deliver) if not sent: prev_error = "Failed to send task" log_event(run_id, args.agent, "send_failed", prev_error, 
attempt=attempt) if attempt < max_retries: time.sleep(5) # Brief pause before retry continue result = { "status": "error", "result": None, "notes": f"Failed to send task after {attempt} attempts", } break # Wait for result if args.wait: result = wait_for_handoff(run_id, args.timeout) elapsed = time.time() - attempt_start # Validate if result is not None: valid, error_msg = validate_handoff(result, strict=args.validate) if not valid: log_event(run_id, args.agent, "validation_failed", error_msg, attempt=attempt, elapsed_ms=int(elapsed * 1000)) do_retry, reason = should_retry(result, attempt, max_retries) if do_retry: prev_error = reason clear_handoff(run_id) time.sleep(3) continue # No retry — return what we have break else: # Valid result log_event(run_id, args.agent, "complete", result.get("status", ""), attempt=attempt, elapsed_ms=int(elapsed * 1000), confidence=result.get("confidence")) break else: # Timeout log_event(run_id, args.agent, "timeout", "", attempt=attempt, elapsed_ms=int(elapsed * 1000)) do_retry, reason = should_retry(result, attempt, max_retries) if do_retry: prev_error = "timeout" continue result = { "status": "timeout", "result": None, "notes": f"Agent did not respond within {args.timeout}s " f"(attempt {attempt}/{max_retries})", } break else: # Fire and forget print(json.dumps({"status": "sent", "runId": run_id, "agent": args.agent})) sys.exit(0) # ── Output ─────────────────────────────────────────────────────────── if result is None: result = { "status": "error", "result": None, "notes": "No result after all attempts", } # Add metadata total_elapsed = time.time() - (attempt_start if 'attempt_start' in dir() else time.time()) result["runId"] = run_id result["agent"] = args.agent result["latencyMs"] = int(total_elapsed * 1000) if args.workflow_id: result["workflowRunId"] = args.workflow_id if args.step_id: result["stepId"] = args.step_id if args.format == "json": print(json.dumps(result, indent=2)) else: print(result.get("result", "")) status = 
result.get("status", "error") sys.exit(0 if status in ("complete", "partial") else 1) if __name__ == "__main__": main()