Files
luzia/lib/conductor_lock_cleanup.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

238 lines
7.4 KiB
Python

#!/usr/bin/env python3
"""
Conductor Lock Cleanup - Manages lock release when tasks complete
Handles:
- Releasing per-user locks when conductor tasks finish
- Detecting task completion (success/failure)
- Cleaning up stale locks from crashed agents
- Integration with conductor meta.json for lock tracking
This module is called by the watchdog and cleanup processes to ensure
locks are released even if an agent crashes.
"""
import json
import sys
from pathlib import Path
from typing import Optional, Dict, Any
import logging
logger = logging.getLogger(__name__)
# Import the per-user queue manager
lib_path = Path(__file__).parent
if str(lib_path) not in sys.path:
sys.path.insert(0, str(lib_path))
from per_user_queue_manager import PerUserQueueManager
class ConductorLockCleanup:
    """Release per-user queue locks that conductor tasks no longer need.

    All lock operations are delegated to ``PerUserQueueManager``. This class
    only decides *when* a lock should be released: because the owning task
    reached a final status, because the lock is older than a threshold, or
    because a caller asked for one task's lock explicitly.
    """

    # Task statuses after which the task is finished and its lock can go.
    _FINAL_STATES = frozenset({"completed", "failed", "cancelled", "error"})

    def __init__(self):
        self.user_queue_manager = PerUserQueueManager()

    def check_and_cleanup_conductor_locks(
        self, project: str, conductor_base: Optional[str] = None
    ) -> int:
        """
        Check all conductors for a project and release completed task locks.

        Scans every task directory under ``<conductor_base>/active`` and
        ``<conductor_base>/completed``. No age filter is applied to either
        directory; each task is judged only by its meta.json.

        Args:
            project: Project name
            conductor_base: Base path for conductor directories
                (default /home/{project}/conductor)

        Returns:
            Count of locks released
        """
        if conductor_base is None:
            conductor_base = f"/home/{project}/conductor"
        conductor_path = Path(conductor_base)

        locks_released = 0
        for state_name in ("active", "completed"):
            state_path = conductor_path / state_name
            # is_dir() also covers "base path missing" and "path is a file",
            # either of which would otherwise make iterdir() raise.
            if not state_path.is_dir():
                continue
            for task_dir in state_path.iterdir():
                if task_dir.is_dir():
                    locks_released += self._check_task_directory(task_dir)
        return locks_released

    def _check_task_directory(self, task_dir: Path) -> int:
        """
        Check a single task directory and release lock if task is complete.

        Reads ``meta.json`` in ``task_dir``. The lock is released only when
        the metadata carries both ``user`` and ``lock_id``, the ``status`` is
        a final state, and the lock has not already been marked released by
        an earlier pass.

        Args:
            task_dir: Path to task directory

        Returns:
            1 if lock was released, 0 otherwise
        """
        meta_file = task_dir / "meta.json"
        if not meta_file.exists():
            return 0

        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            logger.error("Error reading meta.json in %s: %s", task_dir, e)
            return 0

        if not isinstance(meta, dict):
            # Valid JSON but not an object; .get() below would raise and
            # abort the scan of every remaining task directory.
            logger.error(
                "Error reading meta.json in %s: expected a JSON object",
                task_dir,
            )
            return 0

        if meta.get("lock_released"):
            # Already handled by a previous pass; releasing again would only
            # produce spurious warnings or inflate the released count.
            return 0

        status = meta.get("status", "unknown")
        user = meta.get("user")
        lock_id = meta.get("lock_id")

        if not user or not lock_id:
            # No lock info, nothing to clean up
            return 0

        if status not in self._FINAL_STATES:
            # Task is still running
            return 0

        # Task is complete, release the lock
        if not self.user_queue_manager.release_lock(user, lock_id):
            logger.warning(
                "Failed to release lock for user %s (task %s)",
                user,
                meta.get("id"),
            )
            return 0

        logger.info(
            "Released lock for user %s (task %s, status %s)",
            user,
            meta.get("id"),
            status,
        )

        # Record the release so later passes skip this task. The lock is
        # already gone at this point, so a failed write must not hide that.
        meta["lock_released"] = True
        try:
            meta_file.write_text(json.dumps(meta, indent=2), encoding="utf-8")
        except OSError as e:
            logger.error("Error updating meta.json in %s: %s", task_dir, e)
        return 1

    def cleanup_stale_task_locks(self, max_age_seconds: int = 3600) -> int:
        """
        Clean up locks that have been held longer than ``max_age_seconds``.

        Age is measured from each lock's ``acquired_at`` ISO-8601 timestamp,
        which is the only time field this method reads. Entries missing
        ``user``, ``lock_id`` or ``acquired_at`` are skipped.

        Args:
            max_age_seconds: Maximum age of a lock before it is considered
                stale

        Returns:
            Count of stale locks cleaned
        """
        from datetime import datetime

        locks_cleaned = 0
        for lock_info in self.user_queue_manager.get_all_locks():
            user = lock_info.get("user")
            lock_id = lock_info.get("lock_id")
            acquired_at = lock_info.get("acquired_at")

            if not user or not lock_id or not acquired_at:
                continue

            try:
                acquired_time = datetime.fromisoformat(acquired_at)
            except (TypeError, ValueError) as e:
                logger.error("Error processing lock for user %s: %s", user, e)
                continue

            # Compare like with like: an aware timestamp needs an aware
            # "now"; subtracting naive from aware raises TypeError. A tzinfo
            # of None yields naive local time, as before.
            now = datetime.now(acquired_time.tzinfo)
            age = (now - acquired_time).total_seconds()
            if age <= max_age_seconds:
                continue

            try:
                released = self.user_queue_manager.release_lock(user, lock_id)
            except Exception as e:
                # Best effort: one failing release must not stop the sweep
                # of the remaining locks.
                logger.error("Error processing lock for user %s: %s", user, e)
                continue

            if released:
                logger.info(
                    "Cleaned up stale lock for user %s (age %.0fs)", user, age
                )
                locks_cleaned += 1

        return locks_cleaned

    def release_task_lock(self, user: str, task_id: str) -> bool:
        """
        Release lock for a specific task.

        The user's active lock is released only if ``task_id`` occurs within
        its ``lock_id``. An empty ``task_id`` is rejected, because it would
        otherwise match every lock id.

        Args:
            user: Username
            task_id: Task ID

        Returns:
            True if lock was released
        """
        if not task_id:
            logger.warning(f"No task id given for user {user}")
            return False

        lock_info = self.user_queue_manager.get_lock_info(user)
        if not lock_info:
            logger.warning(f"No active lock found for user {user}")
            return False

        # Treat a missing or null lock_id as "no match" instead of raising.
        lock_id = lock_info.get("lock_id") or ""
        if task_id not in lock_id:
            logger.warning(
                f"Task {task_id} doesn't match active lock for user {user}"
            )
            return False

        return self.user_queue_manager.release_lock(user, lock_id)
