#!/usr/bin/env python3
"""
Conductor Task Health Checker

Validates the health of the conductor task tracking system:
- Active task liveness (heartbeat validation)
- Completed/failed task integrity
- Stalled task detection
- Process state validation
"""

import json
import os
import shutil
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple


class ConductorHealthChecker:
    """Check health of conductor task tracking system.

    The checker inspects three sub-directories of the conductor root
    (``active/``, ``completed/`` and ``failed/``).  Every sub-directory of
    those is treated as one task; plain files lying next to the task
    directories are ignored and never counted as tasks.
    """

    CONDUCTOR_ROOT = Path('/home/admin/conductor')
    HEARTBEAT_TIMEOUT_SECS = 300   # Tasks stalled if heartbeat >5min old
    PROGRESS_TIMEOUT_SECS = 3600   # No progress update for 1 hour = stalled

    def __init__(self, conductor_root: Optional[Path] = None):
        """Initialize conductor health checker.

        Args:
            conductor_root: Directory holding ``active/``, ``completed/`` and
                ``failed/``.  Defaults to ``CONDUCTOR_ROOT``.
        """
        root = self.CONDUCTOR_ROOT if conductor_root is None else conductor_root
        self.conductor_root = Path(root)
        self.active_dir = self.conductor_root / 'active'
        self.completed_dir = self.conductor_root / 'completed'
        self.failed_dir = self.conductor_root / 'failed'

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _task_dirs(directory: Path) -> List[Path]:
        """Return the task sub-directories of *directory*, sorted by name.

        A missing (or non-directory) *directory* yields an empty list.
        """
        if not directory.is_dir():
            return []
        return sorted(entry for entry in directory.iterdir() if entry.is_dir())

    @staticmethod
    def _read_json(path: Path):
        """Parse *path* as UTF-8 JSON; raises OSError or ValueError."""
        return json.loads(path.read_text(encoding='utf-8'))

    @classmethod
    def _read_json_object(cls, path: Path) -> Dict:
        """Parse *path* as JSON and require the top-level value to be an object."""
        data = cls._read_json(path)
        if not isinstance(data, dict):
            raise ValueError("expected a JSON object")
        return data

    @staticmethod
    def _percentage(part: int, total: int) -> float:
        """Return ``part / total`` as a percentage; nothing to check means 100."""
        if total == 0:
            return 100.0
        return round((part / total) * 100, 1)

    @staticmethod
    def _classify(score: float, healthy_min: float, degraded_min: float) -> str:
        """Map a 0-100 score onto 'healthy' / 'degraded' / 'critical'."""
        if score >= healthy_min:
            return 'healthy'
        if score >= degraded_min:
            return 'degraded'
        return 'critical'

    def _check_active_task(self, task_dir: Path, now: float) -> Tuple[List[str], List[Dict]]:
        """Run all liveness checks for one active task directory.

        Returns:
            ``(task_issues, stalled_entries)``: human-readable problems, and
            the entries to add to the ``stalled`` list (one per stall reason).
        """
        task_id = task_dir.name
        task_issues: List[str] = []
        stalled: List[Dict] = []

        # 1. Validate metadata (only well-formedness is checked here)
        meta_file = task_dir / 'meta.json'
        if not meta_file.exists():
            task_issues.append("Missing meta.json")
        else:
            try:
                self._read_json(meta_file)
            except (OSError, ValueError):
                task_issues.append("Invalid meta.json JSON")

        # 2. Check heartbeat (liveness signal)
        heartbeat_file = task_dir / 'heartbeat.json'
        if heartbeat_file.exists():
            try:
                hb = self._read_json_object(heartbeat_file)
                # A missing 'ts' counts as epoch 0, i.e. always stale.
                hb_age = now - hb.get('ts', 0)
                is_stale = hb_age > self.HEARTBEAT_TIMEOUT_SECS
                # int() can fail for inf/NaN timestamps; keep it inside the try.
                age_secs = int(hb_age) if is_stale else 0
            except (OSError, ValueError, TypeError, ArithmeticError) as e:
                task_issues.append(f"Invalid heartbeat.json: {e}")
            else:
                if is_stale:
                    stalled.append({
                        'task_id': task_id,
                        'reason': 'heartbeat_timeout',
                        'heartbeat_age_secs': age_secs,
                        'last_step': hb.get('step', 'unknown'),
                    })
                    task_issues.append(f"Heartbeat stale ({age_secs}s)")
        else:
            task_issues.append("Missing heartbeat.json")

        # 3. Check progress file exists and was updated recently
        try:
            progress_mtime = (task_dir / 'progress.md').stat().st_mtime
        except OSError:
            task_issues.append("Missing progress.md")
        else:
            progress_age = now - progress_mtime
            if progress_age > self.PROGRESS_TIMEOUT_SECS:
                task_issues.append(f"No progress update ({int(progress_age)}s)")

        # 4. Check for process (only if a pid file exists)
        pid_file = task_dir / 'pid'
        if pid_file.exists():
            try:
                pid = int(pid_file.read_text().strip())
            except (OSError, ValueError):
                task_issues.append("Invalid pid file")
            else:
                # NOTE: relies on procfs, so this check is Linux-specific.
                if not os.path.exists(f'/proc/{pid}'):
                    stalled.append({
                        'task_id': task_id,
                        'reason': 'process_not_found',
                        'pid': pid,
                    })
                    task_issues.append(f"Process {pid} not found")

        return task_issues, stalled

    def _validate_archived_tasks(
        self,
        directory: Path,
        required_file: str,
        missing_file_msg: str,
        required_meta_key: str,
        missing_key_msg: str,
    ) -> Tuple[int, int, List[Dict]]:
        """Shared integrity check for the completed/ and failed/ directories.

        Each task must contain *required_file*; when it has a ``meta.json``
        that file must be a JSON object containing *required_meta_key*.
        A missing ``meta.json`` is not reported.

        Returns:
            ``(total_tasks, valid_tasks, issues)``.
        """
        issues: List[Dict] = []
        valid_count = 0
        task_dirs = self._task_dirs(directory)

        for task_dir in task_dirs:
            task_issues: List[str] = []

            if not (task_dir / required_file).exists():
                task_issues.append(missing_file_msg)

            meta_file = task_dir / 'meta.json'
            if meta_file.exists():
                try:
                    meta = self._read_json_object(meta_file)
                except (OSError, ValueError):
                    task_issues.append("Invalid meta.json")
                else:
                    if required_meta_key not in meta:
                        task_issues.append(missing_key_msg)

            if task_issues:
                issues.append({'task_id': task_dir.name, 'issues': task_issues})
            else:
                valid_count += 1

        return len(task_dirs), valid_count, issues

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def validate_active_tasks(self, verbose: bool = False) -> Dict:
        """
        Validate all active tasks in the ``active/`` directory.

        Args:
            verbose: Currently unused; kept for interface compatibility.

        Returns:
            Dict with:
            - 'total_active': Number of active task directories
            - 'healthy': Count of tasks without any issue
            - 'stalled_count': Number of distinct stalled tasks
            - 'stalled': List of stall entries (a task may appear once per
              reason: 'heartbeat_timeout' and/or 'process_not_found')
            - 'issues': List of ``{'task_id', 'issues'}`` dicts
            - 'health_score': 0-100 (100 when there are no active tasks)
            - 'status': 'healthy' (>=90), 'degraded' (>=70) or 'critical'
            - 'timestamp': Time of the check (seconds since the epoch)
        """
        now = time.time()
        issues: List[Dict] = []
        stalled_tasks: List[Dict] = []
        healthy_count = 0
        task_dirs = self._task_dirs(self.active_dir)

        for task_dir in task_dirs:
            task_issues, task_stalled = self._check_active_task(task_dir, now)
            stalled_tasks.extend(task_stalled)
            if task_issues:
                issues.append({'task_id': task_dir.name, 'issues': task_issues})
            else:
                healthy_count += 1

        total_active = len(task_dirs)
        health_score = self._percentage(healthy_count, total_active)

        return {
            'total_active': total_active,
            'healthy': healthy_count,
            # Count tasks, not entries: one task can stall for two reasons.
            'stalled_count': len({entry['task_id'] for entry in stalled_tasks}),
            'stalled': stalled_tasks,
            'issues': issues,
            'health_score': health_score,
            'status': self._classify(health_score, 90, 70),
            'timestamp': now,
        }

    def validate_completed_tasks(self) -> Dict:
        """
        Validate completed tasks in the ``completed/`` directory.

        A completed task is valid when it has a ``result.json`` and, if it has
        a ``meta.json``, that file is a JSON object with 'completed_at'.

        Returns:
            Dict with 'total_completed', 'valid', 'issues', 'health_score'
            (100 when there are no completed tasks) and 'timestamp'.
        """
        now = time.time()
        total_completed, valid_count, issues = self._validate_archived_tasks(
            self.completed_dir,
            'result.json',
            "Missing result.json",
            'completed_at',
            "Missing completed_at timestamp",
        )
        return {
            'total_completed': total_completed,
            'valid': valid_count,
            'issues': issues,
            'health_score': self._percentage(valid_count, total_completed),
            'timestamp': now,
        }

    def validate_failed_tasks(self) -> Dict:
        """
        Validate failed tasks in the ``failed/`` directory.

        A failed task is properly documented when it has an ``error.txt`` and,
        if it has a ``meta.json``, that file is a JSON object with
        'failure_reason'.

        Returns:
            Dict with 'total_failed', 'documented', 'valid' (alias of
            'documented', so both historical key names work), 'issues' and
            'health_score' (100 when there are no failed tasks).
        """
        total_failed, valid_count, issues = self._validate_archived_tasks(
            self.failed_dir,
            'error.txt',
            "Missing error.txt documentation",
            'failure_reason',
            "Missing failure_reason",
        )
        return {
            'total_failed': total_failed,
            'documented': valid_count,
            'valid': valid_count,
            'issues': issues,
            'health_score': self._percentage(valid_count, total_failed),
        }

    def check_system_capacity(self) -> Dict:
        """
        Check system capacity constraints.

        Returns:
            Dict with:
            - 'total_tasks': Task directories across active/completed/failed
            - 'conductor_size_mb': Size of all files under the conductor root
            - 'disk_usage_pct': Usage of the filesystem holding the root
            - 'disk_status': 'critical' (>90), 'warning' (>80) or 'healthy'
        """
        # Count total tasks across all directories
        total_tasks = sum(
            len(self._task_dirs(d))
            for d in (self.active_dir, self.completed_dir, self.failed_dir)
        )

        # Estimate conductor directory size
        conductor_size = 0
        if self.conductor_root.exists():
            for root, _dirs, files in os.walk(self.conductor_root):
                for name in files:
                    try:
                        conductor_size += os.path.getsize(os.path.join(root, name))
                    except OSError:
                        # Broken symlink or file removed while walking.
                        continue
        conductor_size_mb = conductor_size / (1024 * 1024)

        # Get disk usage.  disk_usage() needs an existing path, so fall back
        # to the nearest existing ancestor when the root is not created yet.
        probe = self.conductor_root
        while not probe.exists() and probe != probe.parent:
            probe = probe.parent
        total, used, _free = shutil.disk_usage(str(probe))
        disk_usage_pct = (used / total) * 100 if total else 0.0

        if disk_usage_pct > 90:
            disk_status = 'critical'
        elif disk_usage_pct > 80:
            disk_status = 'warning'
        else:
            disk_status = 'healthy'

        return {
            'total_tasks': total_tasks,
            'conductor_size_mb': round(conductor_size_mb, 1),
            'disk_usage_pct': round(disk_usage_pct, 1),
            'disk_status': disk_status,
        }

    def generate_conductor_health_score(self) -> Dict:
        """
        Generate comprehensive conductor health score.

        The overall score weights active-task health at 40%, completed and
        failed task integrity at 25% each, and free disk space at 10%.

        Returns:
            Dict with 'overall_score', 'status' ('healthy' >=80, 'degraded'
            >=60, else 'critical'), 'active_health', 'stalled_tasks',
            'disk_usage_pct', 'total_tasks', 'recommendations', 'timestamp'.
        """
        active = self.validate_active_tasks()
        completed = self.validate_completed_tasks()
        failed = self.validate_failed_tasks()
        capacity = self.check_system_capacity()

        # Weighted score
        overall_score = (
            active['health_score'] * 0.40
            + completed['health_score'] * 0.25
            + failed['health_score'] * 0.25
            + (100 - capacity['disk_usage_pct']) * 0.10  # Disk health
        )

        # Distinct tasks: a task may have several stall entries.
        stalled_count = len({entry['task_id'] for entry in active.get('stalled', [])})

        return {
            'overall_score': round(overall_score, 1),
            'status': self._classify(overall_score, 80, 60),
            'active_health': active['health_score'],
            'stalled_tasks': stalled_count,
            'disk_usage_pct': capacity['disk_usage_pct'],
            'total_tasks': capacity['total_tasks'],
            'recommendations': self._generate_conductor_recommendations(
                stalled_count, capacity['disk_usage_pct']
            ),
            'timestamp': time.time(),
        }

    def _generate_conductor_recommendations(self, stalled_count: int, disk_usage_pct: float) -> List[str]:
        """Generate recommendations based on conductor health.

        Args:
            stalled_count: Number of stalled tasks.
            disk_usage_pct: Disk usage in percent.

        Returns:
            Recommendation strings; a single "healthy" message when nothing
            needs attention.
        """
        recommendations = []

        if stalled_count > 0:
            recommendations.append(
                f"[URGENT] Fix {stalled_count} stalled task(s): luzia health conductor --fix"
            )

        if disk_usage_pct > 85:
            recommendations.append(
                f"[WARNING] Disk usage at {disk_usage_pct}%: Archive old tasks to free space"
            )

        if disk_usage_pct > 95:
            recommendations.append("[CRITICAL] Disk usage critical: Immediate cleanup required")

        if not recommendations:
            recommendations.append("Conductor system healthy - no immediate action needed")

        return recommendations


def main() -> None:
    """Print a short health report for the default conductor root."""
    checker = ConductorHealthChecker()

    print("=" * 70)
    print("CONDUCTOR ACTIVE TASKS")
    print("=" * 70)
    active = checker.validate_active_tasks()
    print(f"Total active: {active['total_active']}")
    print(f"Healthy: {active['healthy']}")
    print(f"Stalled: {len(active['stalled'])}")
    print(f"Health score: {active['health_score']}/100")

    print("\n" + "=" * 70)
    print("CONDUCTOR OVERALL HEALTH")
    print("=" * 70)
    health = checker.generate_conductor_health_score()
    print(f"Overall score: {health['overall_score']}/100 ({health['status'].upper()})")
    print(f"Stalled tasks: {health['stalled_tasks']}")
    print(f"Disk usage: {health['disk_usage_pct']}%")
    print("\nRecommendations:")
    for rec in health['recommendations']:
        print(f" - {rec}")


if __name__ == '__main__':
    main()