Files
luzia/lib/conductor_health_checker.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

383 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Conductor Task Health Checker
Validates the health of the conductor task tracking system:
- Active task liveness (heartbeat validation)
- Completed/failed task integrity
- Stalled task detection
- Process state validation
"""
import json
import os
import shutil
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
class ConductorHealthChecker:
    """Check health of the conductor task tracking system.

    Inspects the on-disk task tree (``active/``, ``completed/``, ``failed/``)
    under the conductor root and reports liveness, integrity, and capacity
    metrics as plain dicts suitable for JSON serialization.
    """

    # Default location of the conductor task tree.
    CONDUCTOR_ROOT = Path('/home/admin/conductor')
    HEARTBEAT_TIMEOUT_SECS = 300   # Tasks stalled if heartbeat >5min old
    PROGRESS_TIMEOUT_SECS = 3600   # No progress update for 1 hour = stalled

    def __init__(self, conductor_root: Optional[Path] = None):
        """Initialize conductor health checker.

        Args:
            conductor_root: Optional override of the conductor state
                directory. Defaults to ``CONDUCTOR_ROOT``, so existing
                callers are unaffected; passing a path makes the checker
                testable against a temporary tree.
        """
        self.conductor_root = Path(conductor_root) if conductor_root is not None else self.CONDUCTOR_ROOT
        self.active_dir = self.conductor_root / 'active'
        self.completed_dir = self.conductor_root / 'completed'
        self.failed_dir = self.conductor_root / 'failed'

    @staticmethod
    def _task_dirs(parent: Path) -> List[Path]:
        """Return the task directories under *parent*, ignoring stray files.

        Totals were previously computed with ``len(list(dir.iterdir()))``
        while the per-task checks skipped non-directories, so a stray file
        silently deflated the health score. Counting directories only keeps
        numerator and denominator consistent.
        """
        return [entry for entry in parent.iterdir() if entry.is_dir()]

    def validate_active_tasks(self, verbose: bool = False) -> Dict:
        """
        Validate all active tasks in <conductor_root>/active/.

        Args:
            verbose: Reserved for future detailed reporting; currently unused.

        Returns:
            Dict with:
                - 'total_active': Number of active task directories
                - 'healthy': Count of healthy tasks
                - 'stalled_count' / 'stalled': Stalled task details
                - 'issues': List of per-task problems
                - 'health_score': 0-100
                - 'status': 'healthy' / 'degraded' / 'critical'
                - 'timestamp': Epoch seconds when the check ran
        """
        if not self.active_dir.exists():
            # No active directory at all: vacuously healthy.
            return {
                'total_active': 0,
                'healthy': 0,
                'stalled': [],
                'issues': [],
                'health_score': 100,
                'status': 'healthy'
            }
        issues = []
        stalled_tasks = []
        healthy_count = 0
        now = time.time()
        task_dirs = self._task_dirs(self.active_dir)
        for task_dir in task_dirs:
            task_id = task_dir.name
            task_issues = []
            meta_file = task_dir / 'meta.json'
            heartbeat_file = task_dir / 'heartbeat.json'
            progress_file = task_dir / 'progress.md'
            # 1. Validate metadata (content is only parsed, not inspected).
            if not meta_file.exists():
                task_issues.append("Missing meta.json")
            else:
                try:
                    json.loads(meta_file.read_text())
                except (OSError, ValueError):
                    # json.JSONDecodeError is a ValueError subclass.
                    task_issues.append("Invalid meta.json JSON")
            # 2. Check heartbeat (liveness signal).
            if heartbeat_file.exists():
                try:
                    hb = json.loads(heartbeat_file.read_text())
                    # A missing 'ts' yields a huge age -> treated as stalled.
                    hb_age = now - hb.get('ts', 0)
                    if hb_age > self.HEARTBEAT_TIMEOUT_SECS:
                        stalled_tasks.append({
                            'task_id': task_id,
                            'reason': 'heartbeat_timeout',
                            'heartbeat_age_secs': int(hb_age),
                            'last_step': hb.get('step', 'unknown')
                        })
                        task_issues.append(f"Heartbeat stale ({int(hb_age)}s)")
                except Exception as e:
                    task_issues.append(f"Invalid heartbeat.json: {e}")
            else:
                task_issues.append("Missing heartbeat.json")
            # 3. Check progress file exists and is being updated.
            if not progress_file.exists():
                task_issues.append("Missing progress.md")
            else:
                progress_age = now - progress_file.stat().st_mtime
                if progress_age > self.PROGRESS_TIMEOUT_SECS:
                    task_issues.append(f"No progress update ({int(progress_age)}s)")
            # 4. Check the recorded process is still alive (Linux /proc probe).
            pid_file = task_dir / 'pid'
            if pid_file.exists():
                try:
                    pid = int(pid_file.read_text().strip())
                    if not os.path.exists(f'/proc/{pid}'):
                        stalled_tasks.append({
                            'task_id': task_id,
                            'reason': 'process_not_found',
                            'pid': pid
                        })
                        task_issues.append(f"Process {pid} not found")
                except (OSError, ValueError):
                    task_issues.append("Invalid pid file")
            if task_issues:
                issues.append({
                    'task_id': task_id,
                    'issues': task_issues
                })
            else:
                healthy_count += 1
        total_active = len(task_dirs)
        # Score is simply the fraction of issue-free tasks.
        if total_active == 0:
            health_score = 100
        else:
            health_score = (healthy_count / total_active) * 100
        return {
            'total_active': total_active,
            'healthy': healthy_count,
            'stalled_count': len(stalled_tasks),
            'stalled': stalled_tasks,
            'issues': issues,
            'health_score': round(health_score, 1),
            'status': 'healthy' if health_score >= 90 else 'degraded' if health_score >= 70 else 'critical',
            'timestamp': now
        }

    def validate_completed_tasks(self) -> Dict:
        """
        Validate completed tasks in <conductor_root>/completed/.

        A valid completed task has a result.json and, if meta.json is
        present, a 'completed_at' timestamp inside it.

        Returns:
            Dict with totals, per-task issues, and a 0-100 health score.
        """
        if not self.completed_dir.exists():
            return {
                'total_completed': 0,
                'valid': 0,
                'issues': [],
                'health_score': 100
            }
        issues = []
        valid_count = 0
        now = time.time()
        task_dirs = self._task_dirs(self.completed_dir)
        for task_dir in task_dirs:
            task_id = task_dir.name
            task_issues = []
            # Check for result file.
            result_file = task_dir / 'result.json'
            if not result_file.exists():
                task_issues.append("Missing result.json")
            # Check for completion timestamp (a missing meta.json is tolerated,
            # matching the original behavior).
            meta_file = task_dir / 'meta.json'
            if meta_file.exists():
                try:
                    meta = json.loads(meta_file.read_text())
                    if 'completed_at' not in meta:
                        task_issues.append("Missing completed_at timestamp")
                except (OSError, ValueError):
                    task_issues.append("Invalid meta.json")
            if task_issues:
                issues.append({
                    'task_id': task_id,
                    'issues': task_issues
                })
            else:
                valid_count += 1
        total_completed = len(task_dirs)
        health_score = (valid_count / max(total_completed, 1)) * 100
        return {
            'total_completed': total_completed,
            'valid': valid_count,
            'issues': issues,
            'health_score': round(health_score, 1),
            'timestamp': now
        }

    def validate_failed_tasks(self) -> Dict:
        """
        Validate failed tasks in <conductor_root>/failed/.

        A well-documented failed task has an error.txt and, if meta.json is
        present, a 'failure_reason' inside it.

        Returns:
            Dict with totals, per-task issues, and a 0-100 health score.
            Both 'valid' and 'documented' carry the same count: 'valid'
            matches the empty-directory branch, 'documented' is kept for
            backward compatibility with existing consumers.
        """
        if not self.failed_dir.exists():
            return {
                'total_failed': 0,
                'valid': 0,
                'documented': 0,
                'issues': [],
                'health_score': 100
            }
        issues = []
        valid_count = 0
        task_dirs = self._task_dirs(self.failed_dir)
        for task_dir in task_dirs:
            task_id = task_dir.name
            task_issues = []
            # Check for error documentation.
            error_file = task_dir / 'error.txt'
            if not error_file.exists():
                task_issues.append("Missing error.txt documentation")
            # Check for meta with failure reason.
            meta_file = task_dir / 'meta.json'
            if meta_file.exists():
                try:
                    meta = json.loads(meta_file.read_text())
                    if 'failure_reason' not in meta:
                        task_issues.append("Missing failure_reason")
                except (OSError, ValueError):
                    task_issues.append("Invalid meta.json")
            if task_issues:
                issues.append({
                    'task_id': task_id,
                    'issues': task_issues
                })
            else:
                valid_count += 1
        total_failed = len(task_dirs)
        health_score = (valid_count / max(total_failed, 1)) * 100
        return {
            'total_failed': total_failed,
            'valid': valid_count,
            'documented': valid_count,
            'issues': issues,
            'health_score': round(health_score, 1),
            'timestamp': time.time()
        }

    def check_system_capacity(self) -> Dict:
        """
        Check system capacity constraints.

        Returns:
            Dict with task counts, conductor tree size (MiB), and disk
            usage percentage plus a coarse disk status label.
        """
        # Count task directories across all state directories.
        total_tasks = 0
        for d in (self.active_dir, self.completed_dir, self.failed_dir):
            if d.exists():
                total_tasks += len(self._task_dirs(d))
        # Estimate conductor directory size (best effort: files can vanish
        # between the walk and the stat, e.g. a task being archived).
        conductor_size = 0
        if self.conductor_root.exists():
            for root, _dirs, files in os.walk(self.conductor_root):
                for fname in files:
                    try:
                        conductor_size += os.path.getsize(os.path.join(root, fname))
                    except OSError:
                        pass  # file removed mid-walk; skip it
        conductor_size_mb = conductor_size / (1024 * 1024)
        # shutil.disk_usage requires an existing path; previously this raised
        # FileNotFoundError when the conductor root had not been created yet.
        if self.conductor_root.exists():
            total, used, _free = shutil.disk_usage(str(self.conductor_root))
            disk_usage_pct = (used / total) * 100
        else:
            disk_usage_pct = 0.0
        return {
            'total_tasks': total_tasks,
            'conductor_size_mb': round(conductor_size_mb, 1),
            'disk_usage_pct': round(disk_usage_pct, 1),
            'disk_status': 'critical' if disk_usage_pct > 90 else 'warning' if disk_usage_pct > 80 else 'healthy'
        }

    def generate_conductor_health_score(self) -> Dict:
        """
        Generate comprehensive conductor health score.

        Combines the three validators plus disk health into a single
        weighted 0-100 score (active 40%, completed 25%, failed 25%,
        disk headroom 10%).

        Returns:
            Dict with overall score, status label, key metrics, and
            actionable recommendations.
        """
        active = self.validate_active_tasks()
        completed = self.validate_completed_tasks()
        failed = self.validate_failed_tasks()
        capacity = self.check_system_capacity()
        # Weighted score.
        overall_score = (
            active['health_score'] * 0.40 +
            completed['health_score'] * 0.25 +
            failed['health_score'] * 0.25 +
            (100 - capacity['disk_usage_pct']) * 0.10  # Disk headroom
        )
        stalled_count = len(active.get('stalled', []))
        return {
            'overall_score': round(overall_score, 1),
            'status': 'healthy' if overall_score >= 80 else 'degraded' if overall_score >= 60 else 'critical',
            'active_health': active['health_score'],
            'stalled_tasks': stalled_count,
            'disk_usage_pct': capacity['disk_usage_pct'],
            'total_tasks': capacity['total_tasks'],
            'recommendations': self._generate_conductor_recommendations(
                stalled_count, capacity['disk_usage_pct']
            ),
            'timestamp': time.time()
        }

    def _generate_conductor_recommendations(self, stalled_count: int, disk_usage_pct: float) -> List[str]:
        """Generate recommendations based on conductor health.

        Note: above 95% disk usage both the WARNING and CRITICAL lines are
        emitted (deliberate escalation, matching original behavior).
        """
        recommendations = []
        if stalled_count > 0:
            recommendations.append(f"[URGENT] Fix {stalled_count} stalled task(s): luzia health conductor --fix")
        if disk_usage_pct > 85:
            recommendations.append(f"[WARNING] Disk usage at {disk_usage_pct}%: Archive old tasks to free space")
        if disk_usage_pct > 95:
            recommendations.append("[CRITICAL] Disk usage critical: Immediate cleanup required")
        if not recommendations:
            recommendations.append("Conductor system healthy - no immediate action needed")
        return recommendations
if __name__ == '__main__':
    # Ad-hoc CLI entry point: print an active-task summary followed by the
    # overall conductor health assessment.
    hc = ConductorHealthChecker()
    banner = "=" * 70

    print(banner)
    print("CONDUCTOR ACTIVE TASKS")
    print(banner)
    active_report = hc.validate_active_tasks()
    print(f"Total active: {active_report['total_active']}")
    print(f"Healthy: {active_report['healthy']}")
    print(f"Stalled: {len(active_report['stalled'])}")
    print(f"Health score: {active_report['health_score']}/100")

    print("\n" + banner)
    print("CONDUCTOR OVERALL HEALTH")
    print(banner)
    overall = hc.generate_conductor_health_score()
    print(f"Overall score: {overall['overall_score']}/100 ({overall['status'].upper()})")
    print(f"Stalled tasks: {overall['stalled_tasks']}")
    print(f"Disk usage: {overall['disk_usage_pct']}%")
    print("\nRecommendations:")
    for rec in overall['recommendations']:
        print(f" - {rec}")