Files
luzia/lib/routine_validator.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Adds DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

415 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Routine Validator
Validates maintenance routines and scheduled tasks:
- Cron job configuration
- Watchdog monitoring status
- Log rotation schedule
- Backup routine health
"""
import json
import subprocess
from pathlib import Path
from typing import List, Dict
from datetime import datetime
class RoutineValidator:
    """Validate orchestrator maintenance routines.

    Each ``validate_*`` method inspects one area (cron, systemd, watchdog,
    log rotation, backups) and returns a plain dict. Problems found along
    the way are recorded in that dict's ``'issues'`` list.
    """

    ORCHESTRATOR_ROOT = Path('/opt/server-agents/orchestrator')
    CRON_DIR = Path('/etc/cron.d')
    SYSTEMD_DIR = Path('/etc/systemd/system')

    def __init__(self):
        """Initialize routine validator."""
        pass

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _latest_mtime_iso(paths):
        """Return the newest modification time among *paths* as ISO text.

        Returns:
            ISO-8601 string of the most recent mtime, or None when *paths*
            is empty or none of the entries could be stat'ed.
        """
        newest = None
        for path in paths:
            try:
                mtime = path.stat().st_mtime
            except OSError:
                # The entry disappeared (or became unreadable) between the
                # directory listing and stat(); skip it instead of failing.
                continue
            if newest is None or mtime > newest:
                newest = mtime
        if newest is None:
            return None
        return datetime.fromtimestamp(newest).isoformat()

    @staticmethod
    def _parse_rotation_schedule(content: str) -> str:
        """Extract the rotation schedule keyword from logrotate config text.

        Only whole words outside ``#`` comment text are considered, so a
        comment or a path such as ``/var/log/daily/x.log`` does not count.

        Returns:
            'daily', 'weekly' or 'monthly' (checked in that order), or
            'unknown' when none of them appears as a word.
        """
        words = set()
        for line in content.splitlines():
            code = line.split('#', 1)[0]
            # Braces may be glued to a word ("{daily"); separate them.
            words.update(code.replace('{', ' ').replace('}', ' ').split())
        for schedule in ('daily', 'weekly', 'monthly'):
            if schedule in words:
                return schedule
        return 'unknown'

    @staticmethod
    def _describe_service_state(service: Dict) -> str:
        """Summarise one entry of ``validate_systemd_services()['services']``.

        Returns:
            'running', 'missing' (no unit file), 'inactive' (enabled but not
            running) or 'disabled' (unit file present, not enabled).
        """
        if service['running']:
            return 'running'
        if not service['file_exists']:
            return 'missing'
        return 'inactive' if service['enabled'] else 'disabled'

    def _load_config_section(self, section: str) -> Dict:
        """Read one top-level object from ``ORCHESTRATOR_ROOT/config.json``.

        Returns:
            The section as a dict; an empty dict when the config file or
            the section is absent.

        Raises:
            OSError: the file exists but cannot be read.
            ValueError: the file is not valid JSON/text, or the top-level
                value or the requested section is not a JSON object.
        """
        config_file = self.ORCHESTRATOR_ROOT / 'config.json'
        if not config_file.exists():
            return {}
        config = json.loads(config_file.read_text())
        if not isinstance(config, dict):
            raise ValueError("top-level JSON value is not an object")
        value = config.get(section, {})
        if not isinstance(value, dict):
            raise ValueError(f"'{section}' section is not an object")
        return value

    # ------------------------------------------------------------------
    # Individual validations
    # ------------------------------------------------------------------

    def validate_cron_jobs(self) -> Dict:
        """
        Validate cron job configuration.

        Looks for ``CRON_DIR/luzia-orchestrator``, records which expected
        jobs appear on its non-comment lines, then runs ``crontab -l`` and
        derives the overall status from its exit code.

        Returns:
            Dict with cron validation results:
            'cron_jobs' (list of {'name', 'configured'}),
            'status' ('valid', 'invalid', 'missing', 'error' or 'unknown'),
            'issues' (list of str).
        """
        # Local import: the module header does not import ``re``.
        import re

        results = {
            'cron_jobs': [],
            'status': 'unknown',
            'issues': []
        }

        # Check for orchestrator cron jobs
        orchestrator_cron = self.CRON_DIR / 'luzia-orchestrator'
        if not orchestrator_cron.exists():
            results['issues'].append("Orchestrator cron file not found")
            results['status'] = 'missing'
            return results

        try:
            content = orchestrator_cron.read_text()
        except (OSError, ValueError) as e:
            results['issues'].append(f"Cannot read cron file: {e}")
            results['status'] = 'error'
            return results

        # Comment lines may be indented, so strip before testing for '#'.
        active_lines = [
            line for line in content.splitlines()
            if not line.lstrip().startswith('#')
        ]

        # Parse cron entries
        expected_jobs = {
            'health_check': r'luzia health.*--full',
            'cleanup': r'luzia cleanup',
            'log_rotation': r'logrotate.*luzia',
            'backup': r'backup.*create'
        }
        for job_name, pattern in expected_jobs.items():
            found = any(re.search(pattern, line) for line in active_lines)
            results['cron_jobs'].append({
                'name': job_name,
                'configured': found
            })

        # Check cron syntax
        # NOTE(review): `crontab -l` lists the invoking user's crontab; it
        # does not parse the /etc/cron.d file read above -- confirm this is
        # the intended check.
        try:
            result = subprocess.run(
                ['crontab', '-l'],
                capture_output=True,
                text=True,
                timeout=5
            )
        except subprocess.TimeoutExpired:
            # Status stays 'unknown': the check could not complete.
            results['issues'].append("Cron check timeout")
        except (OSError, ValueError) as e:
            # e.g. the crontab binary is not installed or not executable.
            results['issues'].append(f"Cannot run crontab: {e}")
            results['status'] = 'error'
        else:
            if result.returncode == 0:
                results['status'] = 'valid'
            else:
                results['issues'].append("Cron syntax invalid")
                results['status'] = 'invalid'

        return results

    def validate_systemd_services(self) -> Dict:
        """
        Validate systemd service configuration for orchestrator.

        For every expected service, checks that its unit file exists under
        ``SYSTEMD_DIR`` and, if so, queries ``systemctl is-enabled`` and
        ``systemctl is-active``.

        Returns:
            Dict with systemd validation results:
            'services' (list of {'service', 'file_exists', 'enabled',
            'running'}), 'status' ('healthy' when all are running,
            'degraded' when some are, otherwise 'unhealthy'),
            'issues' (list of str).
        """
        results = {
            'services': [],
            'status': 'unknown',
            'issues': []
        }

        expected_services = [
            'luzia-orchestrator',
            'luzia-conductor',
            'luzia-healthcheck'
        ]

        for service_name in expected_services:
            service_file = self.SYSTEMD_DIR / f'{service_name}.service'
            file_exists = service_file.exists()
            service_status = {
                'service': service_name,
                'file_exists': file_exists,
                'enabled': False,
                'running': False
            }

            if file_exists:
                try:
                    # Check if enabled
                    result = subprocess.run(
                        ['systemctl', 'is-enabled', service_name],
                        capture_output=True,
                        timeout=5
                    )
                    service_status['enabled'] = result.returncode == 0

                    # Check if running
                    result = subprocess.run(
                        ['systemctl', 'is-active', service_name],
                        capture_output=True,
                        timeout=5
                    )
                    service_status['running'] = result.returncode == 0
                except subprocess.TimeoutExpired:
                    results['issues'].append(f"Timeout checking {service_name}")
                except OSError as e:
                    results['issues'].append(f"Cannot check {service_name}: {e}")

            results['services'].append(service_status)

        # Overall status
        running_count = sum(1 for s in results['services'] if s['running'])
        if running_count == len(expected_services):
            results['status'] = 'healthy'
        elif running_count > 0:
            results['status'] = 'degraded'
        else:
            results['status'] = 'unhealthy'

        return results

    def validate_watchdog_monitoring(self) -> Dict:
        """
        Validate watchdog monitoring configuration.

        Returns early when ``lib/watchdog.py`` is absent under
        ``ORCHESTRATOR_ROOT``. Otherwise checks for a matching process via
        ``pgrep -f``, lists the keys of the config's 'monitoring' section,
        and reports the newest mtime among ``*watchdog*`` log files.

        Returns:
            Dict with watchdog status: 'watchdog_running' (bool),
            'monitoring_targets' (list of str), 'issues' (list of str),
            'last_check' (ISO timestamp or None).
        """
        results = {
            'watchdog_running': False,
            'monitoring_targets': [],
            'issues': [],
            'last_check': None
        }

        # Check if watchdog script exists
        watchdog_script = self.ORCHESTRATOR_ROOT / 'lib' / 'watchdog.py'
        if not watchdog_script.exists():
            results['issues'].append("Watchdog script not found")
            return results

        # Check if watchdog process is running
        try:
            result = subprocess.run(
                ['pgrep', '-f', 'watchdog.py'],
                capture_output=True,
                timeout=5
            )
            results['watchdog_running'] = result.returncode == 0
        except (subprocess.TimeoutExpired, OSError) as e:
            results['issues'].append(f"Cannot check watchdog status: {e}")

        # Check monitoring targets
        try:
            monitoring = self._load_config_section('monitoring')
            results['monitoring_targets'] = list(monitoring.keys())
        except (OSError, ValueError) as e:
            results['issues'].append(f"Cannot read config: {e}")

        # Check for recent watchdog logs
        log_dir = Path('/var/log/luz-orchestrator')
        if log_dir.exists():
            results['last_check'] = self._latest_mtime_iso(
                log_dir.glob('*watchdog*')
            )

        return results

    def validate_log_rotation(self) -> Dict:
        """
        Validate log rotation configuration.

        Reads ``/etc/logrotate.d/luzia`` when present to determine the
        rotation schedule, and records which of the known log directories
        exist.

        Returns:
            Dict with log rotation status: 'logrotate_configured' (bool),
            'log_dirs' (list of existing directories), 'rotation_schedule'
            ('daily', 'weekly', 'monthly' or 'unknown'),
            'issues' (list of str).
        """
        results = {
            'logrotate_configured': False,
            'log_dirs': [],
            'rotation_schedule': 'unknown',
            'issues': []
        }

        # Check for logrotate config
        logrotate_config = Path('/etc/logrotate.d/luzia')
        if logrotate_config.exists():
            results['logrotate_configured'] = True
            try:
                content = logrotate_config.read_text()
            except (OSError, ValueError) as e:
                results['issues'].append(f"Cannot read logrotate config: {e}")
            else:
                # Parse rotation schedule
                results['rotation_schedule'] = self._parse_rotation_schedule(content)
        else:
            results['issues'].append("Logrotate configuration not found")

        # Check log directories
        candidate_dirs = [
            '/var/log/luz-orchestrator',
            '/var/log/luzia',
            '/home/admin/conductor/logs'
        ]
        results['log_dirs'] = [d for d in candidate_dirs if Path(d).exists()]

        return results

    def validate_backup_routine(self) -> Dict:
        """
        Validate backup routine configuration.

        Returns early when ``lib/kg_maintainer.py`` is absent under
        ``ORCHESTRATOR_ROOT``. Otherwise reads the config's 'backup'
        section and reports the newest mtime among ``backup_*`` entries in
        ``/var/backups/luz-orchestrator``.

        Returns:
            Dict with backup routine status: 'backup_enabled' (bool),
            'backup_target', 'last_backup' (ISO timestamp or None),
            'backup_frequency', 'issues' (list of str).
        """
        results = {
            'backup_enabled': False,
            'backup_target': None,
            'last_backup': None,
            'backup_frequency': 'unknown',
            'issues': []
        }

        # Check for backup script
        backup_script = self.ORCHESTRATOR_ROOT / 'lib' / 'kg_maintainer.py'  # Uses backup internally
        if not backup_script.exists():
            results['issues'].append("Backup script not found")
            return results

        # Check backup configuration
        try:
            backup_config = self._load_config_section('backup')
        except (OSError, ValueError) as e:
            results['issues'].append(f"Cannot read backup config: {e}")
        else:
            # bool() keeps the documented type even if the config stores a
            # non-boolean truthy value.
            results['backup_enabled'] = bool(backup_config.get('enabled', False))
            results['backup_target'] = backup_config.get('target')
            results['backup_frequency'] = backup_config.get('frequency', 'unknown')

        # Check for recent backups
        backup_dir = Path('/var/backups/luz-orchestrator')
        if backup_dir.exists():
            results['last_backup'] = self._latest_mtime_iso(
                backup_dir.glob('backup_*')
            )
        else:
            results['issues'].append("Backup directory not found")

        return results

    # ------------------------------------------------------------------
    # Aggregate report
    # ------------------------------------------------------------------

    def generate_routine_validation_report(self) -> Dict:
        """
        Generate comprehensive maintenance routine validation report.

        Runs all five validations and deducts from a starting score of 100:
        20 for cron not 'valid', 25 for systemd not 'healthy', 15 for the
        watchdog not running, 10 for missing logrotate config and 10 for
        backups not enabled.

        Returns:
            Dict with all routine validations and health score. 'status' is
            'healthy' at 80 or above, 'degraded' at 60 or above, otherwise
            'critical'. 'issues' holds at most the first 10 collected
            issues, while 'total_issues' counts all of them.
        """
        cron = self.validate_cron_jobs()
        systemd = self.validate_systemd_services()
        watchdog = self.validate_watchdog_monitoring()
        logrotate = self.validate_log_rotation()
        backup = self.validate_backup_routine()

        # Calculate health score
        health_score = 100
        all_issues = []

        if cron['status'] != 'valid':
            health_score -= 20
            all_issues.extend(cron['issues'])

        if systemd['status'] != 'healthy':
            health_score -= 25
            # Service entries carry no 'status' key, so describe the state
            # from the flags that are actually recorded.
            all_issues.extend(
                f"Systemd: {s['service']} is {self._describe_service_state(s)}"
                for s in systemd['services'] if not s['running']
            )
            # Also surface timeouts/errors hit while probing systemctl.
            all_issues.extend(systemd['issues'])

        if not watchdog['watchdog_running']:
            health_score -= 15
            all_issues.extend(watchdog['issues'])

        if not logrotate['logrotate_configured']:
            health_score -= 10
            all_issues.extend(logrotate['issues'])

        if not backup['backup_enabled']:
            health_score -= 10
            all_issues.append("Backups not enabled")

        health_score = max(0, health_score)

        if health_score >= 80:
            overall = 'healthy'
        elif health_score >= 60:
            overall = 'degraded'
        else:
            overall = 'critical'

        return {
            'health_score': round(health_score, 1),
            'status': overall,
            'cron_jobs': cron,
            'systemd_services': systemd,
            'watchdog': watchdog,
            'log_rotation': logrotate,
            'backup_routine': backup,
            'total_issues': len(all_issues),
            'issues': all_issues[:10],  # First 10 issues
            'recommendations': self._generate_recommendations(
                cron, systemd, watchdog, logrotate, backup
            ),
            'timestamp': datetime.now().isoformat()
        }

    def _generate_recommendations(self, cron, systemd, watchdog, logrotate, backup) -> List[str]:
        """Generate recommendations based on routine validation.

        Each argument is the dict returned by the matching ``validate_*``
        method. Returns one recommendation per failing area, or a single
        all-clear message when nothing needs attention.
        """
        recommendations = []

        if cron['status'] != 'valid':
            recommendations.append("Fix cron job configuration")
        # Anything short of 'healthy' (including 'degraded') needs action;
        # otherwise a partially running setup would get the all-clear.
        if systemd['status'] != 'healthy':
            recommendations.append("Enable and start systemd services")
        if not watchdog['watchdog_running']:
            recommendations.append("Start watchdog monitoring process")
        if not logrotate['logrotate_configured']:
            recommendations.append("Configure log rotation")
        if not backup['backup_enabled']:
            recommendations.append("Enable backup routine")

        if not recommendations:
            recommendations.append("All maintenance routines configured and running")

        return recommendations
if __name__ == '__main__':
    # Command-line entry point: run every validation once and print a
    # human-readable summary of the resulting report.
    divider = "=" * 70
    print(divider)
    print("MAINTENANCE ROUTINE VALIDATION")
    print(divider)

    summary = RoutineValidator().generate_routine_validation_report()

    # Headline score with the overall status in upper case.
    print(f"Health Score: {summary['health_score']}/100 ({summary['status'].upper()})")

    # One line per validated area.
    if summary['watchdog']['watchdog_running']:
        watchdog_text = 'Running'
    else:
        watchdog_text = 'Not running'
    if summary['log_rotation']['logrotate_configured']:
        rotation_text = 'Configured'
    else:
        rotation_text = 'Not configured'
    if summary['backup_routine']['backup_enabled']:
        backup_text = 'Enabled'
    else:
        backup_text = 'Disabled'

    print(f"\nCron Jobs: {summary['cron_jobs']['status']}")
    print(f"Systemd Services: {summary['systemd_services']['status']}")
    print(f"Watchdog: {watchdog_text}")
    print(f"Log Rotation: {rotation_text}")
    print(f"Backups: {backup_text}")

    # The issue count covers everything found; the list itself is whatever
    # the report placed under 'issues'.
    print(f"\nIssues found: {summary['total_issues']}")
    for problem in summary['issues']:
        print(f" - {problem}")

    print(f"\nRecommendations:")
    for suggestion in summary['recommendations']:
        print(f" - {suggestion}")