#!/usr/bin/env python3
"""
atomizer_job_watcher.py — Windows Job Queue Service

Watches C:\\Atomizer\\job-queue\\pending\\ for new jobs.
Moves them through pending → running → completed/failed.

Each job is a directory containing a ``job.json`` file. Fields read from it:
    job_id           identifier used in logs and exported to the script as
                     ATOMIZER_JOB_ID (defaults to the job directory name)
    script           script to run, relative to the job directory
                     (defaults to run_optimization.py)
    args             list of extra command-line arguments; each item is
                     converted to a string (default: none)
    timeout_seconds  wall-clock limit for the script (default: 86400 = 24h)

Usage:
    python atomizer_job_watcher.py           # Watch mode (continuous)
    python atomizer_job_watcher.py --once    # Process pending, then exit

Install as service (optional):
    nssm install AtomizerJobWatcher "C:\\...\\python.exe" "C:\\Atomizer\\atomizer_job_watcher.py"
"""

import json
import logging
import os
import shutil
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

JOB_QUEUE = Path(r"C:\Atomizer\job-queue")
PENDING = JOB_QUEUE / "pending"
RUNNING = JOB_QUEUE / "running"
COMPLETED = JOB_QUEUE / "completed"
FAILED = JOB_QUEUE / "failed"

# Update this to match your Conda/Python path
CONDA_PYTHON = r"C:\Users\antoi\anaconda3\envs\atomizer\python.exe"

# Seconds to wait after spotting a new job so Syncthing can finish syncing it.
SYNC_SETTLE_SECONDS = 5
# Seconds between scans of pending/ in watch mode.
POLL_INTERVAL_SECONDS = 10
# Per-job timeout used when job.json has no "timeout_seconds" (24h).
DEFAULT_TIMEOUT_SECONDS = 86400
# Script run when job.json has no "script" entry.
DEFAULT_SCRIPT = "run_optimization.py"

