#!/usr/bin/env python3
"""
Routine Validator

Validates maintenance routines and scheduled tasks:
- Cron job configuration
- Watchdog monitoring status
- Log rotation schedule
- Backup routine health
"""

import json
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional

# An environment assignment inside a cron file, e.g. ``MAILTO=root``.
_CRON_ENV_RE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*\s*=')
# Characters accepted in one of the five time fields of a cron entry.
_CRON_TIME_FIELD_RE = re.compile(r'^[0-9A-Za-z*/,~-]+$')


class RoutineValidator:
    """Validate orchestrator maintenance routines.

    Every ``validate_*`` method returns a plain dict and records problems in
    its ``'issues'`` list instead of raising. All filesystem locations are
    class attributes so they can be overridden (per subclass or per instance)
    without touching the method bodies.
    """

    ORCHESTRATOR_ROOT = Path('/opt/server-agents/orchestrator')
    CRON_DIR = Path('/etc/cron.d')
    SYSTEMD_DIR = Path('/etc/systemd/system')
    LOGROTATE_CONFIG = Path('/etc/logrotate.d/luzia')
    WATCHDOG_LOG_DIR = Path('/var/log/luz-orchestrator')
    BACKUP_DIR = Path('/var/backups/luz-orchestrator')
    LOG_DIRS = (
        '/var/log/luz-orchestrator',
        '/var/log/luzia',
        '/home/admin/conductor/logs',
    )

    # Job name -> regex that must match at least one active cron line.
    EXPECTED_CRON_JOBS = {
        'health_check': r'luzia health.*--full',
        'cleanup': r'luzia cleanup',
        'log_rotation': r'logrotate.*luzia',
        'backup': r'backup.*create',
    }
    EXPECTED_SERVICES = (
        'luzia-orchestrator',
        'luzia-conductor',
        'luzia-healthcheck',
    )
    # Seconds allowed for each external command (systemctl, pgrep).
    COMMAND_TIMEOUT = 5

    def __init__(self):
        """Initialize routine validator."""

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _command_succeeds(self, command: List[str]) -> bool:
        """Run *command* and return True when it exits with status 0.

        Raises:
            subprocess.TimeoutExpired: the command exceeded COMMAND_TIMEOUT.
            OSError: the executable could not be started.
        """
        completed = subprocess.run(
            command,
            capture_output=True,
            timeout=self.COMMAND_TIMEOUT,
        )
        return completed.returncode == 0

    @staticmethod
    def _is_valid_cron_line(line: str) -> bool:
        """Return True when *line* is structurally valid for a cron.d file.

        Accepted forms: blank lines, ``#`` comments, ``NAME=value``
        assignments, ``@keyword user command`` and
        ``m h dom mon dow user command``. The check is deliberately lenient:
        it verifies the shape of the line, not the numeric ranges.
        """
        stripped = line.strip()
        if not stripped or stripped.startswith('#'):
            return True
        if _CRON_ENV_RE.match(stripped):
            return True
        fields = stripped.split()
        if fields[0].startswith('@'):
            # @reboot / @daily / ... still need a user and a command.
            return len(fields) >= 3
        if len(fields) < 7:
            # Five time fields + user + command is the minimum.
            return False
        return all(_CRON_TIME_FIELD_RE.match(field) for field in fields[:5])

    @staticmethod
    def _latest_mtime_iso(paths: Iterable[Path]) -> Optional[str]:
        """Return the newest modification time among *paths* as ISO text.

        Paths that cannot be stat'ed (for example removed between the glob
        and the stat) are skipped. Returns None when nothing is left.
        """
        mtimes = []
        for path in paths:
            try:
                mtimes.append(path.stat().st_mtime)
            except OSError:
                continue
        if not mtimes:
            return None
        return datetime.fromtimestamp(max(mtimes)).isoformat()

    @staticmethod
    def _read_config_section(config_file: Path, section: str) -> Dict:
        """Load *config_file* as JSON and return its *section* mapping.

        A missing section yields an empty dict.

        Raises:
            OSError: the file cannot be read.
            ValueError: the file is not valid JSON/text, or the document or
                the section is not a JSON object.
        """
        config = json.loads(config_file.read_text())
        if not isinstance(config, dict):
            raise ValueError("top-level JSON value is not an object")
        value = config.get(section, {})
        if not isinstance(value, dict):
            raise ValueError(f"'{section}' section is not an object")
        return value

    # ------------------------------------------------------------------
    # Validators
    # ------------------------------------------------------------------

    def validate_cron_jobs(self) -> Dict:
        """
        Validate cron job configuration.

        Reads ``CRON_DIR / 'luzia-orchestrator'``, reports which of the
        EXPECTED_CRON_JOBS patterns appear on a non-comment line, and checks
        the structure of every line in that file.

        Returns:
            Dict with keys ``cron_jobs`` (list of ``{'name', 'configured'}``),
            ``status`` (``'missing'``, ``'error'``, ``'invalid'`` or
            ``'valid'``) and ``issues`` (list of messages).
        """
        results = {
            'cron_jobs': [],
            'status': 'unknown',
            'issues': [],
        }

        orchestrator_cron = self.CRON_DIR / 'luzia-orchestrator'
        if not orchestrator_cron.exists():
            results['issues'].append("Orchestrator cron file not found")
            results['status'] = 'missing'
            return results

        try:
            content = orchestrator_cron.read_text()
        except (OSError, ValueError) as e:
            results['issues'].append(f"Cannot read cron file: {e}")
            results['status'] = 'error'
            return results

        lines = content.splitlines()
        # Commented-out entries (even indented ones) do not count as configured.
        active_lines = [
            line for line in lines
            if line.strip() and not line.lstrip().startswith('#')
        ]
        for job_name, pattern in self.EXPECTED_CRON_JOBS.items():
            job_re = re.compile(pattern)
            results['cron_jobs'].append({
                'name': job_name,
                'configured': any(job_re.search(line) for line in active_lines),
            })

        # Validate the file itself. `crontab -l` only lists the calling
        # user's personal crontab and says nothing about files in cron.d.
        bad_line_numbers = [
            str(number)
            for number, line in enumerate(lines, start=1)
            if not self._is_valid_cron_line(line)
        ]
        if bad_line_numbers:
            results['issues'].append(
                "Cron syntax invalid on line(s) " + ", ".join(bad_line_numbers)
            )
            results['status'] = 'invalid'
        else:
            results['status'] = 'valid'

        return results

    def validate_systemd_services(self) -> Dict:
        """
        Validate systemd service configuration for orchestrator.

        For every name in EXPECTED_SERVICES, checks that the unit file exists
        under SYSTEMD_DIR and, if it does, asks ``systemctl`` whether the
        service is enabled and active.

        Returns:
            Dict with keys ``services`` (one dict per service with
            ``service``, ``file_exists``, ``enabled``, ``running`` and
            ``status`` = ``'missing'``/``'running'``/``'stopped'``/
            ``'unknown'``), ``status`` (``'healthy'`` when all run,
            ``'degraded'`` when some run, ``'unhealthy'`` otherwise) and
            ``issues``.
        """
        results = {
            'services': [],
            'status': 'unknown',
            'issues': [],
        }

        for service_name in self.EXPECTED_SERVICES:
            service_file = self.SYSTEMD_DIR / f'{service_name}.service'
            file_exists = service_file.exists()
            service_status = {
                'service': service_name,
                'file_exists': file_exists,
                'enabled': False,
                'running': False,
                'status': 'missing',
            }

            if file_exists:
                # Stays 'unknown' if systemctl cannot be queried.
                service_status['status'] = 'unknown'
                try:
                    service_status['enabled'] = self._command_succeeds(
                        ['systemctl', 'is-enabled', service_name]
                    )
                    running = self._command_succeeds(
                        ['systemctl', 'is-active', service_name]
                    )
                    service_status['running'] = running
                    service_status['status'] = 'running' if running else 'stopped'
                except subprocess.TimeoutExpired:
                    results['issues'].append(f"Timeout checking {service_name}")
                except (OSError, subprocess.SubprocessError) as e:
                    results['issues'].append(f"Cannot check {service_name}: {e}")

            results['services'].append(service_status)

        running_count = sum(1 for s in results['services'] if s['running'])
        if running_count == len(self.EXPECTED_SERVICES):
            results['status'] = 'healthy'
        elif running_count > 0:
            results['status'] = 'degraded'
        else:
            results['status'] = 'unhealthy'

        return results

    def validate_watchdog_monitoring(self) -> Dict:
        """
        Validate watchdog monitoring configuration.

        Checks that ``lib/watchdog.py`` exists under ORCHESTRATOR_ROOT, that a
        process matching it is running (``pgrep -f``), which keys the
        ``monitoring`` section of ``config.json`` has, and when the newest
        ``*watchdog*`` file in WATCHDOG_LOG_DIR was last modified.

        Returns:
            Dict with keys ``watchdog_running``, ``monitoring_targets``,
            ``issues`` and ``last_check`` (ISO timestamp or None). When the
            script is missing the method returns right after recording that.
        """
        results = {
            'watchdog_running': False,
            'monitoring_targets': [],
            'issues': [],
            'last_check': None,
        }

        watchdog_script = self.ORCHESTRATOR_ROOT / 'lib' / 'watchdog.py'
        if not watchdog_script.exists():
            results['issues'].append("Watchdog script not found")
            return results

        try:
            # pgrep patterns are regexes; escape the dot so it only matches
            # a literal "watchdog.py".
            results['watchdog_running'] = self._command_succeeds(
                ['pgrep', '-f', r'watchdog\.py']
            )
        except (OSError, subprocess.SubprocessError) as e:
            results['issues'].append(f"Cannot check watchdog status: {e}")

        config_file = self.ORCHESTRATOR_ROOT / 'config.json'
        if config_file.exists():
            try:
                monitoring = self._read_config_section(config_file, 'monitoring')
                results['monitoring_targets'] = list(monitoring)
            except (OSError, ValueError) as e:
                results['issues'].append(f"Cannot read config: {e}")

        log_dir = self.WATCHDOG_LOG_DIR
        if log_dir.exists():
            results['last_check'] = self._latest_mtime_iso(
                log_dir.glob('*watchdog*')
            )

        return results

    def validate_log_rotation(self) -> Dict:
        """
        Validate log rotation configuration.

        Looks for LOGROTATE_CONFIG, derives the rotation schedule from its
        non-comment lines (``daily`` wins over ``weekly`` over ``monthly``),
        and lists which of LOG_DIRS exist.

        Returns:
            Dict with keys ``logrotate_configured``, ``log_dirs``,
            ``rotation_schedule`` (``'unknown'`` when none of the three
            keywords is present) and ``issues``.
        """
        results = {
            'logrotate_configured': False,
            'log_dirs': [],
            'rotation_schedule': 'unknown',
            'issues': [],
        }

        logrotate_config = self.LOGROTATE_CONFIG
        if logrotate_config.exists():
            results['logrotate_configured'] = True
            try:
                content = logrotate_config.read_text()
            except (OSError, ValueError) as e:
                results['issues'].append(f"Cannot read logrotate config: {e}")
            else:
                # Compare whole tokens on non-comment lines so that a comment
                # or a path such as /var/log/daily/ cannot fake a schedule.
                tokens = set()
                for line in content.splitlines():
                    stripped = line.strip()
                    if stripped.startswith('#'):
                        continue
                    tokens.update(stripped.split())
                for schedule in ('daily', 'weekly', 'monthly'):
                    if schedule in tokens:
                        results['rotation_schedule'] = schedule
                        break
        else:
            results['issues'].append("Logrotate configuration not found")

        results['log_dirs'] = [
            log_dir for log_dir in self.LOG_DIRS if Path(log_dir).exists()
        ]

        return results

    def validate_backup_routine(self) -> Dict:
        """
        Validate backup routine configuration.

        Requires ``lib/kg_maintainer.py`` under ORCHESTRATOR_ROOT (the source
        notes that it uses backup internally), reads the ``backup`` section of
        ``config.json`` and reports the modification time of the newest
        ``backup_*`` entry in BACKUP_DIR.

        Returns:
            Dict with keys ``backup_enabled``, ``backup_target``,
            ``last_backup`` (ISO timestamp or None), ``backup_frequency`` and
            ``issues``. When the script is missing the method returns right
            after recording that.
        """
        results = {
            'backup_enabled': False,
            'backup_target': None,
            'last_backup': None,
            'backup_frequency': 'unknown',
            'issues': [],
        }

        backup_script = self.ORCHESTRATOR_ROOT / 'lib' / 'kg_maintainer.py'
        if not backup_script.exists():
            results['issues'].append("Backup script not found")
            return results

        config_file = self.ORCHESTRATOR_ROOT / 'config.json'
        if config_file.exists():
            try:
                backup_config = self._read_config_section(config_file, 'backup')
            except (OSError, ValueError) as e:
                results['issues'].append(f"Cannot read backup config: {e}")
            else:
                results['backup_enabled'] = bool(backup_config.get('enabled', False))
                results['backup_target'] = backup_config.get('target')
                results['backup_frequency'] = backup_config.get('frequency', 'unknown')

        backup_dir = self.BACKUP_DIR
        if backup_dir.exists():
            results['last_backup'] = self._latest_mtime_iso(
                backup_dir.glob('backup_*')
            )
        else:
            results['issues'].append("Backup directory not found")

        return results

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def generate_routine_validation_report(self) -> Dict:
        """
        Generate comprehensive maintenance routine validation report.

        Runs all five validators and starts from a score of 100, subtracting
        20 (cron not valid), 25 (systemd not healthy), 15 (watchdog not
        running), 10 (logrotate not configured) and 10 (backups not enabled).

        Returns:
            Dict with ``health_score``, ``status`` (``'healthy'`` >= 80,
            ``'degraded'`` >= 60, else ``'critical'``), the five raw validator
            results, ``total_issues``, ``issues`` (first 10 only),
            ``recommendations`` and ``timestamp``.
        """
        cron = self.validate_cron_jobs()
        systemd = self.validate_systemd_services()
        watchdog = self.validate_watchdog_monitoring()
        logrotate = self.validate_log_rotation()
        backup = self.validate_backup_routine()

        health_score = 100
        all_issues = []

        if cron['status'] != 'valid':
            health_score -= 20
            all_issues.extend(cron['issues'])

        if systemd['status'] != 'healthy':
            health_score -= 25
            all_issues.extend(
                f"Systemd: {s['service']} is {s.get('status', 'unknown')}"
                for s in systemd['services']
                if not s['running']
            )
            # Surface timeouts / systemctl failures as well.
            all_issues.extend(systemd['issues'])

        if not watchdog['watchdog_running']:
            health_score -= 15
            # Never deduct points without saying why.
            all_issues.extend(
                watchdog['issues'] or ["Watchdog process not running"]
            )

        if not logrotate['logrotate_configured']:
            health_score -= 10
            all_issues.extend(logrotate['issues'])

        if not backup['backup_enabled']:
            health_score -= 10
            all_issues.append("Backups not enabled")

        health_score = max(0, health_score)
        if health_score >= 80:
            status = 'healthy'
        elif health_score >= 60:
            status = 'degraded'
        else:
            status = 'critical'

        return {
            'health_score': health_score,
            'status': status,
            'cron_jobs': cron,
            'systemd_services': systemd,
            'watchdog': watchdog,
            'log_rotation': logrotate,
            'backup_routine': backup,
            'total_issues': len(all_issues),
            'issues': all_issues[:10],  # First 10 issues
            'recommendations': self._generate_recommendations(
                cron, systemd, watchdog, logrotate, backup
            ),
            'timestamp': datetime.now().isoformat(),
        }

    def _generate_recommendations(self, cron, systemd, watchdog, logrotate, backup) -> List[str]:
        """Generate recommendations based on routine validation.

        Each argument is the dict returned by the matching ``validate_*``
        method. Returns one message per failing area, or a single
        all-clear message when nothing needs attention.
        """
        recommendations = []

        if cron['status'] != 'valid':
            recommendations.append("Fix cron job configuration")
        if systemd['status'] == 'unhealthy':
            recommendations.append("Enable and start systemd services")
        if not watchdog['watchdog_running']:
            recommendations.append("Start watchdog monitoring process")
        if not logrotate['logrotate_configured']:
            recommendations.append("Configure log rotation")
        if not backup['backup_enabled']:
            recommendations.append("Enable backup routine")

        if not recommendations:
            recommendations.append("All maintenance routines configured and running")

        return recommendations


def main() -> None:
    """Run every validation and print a human-readable summary."""
    validator = RoutineValidator()

    print("=" * 70)
    print("MAINTENANCE ROUTINE VALIDATION")
    print("=" * 70)

    report = validator.generate_routine_validation_report()

    print(f"Health Score: {report['health_score']}/100 ({report['status'].upper()})")
    print(f"\nCron Jobs: {report['cron_jobs']['status']}")
    print(f"Systemd Services: {report['systemd_services']['status']}")
    print(f"Watchdog: {'Running' if report['watchdog']['watchdog_running'] else 'Not running'}")
    print(f"Log Rotation: {'Configured' if report['log_rotation']['logrotate_configured'] else 'Not configured'}")
    print(f"Backups: {'Enabled' if report['backup_routine']['backup_enabled'] else 'Disabled'}")

    print(f"\nIssues found: {report['total_issues']}")
    for issue in report['issues']:
        print(f"  - {issue}")

    print("\nRecommendations:")
    for rec in report['recommendations']:
        print(f"  - {rec}")


if __name__ == '__main__':
    main()