Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
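For orientation, here is a minimal usage sketch of the DockerTmuxController described in this commit message. The method names (send_keys, capture_pane, wait_for_prompt, wait_for_idle) and the delay_enter parameter come from the message above; the import path, constructor arguments, and exact signatures are assumptions for illustration and are not confirmed by the file shown below.

# Hypothetical usage sketch; import path and signatures are assumed, not confirmed by this commit.
from docker_tmux_controller import DockerTmuxController  # assumed module name

controller = DockerTmuxController(session="agent-task")    # assumed constructor
controller.send_keys("pytest -q", delay_enter=0.2)         # delay_enter per commit message
if controller.wait_for_prompt(r"\d+ passed", timeout=300): # pattern-based completion detection
    print(controller.capture_pane())                       # retrieve pane output
else:
    controller.wait_for_idle(idle_seconds=10)              # content-hash-based idle detection (assumed kwarg)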
539 lines
18 KiB
Python
#!/usr/bin/env python3
"""
Task Watchdog - Monitor running tasks for stuck/failed states

Key responsibilities:
1. Monitor heartbeats for running tasks
2. Detect and clean up stuck/orphaned tasks
3. Release stale locks automatically
4. Track task state transitions
5. Support cockpit "awaiting_human" state

Usage:
    from task_watchdog import TaskWatchdog

    watchdog = TaskWatchdog()
    watchdog.run_check()  # Single check
    watchdog.run_loop()   # Continuous monitoring
"""

import json
import os
import sys
import time
import signal
import shutil
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import fcntl


class TaskWatchdog:
    """Monitor running tasks for stuck/failed states."""

    # Configuration
    CONDUCTOR_BASE = Path.home() / "conductor"
    ACTIVE_DIR = CONDUCTOR_BASE / "active"
    COMPLETED_DIR = CONDUCTOR_BASE / "completed"
    FAILED_DIR = CONDUCTOR_BASE / "failed"

    QUEUE_BASE = Path("/var/lib/luzia/queue")
    LOCKS_BASE = Path("/var/lib/luzia/locks")
    CAPACITY_FILE = QUEUE_BASE / "capacity.json"

    JOBS_DIR = Path("/var/log/luz-orchestrator/jobs")

    # Cockpit state directory
    COCKPIT_STATE_DIR = Path("/var/lib/luz-orchestrator/cockpits")

    # Timeouts
    HEARTBEAT_TIMEOUT_SECONDS = 300  # 5 minutes without heartbeat = stuck
    LOCK_TIMEOUT_SECONDS = 3600      # 1 hour lock timeout
    AWAITING_HUMAN_TIMEOUT = 86400   # 24 hours for human response

    # State transitions
    STATES = {
        'pending': ['claimed', 'cancelled'],
        'claimed': ['dispatched', 'failed', 'cancelled'],
        'dispatched': ['running', 'failed'],
        'running': ['completed', 'failed', 'awaiting_human', 'stuck'],
        'awaiting_human': ['running', 'failed', 'cancelled'],
        'stuck': ['failed', 'recovered'],
        'completed': [],
        'failed': ['retrying'],
        'retrying': ['running', 'failed'],
        'cancelled': []
    }

    def __init__(self):
        """Initialize watchdog."""
        self._ensure_dirs()

    def _ensure_dirs(self):
        """Ensure required directories exist."""
        for d in [self.ACTIVE_DIR, self.COMPLETED_DIR, self.FAILED_DIR]:
            d.mkdir(parents=True, exist_ok=True)
        self.LOCKS_BASE.mkdir(parents=True, exist_ok=True)
        self.COCKPIT_STATE_DIR.mkdir(parents=True, exist_ok=True)

    def check_heartbeats(self) -> List[Dict]:
        """
        Check all active tasks for stale heartbeats.
        Returns list of stuck tasks.
        """
        stuck_tasks = []
        now = time.time()

        if not self.ACTIVE_DIR.exists():
            return stuck_tasks

        for task_dir in self.ACTIVE_DIR.iterdir():
            if not task_dir.is_dir():
                continue

            task_id = task_dir.name
            heartbeat_file = task_dir / "heartbeat.json"
            meta_file = task_dir / "meta.json"

            # Load task metadata
            meta = {}
            if meta_file.exists():
                try:
                    meta = json.loads(meta_file.read_text())
                except Exception:
                    pass

            # Check if task is in awaiting_human state (from cockpit)
            if meta.get('status') == 'awaiting_human':
                # Check if awaiting too long
                awaiting_since = meta.get('awaiting_since', now)
                if now - awaiting_since > self.AWAITING_HUMAN_TIMEOUT:
                    stuck_tasks.append({
                        'task_id': task_id,
                        'reason': 'awaiting_human_timeout',
                        'last_heartbeat': None,
                        'meta': meta
                    })
                continue

            # Check heartbeat for running tasks
            if not heartbeat_file.exists():
                # No heartbeat file = check task age
                created = meta.get('created_at')
                if created:
                    try:
                        created_ts = datetime.fromisoformat(created).timestamp()
                        if now - created_ts > self.HEARTBEAT_TIMEOUT_SECONDS:
                            stuck_tasks.append({
                                'task_id': task_id,
                                'reason': 'no_heartbeat',
                                'last_heartbeat': None,
                                'meta': meta
                            })
                    except Exception:
                        pass
                continue

            try:
                heartbeat = json.loads(heartbeat_file.read_text())
                last_ts = heartbeat.get('ts', 0)

                if now - last_ts > self.HEARTBEAT_TIMEOUT_SECONDS:
                    stuck_tasks.append({
                        'task_id': task_id,
                        'reason': 'stale_heartbeat',
                        'last_heartbeat': last_ts,
                        'last_step': heartbeat.get('step'),
                        'meta': meta
                    })
            except Exception as e:
                stuck_tasks.append({
                    'task_id': task_id,
                    'reason': 'heartbeat_read_error',
                    'error': str(e),
                    'meta': meta
                })

        return stuck_tasks

    def cleanup_orphaned_tasks(self) -> List[str]:
        """
        Clean up tasks where agent died.
        Moves stuck tasks to failed directory.
        Returns list of cleaned task IDs.
        """
        cleaned = []
        stuck_tasks = self.check_heartbeats()

        for task in stuck_tasks:
            task_id = task['task_id']
            task_dir = self.ACTIVE_DIR / task_id

            if not task_dir.exists():
                continue

            # Update meta with failure info
            meta_file = task_dir / "meta.json"
            meta = {}
            if meta_file.exists():
                try:
                    meta = json.loads(meta_file.read_text())
                except Exception:
                    pass

            meta['status'] = 'failed'
            meta['failure_reason'] = task.get('reason', 'unknown')
            meta['failed_at'] = datetime.now().isoformat()
            meta['last_heartbeat'] = task.get('last_heartbeat')

            # Write updated meta
            try:
                with open(meta_file, 'w') as f:
                    json.dump(meta, f, indent=2)
            except Exception:
                pass

            # Move to failed directory
            failed_dest = self.FAILED_DIR / task_id
            try:
                if failed_dest.exists():
                    # Remove old failed version
                    shutil.rmtree(failed_dest)
                task_dir.rename(failed_dest)
                cleaned.append(task_id)
            except Exception as e:
                print(f"Error moving task {task_id} to failed: {e}")

        return cleaned

    def release_stale_locks(self) -> List[str]:
        """
        Release locks for dead tasks.
        Returns list of released lock IDs.
        """
        released = []
        now = time.time()

        if not self.LOCKS_BASE.exists():
            return released

        for lock_file in self.LOCKS_BASE.glob("user_*.lock"):
            try:
                # Read lock metadata
                meta_file = lock_file.with_suffix('.json')
                if not meta_file.exists():
                    # Old lock without metadata - remove if lock file is old
                    if now - lock_file.stat().st_mtime > self.LOCK_TIMEOUT_SECONDS:
                        lock_file.unlink()
                        released.append(lock_file.name)
                    continue

                meta = json.loads(meta_file.read_text())
                expires_at = meta.get('expires_at', 0)

                if now > expires_at:
                    # Lock expired - remove both files
                    lock_file.unlink()
                    meta_file.unlink()
                    released.append(meta.get('lock_id', lock_file.name))
            except Exception as e:
                print(f"Error checking lock {lock_file}: {e}")

        return released

    def update_capacity(self, released_slots: int = 0) -> bool:
        """
        Update capacity file to reflect released resources.
        Uses file locking for safety.
        """
        if not self.CAPACITY_FILE.exists():
            return False

        try:
            with open(self.CAPACITY_FILE, 'r+') as f:
                fcntl.flock(f, fcntl.LOCK_EX)
                try:
                    capacity = json.load(f)

                    # Update available slots
                    current = capacity.get('slots', {}).get('available', 0)
                    max_slots = capacity.get('slots', {}).get('max', 4)
                    capacity['slots']['available'] = min(current + released_slots, max_slots)

                    # Update timestamp
                    capacity['last_updated'] = datetime.now().isoformat()

                    # Write back
                    f.seek(0)
                    f.truncate()
                    json.dump(capacity, f, indent=2)
                finally:
                    fcntl.flock(f, fcntl.LOCK_UN)
            return True
        except Exception as e:
            print(f"Error updating capacity: {e}")
            return False

    def get_project_queue_status(self) -> Dict[str, Dict]:
        """
        Get queue status per project.
        Returns dict of project -> {pending, running, awaiting_human}
        """
        status = {}

        # Count pending tasks
        pending_dirs = [
            self.QUEUE_BASE / "pending" / "high",
            self.QUEUE_BASE / "pending" / "normal"
        ]

        for pending_dir in pending_dirs:
            if not pending_dir.exists():
                continue
            for task_file in pending_dir.glob("*.json"):
                try:
                    task = json.loads(task_file.read_text())
                    project = task.get('project', 'unknown')
                    if project not in status:
                        status[project] = {'pending': 0, 'running': 0, 'awaiting_human': 0}
                    status[project]['pending'] += 1
                except Exception:
                    pass

        # Count active tasks
        if self.ACTIVE_DIR.exists():
            for task_dir in self.ACTIVE_DIR.iterdir():
                if not task_dir.is_dir():
                    continue
                meta_file = task_dir / "meta.json"
                if meta_file.exists():
                    try:
                        meta = json.loads(meta_file.read_text())
                        project = meta.get('project', 'unknown')
                        if project not in status:
                            status[project] = {'pending': 0, 'running': 0, 'awaiting_human': 0}

                        if meta.get('status') == 'awaiting_human':
                            status[project]['awaiting_human'] += 1
                        else:
                            status[project]['running'] += 1
                    except Exception:
                        pass

        # Check cockpit states
        if self.COCKPIT_STATE_DIR.exists():
            for state_file in self.COCKPIT_STATE_DIR.glob("*.json"):
                try:
                    state = json.loads(state_file.read_text())
                    project = state.get('project')
                    if project and state.get('awaiting_response'):
                        if project not in status:
                            status[project] = {'pending': 0, 'running': 0, 'awaiting_human': 0}
                        status[project]['awaiting_human'] += 1
                except Exception:
                    pass

        return status

    def is_project_blocked(self, project: str) -> Tuple[bool, Optional[str]]:
        """
        Check if a project queue is blocked.
        Returns (is_blocked, reason)
        """
        # Check cockpit state
        cockpit_state = self.COCKPIT_STATE_DIR / f"{project}.json"
        if cockpit_state.exists():
            try:
                state = json.loads(cockpit_state.read_text())
                if state.get('awaiting_response'):
                    return True, "awaiting_human_cockpit"
            except Exception:
                pass

        # Check active tasks for awaiting_human
        if self.ACTIVE_DIR.exists():
            for task_dir in self.ACTIVE_DIR.iterdir():
                if not task_dir.is_dir():
                    continue
                meta_file = task_dir / "meta.json"
                if meta_file.exists():
                    try:
                        meta = json.loads(meta_file.read_text())
                        if meta.get('project') == project and meta.get('status') == 'awaiting_human':
                            return True, f"awaiting_human_task:{task_dir.name}"
                    except Exception:
                        pass

        return False, None

    def set_task_awaiting_human(self, task_id: str, question: Optional[str] = None) -> bool:
        """
        Mark a task as awaiting human response.
        This blocks the project queue.
        """
        task_dir = self.ACTIVE_DIR / task_id
        meta_file = task_dir / "meta.json"

        if not meta_file.exists():
            return False

        try:
            meta = json.loads(meta_file.read_text())
            meta['status'] = 'awaiting_human'
            meta['awaiting_since'] = time.time()
            if question:
                meta['awaiting_question'] = question

            with open(meta_file, 'w') as f:
                json.dump(meta, f, indent=2)
            return True
        except Exception as e:
            print(f"Error setting task awaiting: {e}")
            return False

    def resume_task(self, task_id: str, answer: Optional[str] = None) -> bool:
        """
        Resume a task that was awaiting human response.
        """
        task_dir = self.ACTIVE_DIR / task_id
        meta_file = task_dir / "meta.json"

        if not meta_file.exists():
            return False

        try:
            meta = json.loads(meta_file.read_text())
            if meta.get('status') != 'awaiting_human':
                return False

            meta['status'] = 'running'
            meta['resumed_at'] = datetime.now().isoformat()
            if answer:
                meta['human_response'] = answer

            # Update heartbeat
            heartbeat_file = task_dir / "heartbeat.json"
            with open(heartbeat_file, 'w') as f:
                json.dump({'ts': time.time(), 'step': 'Resumed after human response'}, f)

            with open(meta_file, 'w') as f:
                json.dump(meta, f, indent=2)
            return True
        except Exception as e:
            print(f"Error resuming task: {e}")
            return False

    def run_check(self) -> Dict:
        """
        Run a single watchdog check.
        Returns summary of actions taken.
        """
        summary = {
            'timestamp': datetime.now().isoformat(),
            'stuck_tasks': [],
            'cleaned_tasks': [],
            'released_locks': [],
            'project_status': {}
        }

        # Check for stuck tasks
        stuck = self.check_heartbeats()
        summary['stuck_tasks'] = [t['task_id'] for t in stuck]

        # Clean up orphaned tasks
        cleaned = self.cleanup_orphaned_tasks()
        summary['cleaned_tasks'] = cleaned

        # Release stale locks
        released = self.release_stale_locks()
        summary['released_locks'] = released

        # Update capacity if we cleaned anything
        if cleaned or released:
            self.update_capacity(released_slots=len(cleaned))

        # Get project queue status
        summary['project_status'] = self.get_project_queue_status()

        return summary

    def run_loop(self, interval_seconds: int = 60):
        """
        Run continuous watchdog monitoring.
        """
        print(f"Starting Task Watchdog (interval: {interval_seconds}s)")

        def handle_signal(signum, frame):
            print("\nWatchdog shutting down...")
            sys.exit(0)

        signal.signal(signal.SIGINT, handle_signal)
        signal.signal(signal.SIGTERM, handle_signal)

        while True:
            try:
                summary = self.run_check()

                # Log if any actions were taken
                if summary['stuck_tasks'] or summary['cleaned_tasks'] or summary['released_locks']:
                    print(f"[{datetime.now().isoformat()}] Watchdog check:")
                    if summary['stuck_tasks']:
                        print(f"  Stuck tasks: {summary['stuck_tasks']}")
                    if summary['cleaned_tasks']:
                        print(f"  Cleaned: {summary['cleaned_tasks']}")
                    if summary['released_locks']:
                        print(f"  Released locks: {summary['released_locks']}")

                time.sleep(interval_seconds)

            except Exception as e:
                print(f"Watchdog error: {e}")
                time.sleep(interval_seconds)


def main():
    """CLI entry point."""
    import argparse

    parser = argparse.ArgumentParser(description='Task Watchdog')
    parser.add_argument('command', nargs='?', default='check',
                        choices=['check', 'daemon', 'status', 'stuck', 'clean'],
                        help='Command to run')
    parser.add_argument('--interval', type=int, default=60,
                        help='Check interval for daemon mode (seconds)')

    args = parser.parse_args()

    watchdog = TaskWatchdog()

    if args.command == 'check':
        summary = watchdog.run_check()
        print(json.dumps(summary, indent=2))

    elif args.command == 'daemon':
        watchdog.run_loop(interval_seconds=args.interval)

    elif args.command == 'status':
        status = watchdog.get_project_queue_status()
        print("Project Queue Status:")
        print(json.dumps(status, indent=2))

    elif args.command == 'stuck':
        stuck = watchdog.check_heartbeats()
        if stuck:
            print(f"Found {len(stuck)} stuck tasks:")
            for t in stuck:
                print(f"  - {t['task_id']}: {t['reason']}")
        else:
            print("No stuck tasks found")

    elif args.command == 'clean':
        cleaned = watchdog.cleanup_orphaned_tasks()
        released = watchdog.release_stale_locks()
        print(f"Cleaned {len(cleaned)} orphaned tasks")
        print(f"Released {len(released)} stale locks")


if __name__ == '__main__':
    main()
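The watchdog expects each active task directory under ~/conductor/active/<task_id>/ to contain a meta.json and a heartbeat.json of the form {'ts': <epoch seconds>, 'step': <description>}, the same format check_heartbeats() reads and resume_task() writes. Below is a minimal sketch of the worker-side heartbeat writer, under the assumption that agents refresh it well within HEARTBEAT_TIMEOUT_SECONDS (300 s); the function name, refresh cadence, and task id are illustrative and not part of this file.

# Illustrative worker-side heartbeat writer (not part of task_watchdog.py).
import json
import time
from pathlib import Path

def write_heartbeat(task_dir: Path, step: str) -> None:
    """Write the heartbeat format that TaskWatchdog.check_heartbeats() reads."""
    heartbeat = {'ts': time.time(), 'step': step}
    (task_dir / "heartbeat.json").write_text(json.dumps(heartbeat))

# Example: refresh before each long-running step (hypothetical task id).
task_dir = Path.home() / "conductor" / "active" / "task-123"
write_heartbeat(task_dir, "Running test suite")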