# CLI interface
def main(argv=None) -> int:
    """Run the command-line interface and return the process exit code.

    Supported commands:
        check_project <project>
        cleanup_stale [max_age_seconds]
        release <user> <task_id>

    Args:
        argv: Full argument vector including the program name; defaults to
            ``sys.argv``.

    Returns:
        0 on success or when only usage was requested (no command given),
        1 for an unknown command, missing arguments or an invalid number.
    """
    args = sys.argv if argv is None else list(argv)
    logging.basicConfig(level=logging.INFO)

    usage_lines = (
        "Usage:",
        " conductor_lock_cleanup.py check_project <project>",
        " conductor_lock_cleanup.py cleanup_stale [max_age_seconds]",
        " conductor_lock_cleanup.py release <user> <task_id>",
    )

    if len(args) < 2:
        print("\n".join(usage_lines))
        return 0

    cmd = args[1]
    params = args[2:]

    # Number of positional arguments each known command requires.
    required = {"check_project": 1, "cleanup_stale": 0, "release": 2}
    if cmd not in required:
        print(f"Unknown command: {cmd}")
        return 1
    if len(params) < required[cmd]:
        # Known command, too few arguments: say so rather than calling the
        # command unknown.
        print(f"Missing arguments for command: {cmd}")
        print("\n".join(usage_lines))
        return 1

    max_age = 3600
    if cmd == "cleanup_stale" and params:
        try:
            max_age = int(params[0])
        except ValueError:
            print(
                f"Invalid max_age_seconds: {params[0]!r} (expected an integer)",
                file=sys.stderr,
            )
            return 1

    # Build the manager only once the arguments are known to be usable.
    cleanup = ConductorLockCleanup()

    if cmd == "check_project":
        project = params[0]
        count = cleanup.check_and_cleanup_conductor_locks(project)
        print(f"Released {count} locks for project {project}")
    elif cmd == "cleanup_stale":
        count = cleanup.cleanup_stale_task_locks(max_age)
        print(f"Cleaned up {count} stale locks (max age {max_age}s)")
    else:  # release
        user, task_id = params[0], params[1]
        if cleanup.release_task_lock(user, task_id):
            print(f"Released lock for user {user}, task {task_id}")
        else:
            print(f"Failed to release lock for user {user}, task {task_id}")
    return 0


if __name__ == "__main__":
    sys.exit(main())