# The log file lives inside JOB_QUEUE, so the directory must exist before the
# FileHandler opens it; otherwise the first run on a fresh machine fails at
# import time, before main() gets a chance to create the queue folders.
JOB_QUEUE.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(JOB_QUEUE / "watcher.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger("job-watcher")


def now_iso():
    """Return the current UTC time as an ISO-8601 string with offset."""
    return datetime.now(timezone.utc).isoformat()


def _read_job(job_file: Path) -> dict:
    """Load a job.json file.

    JSON is UTF-8 by spec; "utf-8-sig" also accepts the BOM that some Windows
    tools prepend. The platform default (e.g. cp1252) would garble non-ASCII text.
    """
    with open(job_file, encoding="utf-8-sig") as f:
        return json.load(f)


def _write_job(job_file: Path, job: dict) -> None:
    """Write *job* to *job_file* as indented UTF-8 JSON."""
    with open(job_file, "w", encoding="utf-8") as f:
        json.dump(job, f, indent=2)


def _build_command(job: dict, running_dir: Path) -> list:
    """Return the argv list that runs the job's script with CONDA_PYTHON.

    Raises:
        ValueError: if ``args`` in job.json is present but not a list.
    """
    raw_args = job.get("args", [])
    if not isinstance(raw_args, list):
        raise ValueError(
            f"job.json 'args' must be a list, got {type(raw_args).__name__}"
        )
    script = running_dir / job.get("script", DEFAULT_SCRIPT)
    # subprocess only accepts string arguments; JSON numbers such as
    # "args": ["--trials", 100] are converted instead of failing the launch.
    return [CONDA_PYTHON, str(script)] + [str(a) for a in raw_args]


def run_job(job_dir: Path):
    """Execute a single job and file it under completed/ or failed/.

    The job directory is moved pending → running, the script is run with its
    stdout/stderr captured to stdout.log / stderr.log inside the job
    directory, and the directory is then moved to completed/ (exit code 0) or
    failed/ (non-zero exit, timeout, or launch error). job.json is rewritten
    with "status", "status_updated_at", "duration_seconds" and, on failure,
    "error".

    Raises whatever reading/moving the job directory raises (e.g. invalid
    JSON); failures of the script itself are recorded in job.json instead.
    """
    job_file = job_dir / "job.json"
    if not job_file.exists():
        log.warning(f"No job.json in {job_dir}, skipping")
        return

    job = _read_job(job_file)
    # Environment values must be strings; a numeric job_id in JSON would
    # otherwise make subprocess reject the env mapping.
    job_id = str(job.get("job_id", job_dir.name))
    log.info(f"Starting job: {job_id}")

    # Move to running/
    running_dir = RUNNING / job_dir.name
    if running_dir.exists():
        shutil.rmtree(running_dir)
    shutil.move(str(job_dir), str(running_dir))

    job["status"] = "running"
    job["status_updated_at"] = now_iso()
    _write_job(running_dir / "job.json", job)

    env = {**os.environ, "ATOMIZER_JOB_ID": job_id}
    start_time = time.time()
    try:
        # Built inside the try so a malformed "args" fails the job instead of
        # stranding it in running/.
        args = _build_command(job, running_dir)
        # The log files are closed deterministically; on Windows a lingering
        # open handle would block moving the job directory afterwards.
        with open(running_dir / "stdout.log", "wb") as out, \
                open(running_dir / "stderr.log", "wb") as err:
            result = subprocess.run(
                args,
                cwd=str(running_dir),
                stdout=out,
                stderr=err,
                timeout=job.get("timeout_seconds", DEFAULT_TIMEOUT_SECONDS),
                env=env,
            )
    except subprocess.TimeoutExpired:
        job["status"] = "failed"
        job["error"] = "Timeout exceeded"
    except Exception as e:
        job["status"] = "failed"
        job["error"] = str(e)
    else:
        if result.returncode == 0:
            job["status"] = "completed"
        else:
            job["status"] = "failed"
            job["error"] = f"Exit code: {result.returncode}"

    # Recorded for every outcome, including launch errors.
    job["duration_seconds"] = round(time.time() - start_time, 1)
    job["status_updated_at"] = now_iso()
    _write_job(running_dir / "job.json", job)

    dest_root = COMPLETED if job["status"] == "completed" else FAILED
    dest = dest_root / job_dir.name
    if dest.exists():
        shutil.rmtree(dest)
    shutil.move(str(running_dir), str(dest))
    log.info(f"Job {job_id}: {job['status']} ({job.get('duration_seconds', '?')}s)")


def process_pending():
    """Process all pending jobs once, in name order.

    A job that cannot be started (e.g. unreadable job.json) is logged and
    skipped so it does not prevent the remaining jobs from running.
    """
    for job_dir in sorted(PENDING.iterdir()):
        if job_dir.is_dir() and (job_dir / "job.json").exists():
            try:
                run_job(job_dir)
            except Exception as e:
                log.exception(f"Job {job_dir.name} could not be run: {e}")


def watch():
    """Watch for new jobs (polling mode — no watchdog dependency).

    Every POLL_INTERVAL_SECONDS, pending/ is scanned. A job directory seen for
    the first time gets SYNC_SETTLE_SECONDS for Syncthing to finish, then is
    run if its job.json is still present. Only jobs actually handed to
    run_job are remembered, so a job whose job.json briefly vanished is
    picked up on a later poll; a job whose run_job raised is retried next poll.
    """
    log.info(f"Job watcher started. Monitoring: {PENDING}")
    seen = set()
    while True:
        try:
            present = set()
            # Snapshot the listing: run_job moves entries out of pending/.
            for job_dir in sorted(PENDING.iterdir()):
                if not (job_dir.is_dir() and (job_dir / "job.json").exists()):
                    continue
                present.add(job_dir.name)
                if job_dir.name in seen:
                    continue
                # Wait for Syncthing to finish syncing
                time.sleep(SYNC_SETTLE_SECONDS)
                if not (job_dir / "job.json").exists():
                    continue  # vanished mid-sync; reconsider on a later poll
                try:
                    run_job(job_dir)
                except Exception as e:
                    log.exception(f"Job {job_dir.name} could not be run: {e}")
                    continue  # not marked seen, so it is retried next poll
                seen.add(job_dir.name)
            # Forget jobs that have left pending/ so a resubmission runs again.
            seen &= present
        except Exception as e:
            log.exception(f"Watch loop error: {e}")
        time.sleep(POLL_INTERVAL_SECONDS)


def main():
    """Create the queue folders, drain pending/, then watch unless --once."""
    for d in [PENDING, RUNNING, COMPLETED, FAILED]:
        d.mkdir(parents=True, exist_ok=True)

    if "--once" in sys.argv:
        process_pending()
    else:
        # Process existing pending first
        process_pending()
        # Then watch for new ones
        watch()


if __name__ == "__main__":
    main()