#!/usr/bin/env python3 """ Conductor Task Recovery Auto-recovery for stalled conductor tasks: - Kill zombie processes - Release task locks - Update task status - Move to failed directory if unrecoverable """ import json import os import signal import time from pathlib import Path from datetime import datetime from typing import List, Dict class ConductorRecovery: """Recover from stalled conductor tasks.""" CONDUCTOR_ROOT = Path('/home/admin/conductor') HEARTBEAT_TIMEOUT_SECS = 300 def __init__(self): """Initialize conductor recovery.""" self.conductor_root = self.CONDUCTOR_ROOT self.active_dir = self.conductor_root / 'active' self.failed_dir = self.conductor_root / 'failed' def find_stalled_tasks(self) -> List[Dict]: """ Find all stalled tasks in conductor/active. Returns: List of stalled task metadata dicts """ stalled = [] if not self.active_dir.exists(): return stalled now = time.time() for task_dir in self.active_dir.iterdir(): if not task_dir.is_dir(): continue task_id = task_dir.name stall_reason = None stall_details = {} # Check heartbeat timeout heartbeat_file = task_dir / 'heartbeat.json' if heartbeat_file.exists(): try: hb = json.loads(heartbeat_file.read_text()) hb_age = now - hb.get('ts', 0) if hb_age > self.HEARTBEAT_TIMEOUT_SECS: stall_reason = 'heartbeat_timeout' stall_details = { 'heartbeat_age_secs': int(hb_age), 'last_step': hb.get('step', 'unknown') } except: pass # Check if process exists pid_file = task_dir / 'pid' if pid_file.exists() and not stall_reason: try: pid = int(pid_file.read_text().strip()) if not os.path.exists(f'/proc/{pid}'): stall_reason = 'process_not_found' stall_details = {'pid': pid} except: pass if stall_reason: stalled.append({ 'task_id': task_id, 'task_dir': str(task_dir), 'stall_reason': stall_reason, 'details': stall_details, 'timestamp': now }) return stalled def recover_stalled_task(self, task_id: str, dry_run: bool = True) -> Dict: """ Attempt to recover a single stalled task. Args: task_id: Task ID to recover dry_run: If True, preview actions without making changes Returns: Dict with recovery result """ task_dir = self.active_dir / task_id if not task_dir.exists(): return {'status': 'error', 'message': f'Task {task_id} not found'} actions = [] result_status = 'unknown' # 1. Kill zombie process (if exists) pid_file = task_dir / 'pid' if pid_file.exists(): try: pid = int(pid_file.read_text().strip()) if os.path.exists(f'/proc/{pid}'): actions.append(f"Kill process {pid}") if not dry_run: try: os.kill(pid, signal.SIGTERM) time.sleep(1) # Force kill if still exists if os.path.exists(f'/proc/{pid}'): os.kill(pid, signal.SIGKILL) except: pass else: actions.append(f"Process {pid} already terminated") except: pass # 2. Update heartbeat to current time (signal recovery attempt) heartbeat_file = task_dir / 'heartbeat.json' actions.append("Update heartbeat to current time") if not dry_run: hb_data = { 'ts': time.time(), 'step': 'recovery_attempt', 'recovered_at': datetime.now().isoformat() } heartbeat_file.write_text(json.dumps(hb_data, indent=2)) # 3. Update progress file progress_file = task_dir / 'progress.md' actions.append("Update progress with recovery note") if not dry_run: progress_content = f"""# Task Recovery **Recovered at:** {datetime.now().isoformat()} **Status:** Task was stalled, recovery attempted ## Original Progress (Previous content preserved) ## Recovery Actions - Process killed/terminated - Heartbeat reset - Progress file updated **Next step:** Monitor task progress. If still stalled, may need manual intervention. 
""" progress_file.write_text(progress_content) # 4. Update meta to mark recovery attempt meta_file = task_dir / 'meta.json' actions.append("Update metadata with recovery flag") if not dry_run: try: meta = json.loads(meta_file.read_text()) meta['recovery_attempts'] = meta.get('recovery_attempts', 0) + 1 meta['last_recovery'] = datetime.now().isoformat() meta_file.write_text(json.dumps(meta, indent=2)) except: pass # 5. Decision: Keep in active or move to failed if too many recovery attempts meta = json.loads(meta_file.read_text()) if meta_file.exists() else {} recovery_attempts = meta.get('recovery_attempts', 0) if recovery_attempts >= 3: result_status = 'moved_to_failed' actions.append("Move to failed (too many recovery attempts)") if not dry_run: self._move_task_to_failed(task_dir, task_id, "Exceeded maximum recovery attempts") else: result_status = 'recovered' actions.append("Keep in active (monitor progress)") return { 'task_id': task_id, 'status': result_status, 'actions': actions, 'dry_run': dry_run, 'timestamp': time.time() } def recover_all_stalled_tasks(self, dry_run: bool = True) -> Dict: """ Recover all stalled tasks. Args: dry_run: If True, preview without making changes Returns: Dict with batch recovery results """ stalled_tasks = self.find_stalled_tasks() if not stalled_tasks: return { 'total_stalled': 0, 'recovered': 0, 'moved_to_failed': 0, 'results': [], 'dry_run': dry_run, 'timestamp': time.time() } results = [] recovered_count = 0 moved_count = 0 for stalled in stalled_tasks: task_id = stalled['task_id'] result = self.recover_stalled_task(task_id, dry_run=dry_run) results.append(result) if result['status'] == 'recovered': recovered_count += 1 elif result['status'] == 'moved_to_failed': moved_count += 1 return { 'total_stalled': len(stalled_tasks), 'recovered': recovered_count, 'moved_to_failed': moved_count, 'results': results, 'dry_run': dry_run, 'timestamp': time.time() } def release_locks(self, task_id: str, dry_run: bool = True) -> Dict: """ Release any locks held by a task. Args: task_id: Task ID dry_run: If True, preview without making changes Returns: Dict with lock release results """ task_dir = self.active_dir / task_id if not task_dir.exists(): return {'status': 'error', 'message': f'Task {task_id} not found'} # Look for lock files lock_dir = task_dir / 'locks' released = [] if lock_dir.exists(): for lock_file in lock_dir.iterdir(): released.append(str(lock_file)) if not dry_run: lock_file.unlink() return { 'task_id': task_id, 'locks_released': len(released), 'lock_files': released, 'dry_run': dry_run, 'timestamp': time.time() } def validate_recovery(self, task_id: str) -> Dict: """ Validate that a task recovered successfully. 
    def validate_recovery(self, task_id: str) -> Dict:
        """
        Validate that a task recovered successfully.

        Args:
            task_id: Task ID to validate

        Returns:
            Dict with validation result
        """
        task_dir = self.active_dir / task_id
        if not task_dir.exists():
            return {'status': 'not_found', 'task_id': task_id}

        # Check that the heartbeat is recent
        heartbeat_file = task_dir / 'heartbeat.json'
        is_alive = False
        if heartbeat_file.exists():
            try:
                hb = json.loads(heartbeat_file.read_text())
                hb_age = time.time() - hb.get('ts', 0)
                # Consider alive if the heartbeat is younger than the timeout (<5 min)
                is_alive = hb_age < self.HEARTBEAT_TIMEOUT_SECS
            except (json.JSONDecodeError, OSError):
                pass

        # Check for a running process
        process_running = False
        pid_file = task_dir / 'pid'
        if pid_file.exists():
            try:
                pid = int(pid_file.read_text().strip())
                process_running = os.path.exists(f'/proc/{pid}')
            except (ValueError, OSError):
                pass

        # Overall recovery status
        recovery_status = 'recovered' if is_alive or process_running else 'stalled'

        return {
            'task_id': task_id,
            'recovery_status': recovery_status,
            'heartbeat_alive': is_alive,
            'process_running': process_running,
            'timestamp': time.time()
        }

    def _move_task_to_failed(self, task_dir: Path, task_id: str, failure_reason: str) -> bool:
        """Move a task from active to failed."""
        try:
            failed_task_dir = self.failed_dir / task_id
            failed_task_dir.mkdir(parents=True, exist_ok=True)

            # Copy all top-level files (subdirectories such as locks/ are not preserved)
            for item in task_dir.iterdir():
                if item.is_file():
                    shutil.copy2(item, failed_task_dir / item.name)

            # Update meta with the failure reason
            meta_file = failed_task_dir / 'meta.json'
            if meta_file.exists():
                meta = json.loads(meta_file.read_text())
            else:
                meta = {}
            meta['failure_reason'] = failure_reason
            meta['moved_to_failed_at'] = datetime.now().isoformat()
            meta_file.write_text(json.dumps(meta, indent=2))

            # Create error.txt
            error_file = failed_task_dir / 'error.txt'
            error_file.write_text(f"Task stalled: {failure_reason}\nMoved to failed: {datetime.now().isoformat()}")

            # Remove from active
            shutil.rmtree(task_dir)

            return True
        except Exception as e:
            print(f"Error moving task {task_id} to failed: {e}")
            return False


if __name__ == '__main__':
    recovery = ConductorRecovery()

    print("=" * 70)
    print("FINDING STALLED TASKS")
    print("=" * 70)
    stalled = recovery.find_stalled_tasks()
    print(f"Found {len(stalled)} stalled task(s)")
    for task in stalled[:5]:
        print(f"  - {task['task_id']}: {task['stall_reason']}")

    if stalled:
        print("\n" + "=" * 70)
        print("RECOVERY DRY RUN (preview only)")
        print("=" * 70)
        result = recovery.recover_all_stalled_tasks(dry_run=True)
        print(f"Would recover: {result['recovered']}")
        print(f"Would move to failed: {result['moved_to_failed']}")
        print("\nActions:")
        for r in result['results'][:1]:
            for action in r['actions']:
                print(f"  - {action}")
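    # Applying recovery for real (sketch): APPLY_RECOVERY is an assumed opt-in
    # environment variable, not part of the original workflow; when set, the
    # dry run above is followed by an actual recovery pass plus validation.
    if stalled and os.environ.get('APPLY_RECOVERY') == '1':
        applied = recovery.recover_all_stalled_tasks(dry_run=False)
        for r in applied['results']:
            check = recovery.validate_recovery(r['task_id'])
            print(f"  {r['task_id']}: {check['recovery_status']}")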