#!/usr/bin/env python3
"""
Responsive Dispatcher - Non-blocking Task Dispatch with Live Status Updates

Implements:
- Immediate job_id return on task dispatch
- Background job status monitoring without blocking
- Live progress feedback system
- Concurrent task management
- Status caching for fast retrieval

Key Features:
1. Dispatch returns immediately with job_id
2. Background monitor updates job status files
3. CLI can poll status without blocking
4. Multiple concurrent tasks tracked independently
5. Status persisted to disk for resilience
"""

import json
import os
import queue
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Tuple


class ResponsiveDispatcher:
    """Non-blocking task dispatcher with background monitoring"""

    def __init__(self, jobs_dir: Optional[Path] = None):
        self.jobs_dir = jobs_dir or Path("/var/lib/luzia/jobs")
        self.jobs_dir.mkdir(parents=True, exist_ok=True)
        self.monitoring_queue = queue.Queue()
        self.status_cache = {}  # Local cache for fast retrieval
        self.cache_update_time = {}  # Track cache freshness

    def dispatch_task(self, project: str, task: str, priority: int = 5) -> Tuple[str, Dict]:
        """
        Dispatch a task immediately, returning job_id and initial status.

        Returns:
            (job_id, status_dict)
        """
        job_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(task) & 0xffff)[2:]
        job_dir = self.jobs_dir / job_id

        # Create job directory atomically
        job_dir.mkdir(parents=True, exist_ok=True)

        # Write initial status
        initial_status = {
            "id": job_id,
            "project": project,
            "task": task[:200],  # Truncate long tasks
            "status": "dispatched",
            "priority": priority,
            "dispatched_at": datetime.now().isoformat(),
            "progress": 0,
            "message": "Task queued for execution",
        }
        status_file = job_dir / "status.json"
        self._write_status(status_file, initial_status)

        # Queue for background monitoring
        self.monitoring_queue.put({
            "job_id": job_id,
            "project": project,
            "task": task,
            "job_dir": str(job_dir),
            "priority": priority,
        })

        # Update local cache
        self.status_cache[job_id] = initial_status
        self.cache_update_time[job_id] = time.time()

        return job_id, initial_status

    def get_status(self, job_id: str, use_cache: bool = True) -> Optional[Dict]:
        """
        Get current status of a job.

        Args:
            job_id: Job ID to query
            use_cache: Use cached status if fresh (< 1 second old)

        Returns:
            Status dict or None if job not found
        """
        # Check cache first
        if use_cache and job_id in self.status_cache:
            cache_age = time.time() - self.cache_update_time.get(job_id, 0)
            if cache_age < 1.0:  # Cache valid for 1 second
                return self.status_cache[job_id]

        # Read from disk
        status_file = self.jobs_dir / job_id / "status.json"
        if not status_file.exists():
            return None

        try:
            status = json.loads(status_file.read_text())
            self.status_cache[job_id] = status
            self.cache_update_time[job_id] = time.time()
            return status
        except (json.JSONDecodeError, IOError):
            return None

    def update_status(
        self,
        job_id: str,
        status: str,
        progress: Optional[int] = None,
        message: Optional[str] = None,
        exit_code: Optional[int] = None,
    ) -> bool:
        """
        Update job status. Used by background monitor.

        Returns:
            True if update successful, False otherwise
        """
        status_file = self.jobs_dir / job_id / "status.json"
        if not status_file.exists():
            return False

        try:
            current = json.loads(status_file.read_text())
        except (json.JSONDecodeError, IOError):
            return False

        # Update fields
        current["status"] = status
        if progress is not None:
            current["progress"] = min(100, max(0, progress))
        if message:
            current["message"] = message
        if exit_code is not None:
            current["exit_code"] = exit_code
        current["updated_at"] = datetime.now().isoformat()

        self._write_status(status_file, current)

        # Update cache
        self.status_cache[job_id] = current
        self.cache_update_time[job_id] = time.time()

        return True

    def list_jobs(self, project: Optional[str] = None, status_filter: Optional[str] = None) -> list:
        """
        List jobs, optionally filtered by project and status.

        Returns:
            List of job status dicts
        """
        jobs = []
        for job_dir in sorted(self.jobs_dir.iterdir(), reverse=True):
            if not job_dir.is_dir():
                continue

            status = self.get_status(job_dir.name)
            if not status:
                continue

            # Apply filters
            if project and status.get("project") != project:
                continue
            if status_filter and status.get("status") != status_filter:
                continue

            jobs.append(status)

        return jobs[:50]  # Return last 50 jobs

    def wait_for_job(
        self, job_id: str, timeout: Optional[int] = None, poll_interval: float = 0.5
    ) -> Optional[Dict]:
        """
        Wait for job to complete (blocking).
        Useful for critical operations that need synchronization.

        Args:
            job_id: Job ID to wait for
            timeout: Max seconds to wait (None = wait forever)
            poll_interval: Seconds between status checks

        Returns:
            Final status dict or None if timeout
        """
        start_time = time.time()

        while True:
            status = self.get_status(job_id, use_cache=False)
            if not status:
                return None

            if status.get("status") in ["completed", "failed", "killed"]:
                return status

            if timeout:
                elapsed = time.time() - start_time
                if elapsed > timeout:
                    return None

            time.sleep(poll_interval)

    def stream_status(self, job_id: str, interval: float = 0.5) -> None:
        """
        Stream status updates to stdout without blocking main loop.
        Useful for long-running tasks.

        Args:
            job_id: Job ID to stream
            interval: Seconds between updates
        """
        last_msg = None

        while True:
            status = self.get_status(job_id, use_cache=False)
            if not status:
                print(f"Job {job_id} not found")
                return

            # Print new messages
            msg = status.get("message")
            if msg and msg != last_msg:
                progress = status.get("progress", 0)
                print(f" [{progress}%] {msg}")
                last_msg = msg

            # Check if done
            if status.get("status") in ["completed", "failed", "killed"]:
                exit_code = status.get("exit_code", -1)
                print(f" [100%] {status['status'].title()} (exit {exit_code})")
                return

            time.sleep(interval)

    def start_background_monitor(self) -> threading.Thread:
        """
        Start background monitor thread that processes queued jobs.

        Returns:
            Monitor thread (started, daemon=True)
        """
        monitor = threading.Thread(target=self._monitor_loop, daemon=True)
        monitor.start()
        return monitor

    def _monitor_loop(self):
        """Background monitor loop - processes jobs from queue"""
        while True:
            try:
                # Get next job from queue with short timeout
                job_info = self.monitoring_queue.get(timeout=1.0)
                self._monitor_job(job_info)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"[Monitor error] {e}", flush=True)

    def _monitor_job(self, job_info: Dict):
        """Monitor a single job's execution"""
        job_id = job_info["job_id"]
        project = job_info["project"]
        job_dir = Path(job_info["job_dir"])

        # Update status: starting
        self.update_status(job_id, "starting", progress=5, message="Agent initialization")

        # Wait for agent to start (check for meta.json or output)
        max_wait = 30
        for _ in range(max_wait):
            output_file = job_dir / "output.log"
            meta_file = job_dir / "meta.json"

            if output_file.exists() or meta_file.exists():
                self.update_status(job_id, "running", progress=10, message="Agent running")
                break

            time.sleep(0.5)
        else:
            # Timeout waiting for agent to start
            self.update_status(job_id, "failed", progress=0, message="Agent failed to start")
            return

        # Monitor execution
        output_file = job_dir / "output.log"
        last_size = 0
        stalled_count = 0

        while True:
            # Check if completed
            if output_file.exists():
                content = output_file.read_text()

                if "exit:" in content:
                    # Parse exit code
                    lines = content.strip().split("\n")
                    for line in reversed(lines):
                        if line.startswith("exit:"):
                            exit_code = int(line.split(":")[1])
                            status = "completed" if exit_code == 0 else "failed"
                            self.update_status(
                                job_id,
                                status,
                                progress=100,
                                message=f"Agent {status}",
                                exit_code=exit_code,
                            )
                            return

                # Update progress based on output size
                current_size = len(content)
                if current_size > last_size:
                    progress = min(95, 10 + (current_size // 1000))  # Rough progress indicator
                    self.update_status(job_id, "running", progress=progress)
                    last_size = current_size
                    stalled_count = 0
                else:
                    stalled_count += 1
                    if stalled_count > 30:  # 30 * 1 second = 30 seconds with no output
                        self.update_status(
                            job_id, "stalled", progress=50, message="No output for 30 seconds"
                        )

            time.sleep(1.0)

    @staticmethod
    def _write_status(path: Path, data: Dict) -> None:
        """Write status atomically"""
        tmp_path = path.with_suffix(".json.tmp")
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.rename(tmp_path, path)


# Helper function for quick dispatch
def quick_dispatch(project: str, task: str) -> Dict:
    """Quick dispatch helper - returns the initial status dict (includes the job id)"""
    dispatcher = ResponsiveDispatcher()
    _, status = dispatcher.dispatch_task(project, task)
    return status
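

# Usage sketch (illustrative only): exercises the dispatch -> monitor -> poll flow
# described in the module docstring. The jobs directory, project name, and task
# string below are placeholder assumptions, not part of the dispatcher itself. In
# the real system an external agent is expected to write output.log / meta.json
# into the job directory; without one, the background monitor marks the job
# "failed" after its start-up timeout, which wait_for_job() then reports.
if __name__ == "__main__":
    dispatcher = ResponsiveDispatcher(jobs_dir=Path("/tmp/luzia-jobs-demo"))
    dispatcher.start_background_monitor()

    # Dispatch returns immediately with a job_id; nothing blocks here.
    job_id, status = dispatcher.dispatch_task("demo-project", "run nightly report", priority=3)
    print(f"dispatched {job_id}: {status['message']}")

    # Poll for a bounded time; a CLI could call get_status() periodically instead.
    final = dispatcher.wait_for_job(job_id, timeout=20)
    print(final if final else f"{job_id} still running after timeout")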