#!/usr/bin/env python3
"""
Conductor Lock Cleanup - Manages lock release when tasks complete

Handles:
- Releasing per-user locks when conductor tasks finish
- Detecting task completion (success/failure)
- Cleaning up stale locks from crashed agents
- Integration with conductor meta.json for lock tracking

This module is called by the watchdog and cleanup processes to ensure
locks are released even if an agent crashes.
"""

import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any

logger = logging.getLogger(__name__)

# Import the per-user queue manager. It lives next to this file, so make sure
# this directory is importable even when the script is run from elsewhere.
lib_path = Path(__file__).parent
if str(lib_path) not in sys.path:
    sys.path.insert(0, str(lib_path))

from per_user_queue_manager import PerUserQueueManager  # noqa: E402

# Task statuses after which the task no longer needs its lock.
_FINAL_STATES = frozenset({"completed", "failed", "cancelled", "error"})

# Sub-directories of the conductor base that hold one directory per task.
_TASK_SUBDIRS = ("active", "completed")


class ConductorLockCleanup:
    """Manages lock cleanup for conductor tasks."""

    def __init__(self):
        self.user_queue_manager = PerUserQueueManager()

    def check_and_cleanup_conductor_locks(
        self, project: str, conductor_base: Optional[str] = None
    ) -> int:
        """
        Check all conductors for a project and release completed task locks.

        Every task directory under ``<conductor_base>/active`` and
        ``<conductor_base>/completed`` is inspected. No age filtering is
        applied to either directory.

        Args:
            project: Project name
            conductor_base: Base path for conductor directories
                (default /home/{project}/conductor)

        Returns:
            Count of locks released
        """
        if conductor_base is None:
            conductor_base = f"/home/{project}/conductor"

        conductor_path = Path(conductor_base)
        locks_released = 0

        for subdir in _TASK_SUBDIRS:
            tasks_path = conductor_path / subdir
            # is_dir() also covers "does not exist" and guards against the
            # path being a regular file, on which iterdir() would raise.
            if not tasks_path.is_dir():
                continue
            for task_dir in tasks_path.iterdir():
                if task_dir.is_dir():
                    locks_released += self._check_task_directory(task_dir)

        return locks_released

    def _check_task_directory(self, task_dir: Path) -> int:
        """
        Check a single task directory and release lock if task is complete.

        The task's ``meta.json`` must contain ``user`` and ``lock_id`` and a
        ``status`` in one of the final states. After a successful release the
        file is rewritten with ``lock_released: true``; directories already
        carrying that marker are skipped so repeated scans do not retry the
        release.

        Args:
            task_dir: Path to task directory

        Returns:
            1 if lock was released, 0 otherwise
        """
        meta_file = task_dir / "meta.json"
        if not meta_file.is_file():
            return 0

        try:
            meta = json.loads(meta_file.read_text())
        except (OSError, ValueError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            logger.error(f"Error reading meta.json in {task_dir}: {e}")
            return 0

        if not isinstance(meta, dict):
            logger.error(
                f"Error reading meta.json in {task_dir}: "
                f"expected a JSON object, got {type(meta).__name__}"
            )
            return 0

        user = meta.get("user")
        lock_id = meta.get("lock_id")
        if not user or not lock_id:
            # No lock info, nothing to clean up
            return 0

        if meta.get("lock_released"):
            # A previous pass already released this lock.
            return 0

        status = meta.get("status", "unknown")
        if status not in _FINAL_STATES:
            # Task is still running
            return 0

        # Task is complete, release the lock
        if not self.user_queue_manager.release_lock(user, lock_id):
            logger.warning(
                f"Failed to release lock for user {user} (task {meta.get('id')})"
            )
            return 0

        logger.info(
            f"Released lock for user {user} (task {meta.get('id')}, "
            f"status {status})"
        )

        # Update meta.json to mark lock as released. The lock is already
        # gone at this point, so a write failure must not hide the release
        # from the caller or abort the rest of the scan.
        meta["lock_released"] = True
        try:
            meta_file.write_text(json.dumps(meta, indent=2))
        except OSError as e:
            logger.error(f"Error updating meta.json in {task_dir}: {e}")
        return 1

    def cleanup_stale_task_locks(self, max_age_seconds: int = 3600) -> int:
        """
        Clean up locks that have been held longer than ``max_age_seconds``.

        Staleness is judged solely from each lock's ``acquired_at``
        timestamp (ISO 8601); entries missing ``user``, ``lock_id`` or
        ``acquired_at`` are ignored. Errors on one lock are logged and do
        not stop processing of the remaining locks.

        Args:
            max_age_seconds: Maximum age of a lock before it is considered
                stale

        Returns:
            Count of stale locks cleaned
        """
        locks_cleaned = 0

        for lock_info in self.user_queue_manager.get_all_locks():
            user = lock_info.get("user")
            lock_id = lock_info.get("lock_id")
            acquired_at = lock_info.get("acquired_at")

            if not user or not lock_id or not acquired_at:
                continue

            try:
                acquired_time = datetime.fromisoformat(acquired_at)
                # Use "now" in the timestamp's own timezone so aware and
                # naive values are never mixed; tzinfo is None for naive
                # timestamps, which yields naive local time as before.
                now = datetime.now(acquired_time.tzinfo)
                age = (now - acquired_time).total_seconds()
            except (TypeError, ValueError) as e:
                logger.error(f"Error processing lock for user {user}: {e}")
                continue

            if age <= max_age_seconds:
                continue

            try:
                released = self.user_queue_manager.release_lock(user, lock_id)
            except Exception as e:
                # Best effort: one failing lock must not block the others.
                logger.error(f"Error processing lock for user {user}: {e}")
                continue

            if released:
                logger.info(
                    f"Cleaned up stale lock for user {user} "
                    f"(age {age:.0f}s)"
                )
                locks_cleaned += 1

        return locks_cleaned

    def release_task_lock(self, user: str, task_id: str) -> bool:
        """
        Release lock for a specific task.

        The user's active lock is released only when ``task_id`` is a
        non-empty substring of that lock's ``lock_id``.

        Args:
            user: Username
            task_id: Task ID

        Returns:
            True if lock was released
        """
        lock_info = self.user_queue_manager.get_lock_info(user)
        if not lock_info:
            logger.warning(f"No active lock found for user {user}")
            return False

        lock_id = lock_info.get("lock_id")
        # An empty task_id is a substring of every string and would match
        # whatever lock the user holds, so it is rejected explicitly.
        if (
            not task_id
            or not isinstance(lock_id, str)
            or task_id not in lock_id
        ):
            logger.warning(
                f"Task {task_id} doesn't match active lock for user {user}"
            )
            return False

        return self.user_queue_manager.release_lock(user, lock_id)


def _print_usage() -> None:
    """Print the command-line usage summary."""
    print("Usage:")
    print("  conductor_lock_cleanup.py check_project <project>")
    print("  conductor_lock_cleanup.py cleanup_stale [max_age_seconds]")
    print("  conductor_lock_cleanup.py release <user> <task_id>")


def main(argv: Optional[list] = None) -> int:
    """
    Run the command-line interface.

    Args:
        argv: Full argument vector including the program name
            (default ``sys.argv``)

    Returns:
        Process exit code: 0 on success or when only usage was requested,
        1 for an unknown command or invalid arguments
    """
    if argv is None:
        argv = sys.argv

    logging.basicConfig(level=logging.INFO)

    if len(argv) < 2:
        _print_usage()
        return 0

    cmd = argv[1]
    args = argv[2:]

    if cmd not in ("check_project", "cleanup_stale", "release"):
        print(f"Unknown command: {cmd}")
        return 1

    if cmd == "check_project":
        if len(args) < 1:
            _print_usage()
            return 1
        project = args[0]
        count = ConductorLockCleanup().check_and_cleanup_conductor_locks(project)
        print(f"Released {count} locks for project {project}")
        return 0

    if cmd == "cleanup_stale":
        max_age = 3600
        if args:
            try:
                max_age = int(args[0])
            except ValueError:
                print(f"Invalid max_age_seconds: {args[0]}")
                return 1
        count = ConductorLockCleanup().cleanup_stale_task_locks(max_age)
        print(f"Cleaned up {count} stale locks (max age {max_age}s)")
        return 0

    # cmd == "release"
    if len(args) < 2:
        _print_usage()
        return 1
    user, task_id = args[0], args[1]
    if ConductorLockCleanup().release_task_lock(user, task_id):
        print(f"Released lock for user {user}, task {task_id}")
    else:
        print(f"Failed to release lock for user {user}, task {task_id}")
    return 0


# CLI interface
if __name__ == "__main__":
    sys.exit(main())