- 8-agent OpenClaw cluster (Manager, Tech-Lead, Secretary, Auditor, Optimizer, Study-Builder, NX-Expert, Webster) - Orchestration engine: orchestrate.py (sync delegation + handoffs) - Workflow engine: YAML-defined multi-step pipelines - Agent workspaces: SOUL.md, AGENTS.md, MEMORY.md per agent - Shared skills: delegate, orchestrate, atomizer-protocols - Capability registry (AGENTS_REGISTRY.json) - Cluster management: cluster.sh, systemd template - All secrets replaced with env var references
171 lines
4.8 KiB
Python
171 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
atomizer_job_watcher.py — Windows Job Queue Service
|
|
Watches C:\\Atomizer\\job-queue\\pending\\ for new jobs.
|
|
Moves them through pending → running → completed/failed.
|
|
|
|
Usage:
|
|
python atomizer_job_watcher.py # Watch mode (continuous)
|
|
python atomizer_job_watcher.py --once # Process pending, then exit
|
|
|
|
Install as service (optional):
|
|
nssm install AtomizerJobWatcher "C:\\...\\python.exe" "C:\\Atomizer\\atomizer_job_watcher.py"
|
|
"""
|
|
|
|
import json
import logging
import os
import shutil
import subprocess
import sys
import time

from datetime import datetime, timezone
from pathlib import Path
|
|
|
|
# --- Job-queue directory layout -------------------------------------------
# Jobs move through: pending -> running -> completed | failed
JOB_QUEUE = Path(r"C:\Atomizer\job-queue")

PENDING = JOB_QUEUE / "pending"
RUNNING = JOB_QUEUE / "running"
COMPLETED = JOB_QUEUE / "completed"
FAILED = JOB_QUEUE / "failed"

# Interpreter used to launch job scripts.
# Update this to match your Conda/Python path
CONDA_PYTHON = r"C:\Users\antoi\anaconda3\envs\atomizer\python.exe"

# Make sure the queue root exists BEFORE attaching the FileHandler below.
# Otherwise logging.basicConfig() raises FileNotFoundError at import time
# on a fresh machine — main() only creates the sub-directories after the
# module has already been imported.
JOB_QUEUE.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(JOB_QUEUE / "watcher.log"),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger("job-watcher")
|
|
|
|
|
|
def now_iso():
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    timestamp = datetime.now(tz=timezone.utc)
    return timestamp.isoformat()
|
|
|
|
|
|
def run_job(job_dir: Path):
    """Execute a single job directory.

    Reads ``job.json``, moves the directory pending -> running, launches
    the job's script under CONDA_PYTHON, then moves the directory to
    completed/ or failed/ based on the outcome.  ``job.json`` is rewritten
    at every status transition so external observers (e.g. Syncthing
    peers) can follow progress.
    """
    job_file = job_dir / "job.json"
    if not job_file.exists():
        log.warning(f"No job.json in {job_dir}, skipping")
        return

    with open(job_file) as f:
        job = json.load(f)

    job_id = job.get("job_id", job_dir.name)
    log.info(f"Starting job: {job_id}")

    # Move to running/ — clobber any stale leftover from a crashed run.
    running_dir = RUNNING / job_dir.name
    if running_dir.exists():
        shutil.rmtree(running_dir)
    shutil.move(str(job_dir), str(running_dir))

    # Persist the "running" status.
    job["status"] = "running"
    job["status_updated_at"] = now_iso()
    with open(running_dir / "job.json", "w") as f:
        json.dump(job, f, indent=2)

    # Resolve the script to execute (defaults to run_optimization.py).
    script = running_dir / job.get("script", "run_optimization.py")
    args = [CONDA_PYTHON, str(script)] + job.get("args", [])

    stdout_log = running_dir / "stdout.log"
    stderr_log = running_dir / "stderr.log"

    start_time = time.time()
    try:
        env = {**os.environ, "ATOMIZER_JOB_ID": job_id}

        # Context managers close both log files even when run() raises —
        # the previous version leaked two file handles on every job.
        with open(stdout_log, "w") as out, open(stderr_log, "w") as err:
            result = subprocess.run(
                args,
                cwd=str(running_dir),
                stdout=out,
                stderr=err,
                timeout=job.get("timeout_seconds", 86400),  # 24h default
                env=env,
            )

        if result.returncode == 0:
            job["status"] = "completed"
            dest = COMPLETED / job_dir.name
        else:
            job["status"] = "failed"
            job["error"] = f"Exit code: {result.returncode}"
            dest = FAILED / job_dir.name

    except subprocess.TimeoutExpired:
        job["status"] = "failed"
        job["error"] = "Timeout exceeded"
        dest = FAILED / job_dir.name

    except Exception as e:
        # Broad catch is deliberate: one bad job must not kill the watcher.
        # log.exception keeps the traceback that str(e) alone would lose.
        log.exception(f"Job {job_id} failed with an unexpected error")
        job["status"] = "failed"
        job["error"] = str(e)
        dest = FAILED / job_dir.name

    # Record duration on every path — the previous version omitted it
    # when the generic exception handler fired.
    job["duration_seconds"] = round(time.time() - start_time, 1)

    job["status_updated_at"] = now_iso()
    with open(running_dir / "job.json", "w") as f:
        json.dump(job, f, indent=2)

    # Move to its final home, replacing any previous run of the same job.
    if dest.exists():
        shutil.rmtree(dest)
    shutil.move(str(running_dir), str(dest))
    log.info(f"Job {job_id}: {job['status']} ({job.get('duration_seconds', '?')}s)")
|
|
|
|
|
|
def process_pending():
    """Run every job directory currently sitting in pending/, in name order."""
    for candidate in sorted(PENDING.iterdir()):
        if candidate.is_dir() and (candidate / "job.json").exists():
            run_job(candidate)
|
|
|
|
|
|
def watch():
    """Poll pending/ for newly arrived jobs (polling mode — no watchdog dependency)."""
    log.info(f"Job watcher started. Monitoring: {PENDING}")
    known = set()

    while True:
        try:
            observed = set()
            for entry in PENDING.iterdir():
                if not entry.is_dir() or not (entry / "job.json").exists():
                    continue
                observed.add(entry.name)
                if entry.name in known:
                    continue
                # Give Syncthing a moment to finish copying the job files.
                time.sleep(5)
                # Re-check: the manifest may have vanished during the wait.
                if (entry / "job.json").exists():
                    run_job(entry)
            known = observed
        except Exception as e:
            # Never let a transient error kill the service; retry next poll.
            log.error(f"Watch loop error: {e}")

        time.sleep(10)  # Poll every 10 seconds
|
|
|
|
|
|
def main():
    """Create the queue directories, then run once (--once) or watch forever."""
    for directory in (PENDING, RUNNING, COMPLETED, FAILED):
        directory.mkdir(parents=True, exist_ok=True)

    if "--once" in sys.argv:
        process_pending()
        return

    # Drain anything already queued, then poll for new arrivals.
    process_pending()
    watch()
|
|
|
|
|
|
# Script entry point — run the watcher only when executed directly, not on import.
if __name__ == "__main__":
    main()
|