#!/usr/bin/env python3
"""
Project Queue CLI Integration

Provides high-level functions for CLI integration of ProjectQueueScheduler.
Handles queue status display, task selection, and project-based dispatching.
"""

import json
from pathlib import Path
from typing import Dict, Any, Optional, Tuple
from datetime import datetime

try:
    from project_queue_scheduler import ProjectQueueScheduler
except ImportError as err:
    # Chain explicitly so the underlying import failure stays visible.
    raise ImportError("project_queue_scheduler.py not found in lib directory") from err

# Horizontal rule used to frame every status report.
_RULE = "=" * 70
# Display limits for the per-project pending-task listing.
_MAX_LISTED_TASKS = 10
_TASK_ID_WIDTH = 8
_PROMPT_WIDTH = 50


class ProjectQueueCLI:
    """CLI integration for project-based queue scheduling."""

    def __init__(self):
        """Initialize CLI with a freshly constructed ProjectQueueScheduler."""
        self.scheduler = ProjectQueueScheduler()

    def get_queue_status(self, project: Optional[str] = None) -> str:
        """
        Get queue status as formatted output.

        Args:
            project: Optional project name to filter. A falsy value
                (None or "") selects the global report.

        Returns:
            Formatted, newline-joined status string.
        """
        if project:
            return self._format_project_status(project)
        return self._format_global_status()

    def _format_global_status(self) -> str:
        """
        Format global queue status.

        Combines the scheduler's scheduling status, its capacity reading and
        its config thresholds into one report covering system capacity,
        queue totals and a per-project breakdown.

        Returns:
            Formatted, newline-joined status string.
        """
        status = self.scheduler.get_scheduling_status()
        capacity = self.scheduler._read_capacity()
        config = self.scheduler.config

        system = capacity.get("system", {})
        slots = capacity.get("slots", {})

        # The load target scales with core count. Format it explicitly:
        # the raw float product can print as e.g. 2.4000000000000004.
        cpu_target = config.get("max_cpu_load", 0.8) * system.get("cpu_count", 4)

        lines = [
            _RULE,
            "PROJECT-BASED TASK QUEUE STATUS",
            _RULE,
            "",
            "SYSTEM CAPACITY:",
            f" Slots: {slots.get('used', 0)}/{slots.get('max', 4)}",
            f" CPU Load: {system.get('load_1m', 0):.2f} "
            f"(target: <{cpu_target:.2f})",
            f" Memory: {system.get('memory_used_pct', 0)}% "
            f"(target: <{config.get('max_memory_pct', 85)}%)",
            "",
        ]

        total_pending = status.get("total_pending", 0)
        active_count = status.get("active_count", 0)
        pending_by_project = status.get("pending_by_project", {})
        active_tasks = status.get("active_tasks", {})

        lines.append("QUEUE STATUS:")
        lines.append(
            f" Pending: {total_pending} tasks across "
            f"{len(pending_by_project)} projects"
        )
        lines.append(f" Active: {active_count} projects with running tasks")
        lines.append("")

        if pending_by_project or active_tasks:
            lines.append("BY PROJECT:")
            # A project is listed if it has pending work, a running task, or both.
            for project in sorted(set(pending_by_project) | set(active_tasks)):
                pending = pending_by_project.get(project, 0)
                active = "✓ running" if project in active_tasks else "-"
                lines.append(f" {project:20s} pending={pending:2d} {active}")
        else:
            lines.append(" (queue is empty)")

        lines.append("")
        lines.append(
            f"Scheduling Algorithm: {status.get('scheduling_algorithm', 'unknown')}"
        )
        lines.append(f"Last Updated: {status.get('timestamp', 'unknown')}")
        lines.append(_RULE)
        return "\n".join(lines)

    def _format_project_status(self, project: str) -> str:
        """
        Format project-specific queue status.

        Args:
            project: Project name passed to the scheduler's
                get_project_queue_status().

        Returns:
            Formatted, newline-joined status string listing at most the
            first 10 pending tasks, with a count of any remainder.
        """
        proj_status = self.scheduler.get_project_queue_status(project)

        pending_count = proj_status.get("pending_count", 0)
        is_running = proj_status.get("is_running", False)
        active_task = proj_status.get("active_task")

        lines = [
            _RULE,
            f"PROJECT QUEUE: {project}",
            _RULE,
            "",
            "PROJECT STATUS:",
            f" Pending Tasks: {pending_count}",
            f" Status: {'🔄 executing' if is_running else '⏳ waiting'}",
        ]
        if active_task:
            lines.append(f" Running Task: {active_task.get('id', '?')}")
        lines.append("")

        pending_tasks = proj_status.get("pending_tasks", [])
        if pending_tasks:
            lines.append(f"PENDING TASKS ({len(pending_tasks)}):")
            for task in pending_tasks[:_MAX_LISTED_TASKS]:
                # Coerce before slicing so a None or non-string id/prompt
                # cannot crash the report.
                task_id = str(task.get("id", "?"))[:_TASK_ID_WIDTH]
                priority = task.get("priority")
                if priority is None:
                    priority = 5
                priority_label = "HIGH" if priority <= 3 else "normal"
                prompt = str(task.get("prompt") or "")
                # Only mark the prompt as elided when text was actually cut.
                ellipsis = "..." if len(prompt) > _PROMPT_WIDTH else ""
                lines.append(
                    f" [{priority_label}] {task_id} - "
                    f"{prompt[:_PROMPT_WIDTH]}{ellipsis}"
                )
            if len(pending_tasks) > _MAX_LISTED_TASKS:
                lines.append(
                    f" ... and {len(pending_tasks) - _MAX_LISTED_TASKS} more"
                )
        else:
            lines.append(" (no pending tasks)")

        lines.append("")
        lines.append(_RULE)
        return "\n".join(lines)

    def get_next_task_info(self) -> Optional[Dict[str, Any]]:
        """
        Get next executable task with scheduling info.

        Returns:
            Dict with the task, its project and a scheduling summary, or
            None when the scheduler's select_next_executable_task()
            returns nothing.
        """
        task = self.scheduler.select_next_executable_task()
        if not task:
            return None

        status = self.scheduler.get_scheduling_status()
        return {
            "task": task,
            "can_execute": True,
            "project": task.get("project"),
            "scheduling_info": {
                "total_pending": status.get("total_pending"),
                "active_projects": len(status.get("active_tasks", {})),
                "algorithm": status.get("scheduling_algorithm"),
            },
        }

    def claim_and_get_next(self, task_id: str, project: str) -> Dict[str, Any]:
        """
        Claim a task and prepare for dispatch.

        Args:
            task_id: Task to claim
            project: Project name

        Returns:
            Status dict with "success" set to the result of the scheduler's
            claim_task() and a human-readable "message". When the claim
            fails, the message reports that the project already has an
            active task.
        """
        claimed = self.scheduler.claim_task(task_id, project)
        if claimed:
            message = f"Task {task_id} claimed for {project}"
        else:
            message = f"Project {project} already has active task"
        return {
            "success": claimed,
            "task_id": task_id,
            "project": project,
            "message": message,
        }

    def release_and_show_next(self, project: str) -> Dict[str, Any]:
        """
        Release completed task and show next available.

        Args:
            project: Project name

        Returns:
            Status dict with the release result and, if a next task is
            available, that task and its scheduling info (both None
            otherwise).
        """
        released = self.scheduler.release_task(project)
        next_task_info = self.get_next_task_info()

        next_task = None
        scheduling_status = None
        if next_task_info:
            next_task = next_task_info.get("task")
            scheduling_status = next_task_info.get("scheduling_info")

        return {
            "success": released,
            "project": project,
            "task_released": released,
            "next_task": next_task,
            "scheduling_status": scheduling_status,
        }

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get comprehensive queue statistics.

        Returns:
            Dict with a local ISO timestamp plus "queue", "capacity" and
            "config" sections built from the scheduler's status, capacity
            reading and config values.
        """
        status = self.scheduler.get_scheduling_status()
        capacity = self.scheduler._read_capacity()
        config = self.scheduler.config

        pending_by_project = status.get("pending_by_project", {})
        slots = capacity.get("slots", {})
        system = capacity.get("system", {})

        return {
            "timestamp": datetime.now().isoformat(),
            "queue": {
                "total_pending": status.get("total_pending", 0),
                "active_projects": len(status.get("active_tasks", {})),
                "projects_with_pending": len(pending_by_project),
                "by_project": pending_by_project,
            },
            "capacity": {
                "slots_used": slots.get("used", 0),
                "slots_max": slots.get("max", 4),
                "load_1m": system.get("load_1m", 0),
                "memory_pct": system.get("memory_used_pct", 0),
            },
            "config": {
                "algorithm": "per-project-sequential",
                "max_concurrent_slots": config.get("max_concurrent_slots", 4),
                "max_cpu_load": config.get("max_cpu_load", 0.8),
                "max_memory_pct": config.get("max_memory_pct", 85),
            },
        }


# Convenience functions for CLI integration.
# Each call builds its own ProjectQueueCLI (and therefore its own scheduler).

def get_queue_status(project: Optional[str] = None) -> str:
    """CLI: Get queue status"""
    cli = ProjectQueueCLI()
    return cli.get_queue_status(project)


def get_next_task() -> Optional[Dict[str, Any]]:
    """CLI: Get next executable task"""
    cli = ProjectQueueCLI()
    return cli.get_next_task_info()


def claim_task(task_id: str, project: str) -> Dict[str, Any]:
    """CLI: Claim a task for execution"""
    cli = ProjectQueueCLI()
    return cli.claim_and_get_next(task_id, project)


def release_task(project: str) -> Dict[str, Any]:
    """CLI: Release a completed task"""
    cli = ProjectQueueCLI()
    return cli.release_and_show_next(project)


def get_stats() -> Dict[str, Any]:
    """CLI: Get queue statistics"""
    cli = ProjectQueueCLI()
    return cli.get_statistics()


def main():
    """
    Test harness.

    With "--stats" as the first argument, print the statistics as JSON.
    Otherwise treat the first argument (if any) as a project name and print
    the matching status report; with no argument print the global report.
    """
    import sys

    cli = ProjectQueueCLI()

    if len(sys.argv) > 1 and sys.argv[1] == "--stats":
        stats = cli.get_statistics()
        print(json.dumps(stats, indent=2))
    else:
        project = sys.argv[1] if len(sys.argv) > 1 else None
        print(cli.get_queue_status(project))


if __name__ == "__main__":
    main()