#!/usr/bin/env python3 """ Task Watchdog - Monitor running tasks for stuck/failed states Key responsibilities: 1. Monitor heartbeats for running tasks 2. Detect and clean up stuck/orphaned tasks 3. Release stale locks automatically 4. Track task state transitions 5. Support cockpit "awaiting_human" state Usage: from task_watchdog import TaskWatchdog watchdog = TaskWatchdog() watchdog.run_check() # Single check watchdog.run_loop() # Continuous monitoring """ import json import os import time import signal from pathlib import Path from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple import fcntl class TaskWatchdog: """Monitor running tasks for stuck/failed states.""" # Configuration CONDUCTOR_BASE = Path.home() / "conductor" ACTIVE_DIR = CONDUCTOR_BASE / "active" COMPLETED_DIR = CONDUCTOR_BASE / "completed" FAILED_DIR = CONDUCTOR_BASE / "failed" QUEUE_BASE = Path("/var/lib/luzia/queue") LOCKS_BASE = Path("/var/lib/luzia/locks") CAPACITY_FILE = QUEUE_BASE / "capacity.json" JOBS_DIR = Path("/var/log/luz-orchestrator/jobs") # Cockpit state directory COCKPIT_STATE_DIR = Path("/var/lib/luz-orchestrator/cockpits") # Timeouts HEARTBEAT_TIMEOUT_SECONDS = 300 # 5 minutes without heartbeat = stuck LOCK_TIMEOUT_SECONDS = 3600 # 1 hour lock timeout AWAITING_HUMAN_TIMEOUT = 86400 # 24 hours for human response # State transitions STATES = { 'pending': ['claimed', 'cancelled'], 'claimed': ['dispatched', 'failed', 'cancelled'], 'dispatched': ['running', 'failed'], 'running': ['completed', 'failed', 'awaiting_human', 'stuck'], 'awaiting_human': ['running', 'failed', 'cancelled'], 'stuck': ['failed', 'recovered'], 'completed': [], 'failed': ['retrying'], 'retrying': ['running', 'failed'], 'cancelled': [] } def __init__(self): """Initialize watchdog.""" self._ensure_dirs() def _ensure_dirs(self): """Ensure required directories exist.""" for d in [self.ACTIVE_DIR, self.COMPLETED_DIR, self.FAILED_DIR]: d.mkdir(parents=True, exist_ok=True) self.LOCKS_BASE.mkdir(parents=True, exist_ok=True) self.COCKPIT_STATE_DIR.mkdir(parents=True, exist_ok=True) def check_heartbeats(self) -> List[Dict]: """ Check all active tasks for stale heartbeats. Returns list of stuck tasks. """ stuck_tasks = [] now = time.time() if not self.ACTIVE_DIR.exists(): return stuck_tasks for task_dir in self.ACTIVE_DIR.iterdir(): if not task_dir.is_dir(): continue task_id = task_dir.name heartbeat_file = task_dir / "heartbeat.json" meta_file = task_dir / "meta.json" # Load task metadata meta = {} if meta_file.exists(): try: meta = json.loads(meta_file.read_text()) except: pass # Check if task is in awaiting_human state (from cockpit) if meta.get('status') == 'awaiting_human': # Check if awaiting too long awaiting_since = meta.get('awaiting_since', now) if now - awaiting_since > self.AWAITING_HUMAN_TIMEOUT: stuck_tasks.append({ 'task_id': task_id, 'reason': 'awaiting_human_timeout', 'last_heartbeat': None, 'meta': meta }) continue # Check heartbeat for running tasks if not heartbeat_file.exists(): # No heartbeat file = check task age created = meta.get('created_at') if created: try: created_ts = datetime.fromisoformat(created).timestamp() if now - created_ts > self.HEARTBEAT_TIMEOUT_SECONDS: stuck_tasks.append({ 'task_id': task_id, 'reason': 'no_heartbeat', 'last_heartbeat': None, 'meta': meta }) except: pass continue try: heartbeat = json.loads(heartbeat_file.read_text()) last_ts = heartbeat.get('ts', 0) if now - last_ts > self.HEARTBEAT_TIMEOUT_SECONDS: stuck_tasks.append({ 'task_id': task_id, 'reason': 'stale_heartbeat', 'last_heartbeat': last_ts, 'last_step': heartbeat.get('step'), 'meta': meta }) except Exception as e: stuck_tasks.append({ 'task_id': task_id, 'reason': 'heartbeat_read_error', 'error': str(e), 'meta': meta }) return stuck_tasks def cleanup_orphaned_tasks(self) -> List[str]: """ Clean up tasks where agent died. Moves stuck tasks to failed directory. Returns list of cleaned task IDs. """ cleaned = [] stuck_tasks = self.check_heartbeats() for task in stuck_tasks: task_id = task['task_id'] task_dir = self.ACTIVE_DIR / task_id if not task_dir.exists(): continue # Update meta with failure info meta_file = task_dir / "meta.json" meta = {} if meta_file.exists(): try: meta = json.loads(meta_file.read_text()) except: pass meta['status'] = 'failed' meta['failure_reason'] = task.get('reason', 'unknown') meta['failed_at'] = datetime.now().isoformat() meta['last_heartbeat'] = task.get('last_heartbeat') # Write updated meta try: with open(meta_file, 'w') as f: json.dump(meta, f, indent=2) except: pass # Move to failed directory failed_dest = self.FAILED_DIR / task_id try: if failed_dest.exists(): # Remove old failed version import shutil shutil.rmtree(failed_dest) task_dir.rename(failed_dest) cleaned.append(task_id) except Exception as e: print(f"Error moving task {task_id} to failed: {e}") return cleaned def release_stale_locks(self) -> List[str]: """ Release locks for dead tasks. Returns list of released lock IDs. """ released = [] now = time.time() if not self.LOCKS_BASE.exists(): return released for lock_file in self.LOCKS_BASE.glob("user_*.lock"): try: # Read lock metadata meta_file = lock_file.with_suffix('.json') if not meta_file.exists(): # Old lock without metadata - remove if lock file is old if now - lock_file.stat().st_mtime > self.LOCK_TIMEOUT_SECONDS: lock_file.unlink() released.append(lock_file.name) continue meta = json.loads(meta_file.read_text()) expires_at = meta.get('expires_at', 0) if now > expires_at: # Lock expired - remove both files lock_file.unlink() meta_file.unlink() released.append(meta.get('lock_id', lock_file.name)) except Exception as e: print(f"Error checking lock {lock_file}: {e}") return released def update_capacity(self, released_slots: int = 0) -> bool: """ Update capacity file to reflect released resources. Uses file locking for safety. """ if not self.CAPACITY_FILE.exists(): return False try: with open(self.CAPACITY_FILE, 'r+') as f: fcntl.flock(f, fcntl.LOCK_EX) try: capacity = json.load(f) # Update available slots current = capacity.get('slots', {}).get('available', 0) max_slots = capacity.get('slots', {}).get('max', 4) capacity['slots']['available'] = min(current + released_slots, max_slots) # Update timestamp capacity['last_updated'] = datetime.now().isoformat() # Write back f.seek(0) f.truncate() json.dump(capacity, f, indent=2) finally: fcntl.flock(f, fcntl.LOCK_UN) return True except Exception as e: print(f"Error updating capacity: {e}") return False def get_project_queue_status(self) -> Dict[str, Dict]: """ Get queue status per project. Returns dict of project -> {pending, running, awaiting_human} """ status = {} # Count pending tasks pending_dirs = [ self.QUEUE_BASE / "pending" / "high", self.QUEUE_BASE / "pending" / "normal" ] for pending_dir in pending_dirs: if not pending_dir.exists(): continue for task_file in pending_dir.glob("*.json"): try: task = json.loads(task_file.read_text()) project = task.get('project', 'unknown') if project not in status: status[project] = {'pending': 0, 'running': 0, 'awaiting_human': 0} status[project]['pending'] += 1 except: pass # Count active tasks if self.ACTIVE_DIR.exists(): for task_dir in self.ACTIVE_DIR.iterdir(): if not task_dir.is_dir(): continue meta_file = task_dir / "meta.json" if meta_file.exists(): try: meta = json.loads(meta_file.read_text()) project = meta.get('project', 'unknown') if project not in status: status[project] = {'pending': 0, 'running': 0, 'awaiting_human': 0} if meta.get('status') == 'awaiting_human': status[project]['awaiting_human'] += 1 else: status[project]['running'] += 1 except: pass # Check cockpit states if self.COCKPIT_STATE_DIR.exists(): for state_file in self.COCKPIT_STATE_DIR.glob("*.json"): try: state = json.loads(state_file.read_text()) project = state.get('project') if project and state.get('awaiting_response'): if project not in status: status[project] = {'pending': 0, 'running': 0, 'awaiting_human': 0} status[project]['awaiting_human'] += 1 except: pass return status def is_project_blocked(self, project: str) -> Tuple[bool, Optional[str]]: """ Check if a project queue is blocked. Returns (is_blocked, reason) """ # Check cockpit state cockpit_state = self.COCKPIT_STATE_DIR / f"{project}.json" if cockpit_state.exists(): try: state = json.loads(cockpit_state.read_text()) if state.get('awaiting_response'): return True, "awaiting_human_cockpit" except: pass # Check active tasks for awaiting_human if self.ACTIVE_DIR.exists(): for task_dir in self.ACTIVE_DIR.iterdir(): if not task_dir.is_dir(): continue meta_file = task_dir / "meta.json" if meta_file.exists(): try: meta = json.loads(meta_file.read_text()) if meta.get('project') == project and meta.get('status') == 'awaiting_human': return True, f"awaiting_human_task:{task_dir.name}" except: pass return False, None def set_task_awaiting_human(self, task_id: str, question: str = None) -> bool: """ Mark a task as awaiting human response. This blocks the project queue. """ task_dir = self.ACTIVE_DIR / task_id meta_file = task_dir / "meta.json" if not meta_file.exists(): return False try: meta = json.loads(meta_file.read_text()) meta['status'] = 'awaiting_human' meta['awaiting_since'] = time.time() if question: meta['awaiting_question'] = question with open(meta_file, 'w') as f: json.dump(meta, f, indent=2) return True except Exception as e: print(f"Error setting task awaiting: {e}") return False def resume_task(self, task_id: str, answer: str = None) -> bool: """ Resume a task that was awaiting human response. """ task_dir = self.ACTIVE_DIR / task_id meta_file = task_dir / "meta.json" if not meta_file.exists(): return False try: meta = json.loads(meta_file.read_text()) if meta.get('status') != 'awaiting_human': return False meta['status'] = 'running' meta['resumed_at'] = datetime.now().isoformat() if answer: meta['human_response'] = answer # Update heartbeat heartbeat_file = task_dir / "heartbeat.json" with open(heartbeat_file, 'w') as f: json.dump({'ts': time.time(), 'step': 'Resumed after human response'}, f) with open(meta_file, 'w') as f: json.dump(meta, f, indent=2) return True except Exception as e: print(f"Error resuming task: {e}") return False def run_check(self) -> Dict: """ Run a single watchdog check. Returns summary of actions taken. """ summary = { 'timestamp': datetime.now().isoformat(), 'stuck_tasks': [], 'cleaned_tasks': [], 'released_locks': [], 'project_status': {} } # Check for stuck tasks stuck = self.check_heartbeats() summary['stuck_tasks'] = [t['task_id'] for t in stuck] # Clean up orphaned tasks cleaned = self.cleanup_orphaned_tasks() summary['cleaned_tasks'] = cleaned # Release stale locks released = self.release_stale_locks() summary['released_locks'] = released # Update capacity if we cleaned anything if cleaned or released: self.update_capacity(released_slots=len(cleaned)) # Get project queue status summary['project_status'] = self.get_project_queue_status() return summary def run_loop(self, interval_seconds: int = 60): """ Run continuous watchdog monitoring. """ print(f"Starting Task Watchdog (interval: {interval_seconds}s)") def handle_signal(signum, frame): print("\nWatchdog shutting down...") exit(0) signal.signal(signal.SIGINT, handle_signal) signal.signal(signal.SIGTERM, handle_signal) while True: try: summary = self.run_check() # Log if any actions were taken if summary['stuck_tasks'] or summary['cleaned_tasks'] or summary['released_locks']: print(f"[{datetime.now().isoformat()}] Watchdog check:") if summary['stuck_tasks']: print(f" Stuck tasks: {summary['stuck_tasks']}") if summary['cleaned_tasks']: print(f" Cleaned: {summary['cleaned_tasks']}") if summary['released_locks']: print(f" Released locks: {summary['released_locks']}") time.sleep(interval_seconds) except Exception as e: print(f"Watchdog error: {e}") time.sleep(interval_seconds) def main(): """CLI entry point.""" import argparse parser = argparse.ArgumentParser(description='Task Watchdog') parser.add_argument('command', nargs='?', default='check', choices=['check', 'daemon', 'status', 'stuck', 'clean'], help='Command to run') parser.add_argument('--interval', type=int, default=60, help='Check interval for daemon mode (seconds)') args = parser.parse_args() watchdog = TaskWatchdog() if args.command == 'check': summary = watchdog.run_check() print(json.dumps(summary, indent=2)) elif args.command == 'daemon': watchdog.run_loop(interval_seconds=args.interval) elif args.command == 'status': status = watchdog.get_project_queue_status() print("Project Queue Status:") print(json.dumps(status, indent=2)) elif args.command == 'stuck': stuck = watchdog.check_heartbeats() if stuck: print(f"Found {len(stuck)} stuck tasks:") for t in stuck: print(f" - {t['task_id']}: {t['reason']}") else: print("No stuck tasks found") elif args.command == 'clean': cleaned = watchdog.cleanup_orphaned_tasks() released = watchdog.release_stale_locks() print(f"Cleaned {len(cleaned)} orphaned tasks") print(f"Released {len(released)} stale locks") if __name__ == '__main__': main()