#!/usr/bin/env python3
"""Conductor Maintainer.

Maintains the conductor task tracking system through:

- Archival of old completed/failed tasks
- Cleanup of temporary files and stale lock files
- State consistency validation
- Log rotation
"""

import json
import os
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional


class ConductorMaintainer:
    """Maintain the conductor task tracking system.

    Layout assumed under CONDUCTOR_ROOT (verify against deployment):
    ``active/``, ``completed/``, ``failed/`` task directories, a
    ``locks/`` directory of ``*.lock`` files, and an ``archive/``
    directory managed by this class.
    """

    CONDUCTOR_ROOT = Path('/home/admin/conductor')
    ARCHIVE_DIR = CONDUCTOR_ROOT / 'archive'
    ARCHIVE_THRESHOLD_DAYS = 30  # Archive tasks older than 30 days

    def __init__(self):
        """Initialize the maintainer, ensuring the archive directory exists."""
        self.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)

    def find_archivable_tasks(self, days_old: int = 30) -> Dict:
        """Find completed/failed tasks ready for archival.

        Args:
            days_old: Archive tasks whose directory mtime is older than
                this many days.

        Returns:
            Dict with 'completed' and 'failed' lists of task-info dicts
            (task_id, path, age_days, size_mb), plus 'total_count' and
            'estimated_space_mb' totals.
        """
        # Single timestamp so cutoff and age calculations agree.
        now = datetime.now()
        cutoff_time = now - timedelta(days=days_old)
        archivable = {
            'completed': [],
            'failed': [],
            'total_count': 0,
            'estimated_space_mb': 0,
        }

        for status in ('completed', 'failed'):
            status_dir = self.CONDUCTOR_ROOT / status
            if not status_dir.exists():
                continue
            for task_dir in status_dir.iterdir():
                if not task_dir.is_dir():
                    continue
                try:
                    mtime = datetime.fromtimestamp(task_dir.stat().st_mtime)
                except OSError:
                    # Best-effort scan: directory vanished mid-walk or is
                    # unreadable; skip it rather than abort the whole scan.
                    continue
                if mtime >= cutoff_time:
                    continue
                task_info = {
                    'task_id': task_dir.name,
                    'path': str(task_dir),
                    'age_days': (now - mtime).days,
                    'size_mb': self._get_dir_size_mb(task_dir),
                }
                # BUGFIX: the original classified via
                # `'completed' in str(status_dir)`, which misfiles every
                # task as 'completed' if the root path itself contains
                # that substring. Keying on the status name is exact.
                archivable[status].append(task_info)
                archivable['total_count'] += 1
                archivable['estimated_space_mb'] += task_info['size_mb']

        return archivable

    def archive_tasks(self, tasks: Optional[List[Dict]] = None,
                      dry_run: bool = True) -> Dict:
        """Archive old tasks into the archive directory.

        Args:
            tasks: Task-info dicts (as produced by find_archivable_tasks).
                If None, auto-detect using ARCHIVE_THRESHOLD_DAYS.
            dry_run: If True, preview only; no files are moved.

        Returns:
            Dict with counts, action log, and 'status' of 'success' or
            'partial' (some moves failed).
        """
        if tasks is None:
            archivable = self.find_archivable_tasks(
                days_old=self.ARCHIVE_THRESHOLD_DAYS)
            tasks = archivable['completed'] + archivable['failed']

        result = {
            'tasks_to_archive': len(tasks),
            'archived': 0,
            'failed': 0,
            'actions': [],
            'dry_run': dry_run,
        }

        # Archived tasks are grouped into monthly buckets (YYYY-MM);
        # computed once so a run straddling midnight stays consistent.
        month_bucket = datetime.now().strftime('%Y-%m')
        for task_info in tasks:
            task_id = task_info['task_id']
            source_path = Path(task_info['path'])
            archive_path = self.ARCHIVE_DIR / month_bucket / task_id

            if dry_run:
                result['actions'].append(
                    f"Would archive {task_id} to {archive_path}")
                result['archived'] += 1
                continue

            try:
                archive_path.parent.mkdir(parents=True, exist_ok=True)
                shutil.move(str(source_path), str(archive_path))
                result['actions'].append(f"Archived {task_id}")
                result['archived'] += 1
            except (OSError, shutil.Error) as e:
                # Record the failure but keep archiving remaining tasks.
                result['actions'].append(f"Failed to archive {task_id}: {e}")
                result['failed'] += 1

        result['status'] = 'success' if result['failed'] == 0 else 'partial'
        return result

    def cleanup_stale_lock_files(self, dry_run: bool = True,
                                 max_age_hours: float = 1) -> Dict:
        """Clean up stale lock files.

        Args:
            dry_run: If True, preview only; no files are removed.
            max_age_hours: Locks older than this are considered stale
                (default 1 hour, matching the original behavior).

        Returns:
            Dict with removal count, action log, and 'status'.
        """
        result = {
            'locks_removed': 0,
            'actions': [],
            'dry_run': dry_run,
        }

        locks_dir = self.CONDUCTOR_ROOT / 'locks'
        if not locks_dir.exists():
            # NOTE: 'status' key is intentionally absent here, matching
            # the original early-return behavior.
            return result

        cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
        for lock_file in locks_dir.glob('*.lock'):
            try:
                mtime = datetime.fromtimestamp(lock_file.stat().st_mtime)
                if mtime < cutoff_time:
                    result['actions'].append(
                        f"Remove stale lock: {lock_file.name}")
                    if not dry_run:
                        lock_file.unlink()
                    result['locks_removed'] += 1
            except OSError as e:
                result['actions'].append(
                    f"Error cleaning {lock_file.name}: {e}")

        result['status'] = 'success'
        return result

    def cleanup_temp_files(self, dry_run: bool = True) -> Dict:
        """Clean up temporary task files (editor droppings, OS cruft).

        Args:
            dry_run: If True, preview only; counters stay at zero and
                only the action log is populated.

        Returns:
            Dict with removal count, space freed (MB), action log,
            and 'status'.
        """
        result = {
            'files_removed': 0,
            'space_freed_mb': 0,
            'actions': [],
            'dry_run': dry_run,
        }

        # Glob patterns considered disposable anywhere under the root.
        temp_patterns = ['*.tmp', '*.swp', '*~', '.DS_Store']

        for pattern in temp_patterns:
            for temp_file in self.CONDUCTOR_ROOT.rglob(pattern):
                if not temp_file.is_file():
                    continue
                file_size_mb = temp_file.stat().st_size / (1024 * 1024)
                result['actions'].append(
                    f"Remove {temp_file.name} ({file_size_mb:.1f}MB)")
                if dry_run:
                    continue
                try:
                    temp_file.unlink()
                    result['files_removed'] += 1
                    result['space_freed_mb'] += file_size_mb
                except OSError as e:
                    result['actions'].append(
                        f"Error removing {temp_file.name}: {e}")

        result['status'] = 'success'
        return result

    def validate_task_integrity(self) -> Dict:
        """Validate integrity of all conductor tasks.

        Each task directory must contain the files required for its
        status (e.g. completed tasks need meta.json and result.json).

        Returns:
            Dict with task counts, lists of problem tasks, and overall
            'status' of 'healthy' or 'degraded'.
        """
        result = {
            'total_tasks': 0,
            'valid_tasks': 0,
            # 'corrupted' is currently never populated by this check;
            # kept for interface stability with consumers of the report.
            'corrupted': [],
            'missing_files': [],
            'status': 'unknown',
        }

        required_files = {
            'active': ['meta.json', 'heartbeat.json', 'progress.md'],
            'completed': ['meta.json', 'result.json'],
            'failed': ['meta.json', 'error.txt'],
        }

        for status, required in required_files.items():
            status_dir = self.CONDUCTOR_ROOT / status
            if not status_dir.exists():
                continue
            for task_dir in status_dir.iterdir():
                if not task_dir.is_dir():
                    continue
                result['total_tasks'] += 1
                missing = [name for name in required
                           if not (task_dir / name).exists()]
                if missing:
                    result['missing_files'].append({
                        'task_id': task_dir.name,
                        'missing': missing,
                    })
                else:
                    result['valid_tasks'] += 1

        healthy = not result['corrupted'] and not result['missing_files']
        result['status'] = 'healthy' if healthy else 'degraded'
        return result

    def run_full_conductor_maintenance(self, dry_run: bool = True) -> Dict:
        """Run comprehensive conductor maintenance.

        Steps: archive old tasks, clean stale locks, remove temp files,
        then validate task integrity.

        Args:
            dry_run: If True, preview only; passed through to each step.

        Returns:
            Dict with timestamp, action log, per-step summary, and
            overall 'status'.
        """
        maintenance_result = {
            'timestamp': datetime.now().isoformat(),
            'dry_run': dry_run,
            'actions_completed': [],
            'summary': {},
        }
        summary = maintenance_result['summary']

        # 1. Find and archive old tasks
        archivable = self.find_archivable_tasks(
            days_old=self.ARCHIVE_THRESHOLD_DAYS)
        archive_result = self.archive_tasks(
            tasks=archivable['completed'] + archivable['failed'],
            dry_run=dry_run,
        )
        maintenance_result['actions_completed'].append(
            f"Archived {archive_result['archived']} tasks")
        summary['tasks_archived'] = archive_result['archived']
        # Estimated (pre-move) size, so the figure is available on dry runs.
        summary['space_freed_mb'] = archivable['estimated_space_mb']

        # 2. Clean up lock files
        locks_result = self.cleanup_stale_lock_files(dry_run=dry_run)
        maintenance_result['actions_completed'].append(
            f"Cleaned {locks_result['locks_removed']} lock files")
        summary['locks_removed'] = locks_result['locks_removed']

        # 3. Clean up temp files
        temp_result = self.cleanup_temp_files(dry_run=dry_run)
        maintenance_result['actions_completed'].append(
            f"Removed {temp_result['files_removed']} temp files")
        summary['temp_files_removed'] = temp_result['files_removed']
        summary['space_freed_temp_mb'] = temp_result['space_freed_mb']

        # 4. Validate integrity
        integrity = self.validate_task_integrity()
        summary['total_tasks'] = integrity['total_tasks']
        summary['valid_tasks'] = integrity['valid_tasks']
        summary['corrupted_count'] = len(integrity['corrupted'])

        maintenance_result['status'] = 'success'
        return maintenance_result

    def _get_dir_size_mb(self, path: Path) -> float:
        """Return the total size of all files under *path*, in MB.

        Best-effort: unreadable subtrees contribute what was counted
        before the error; files removed mid-walk are skipped.
        """
        total_size = 0
        try:
            for dirpath, _dirnames, filenames in os.walk(path):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    # Guard against files deleted between listing and stat.
                    if os.path.exists(filepath):
                        total_size += os.path.getsize(filepath)
        except OSError:
            pass
        return total_size / (1024 * 1024)


if __name__ == '__main__':
    maintainer = ConductorMaintainer()

    print("=" * 70)
    print("CONDUCTOR MAINTENANCE DRY RUN")
    print("=" * 70)

    result = maintainer.run_full_conductor_maintenance(dry_run=True)

    print(f"\nStatus: {result['status']}")
    print(f"\nActions:")
    for action in result['actions_completed']:
        print(f"  - {action}")
    print(f"\nSummary:")
    for key, value in result['summary'].items():
        print(f"  {key}: {value}")