#!/usr/bin/env python3 """ Luzia Queue CLI Commands Provides command-line interface for queue management: - queue status: Show queue state and pending items - queue add: Add task to queue - queue flush: Process all pending requests - agents status: Show agent load distribution - agents allocate: Trigger rebalancing """ import json import sys from pathlib import Path from typing import Optional from datetime import datetime from tabulate import tabulate from luzia_queue_manager import LuziaQueueManager, TaskPriority, TaskStatus from luzia_load_balancer import LuziaLoadBalancer class QueueCLI: """Queue management CLI interface""" def __init__(self): """Initialize CLI with queue manager and load balancer""" self.queue_manager = LuziaQueueManager() self.load_balancer = LuziaLoadBalancer(self.queue_manager) def queue_status(self, verbose: bool = False) -> int: """ Show queue state and pending items. Returns: Exit code """ stats = self.queue_manager.get_queue_stats() print("\n" + "="*70) print("LUZIA QUEUE STATUS".center(70)) print("="*70 + "\n") # Overall stats total = stats.get("total_tasks", 0) pending = stats.get("pending_count", 0) active = stats.get("active_count", 0) print(f"Total Tasks: {total}") print(f"Pending: {pending}") print(f"Active: {active}") oldest_age = stats.get("oldest_pending_age_seconds") if oldest_age is not None: hours = int(oldest_age // 3600) mins = int((oldest_age % 3600) // 60) print(f"Oldest Pending: {hours}h {mins}m") # Status breakdown print("\nStatus Breakdown:") status_counts = stats.get("by_status", {}) for status, count in sorted(status_counts.items()): print(f" {status:12s}: {count:3d}") # Priority breakdown print("\nPriority Breakdown (pending + queued):") priority_counts = stats.get("by_priority", {}) priority_names = { 1: "CRITICAL", 2: "HIGH", 3: "NORMAL", 4: "LOW", } for priority_num in [1, 2, 3, 4]: count = priority_counts.get(priority_num, 0) name = priority_names.get(priority_num, "UNKNOWN") print(f" {name:8s}: {count:3d}") # Project breakdown if stats.get("by_project"): print("\nProject Breakdown:") for project, count in sorted(stats.get("by_project", {}).items(), key=lambda x: -x[1]): print(f" {project:20s}: {count:3d}") # Show pending tasks if requested if verbose and pending > 0: print("\n" + "-"*70) print("PENDING TASKS (Top 10):") print("-"*70) pending_tasks = self.queue_manager.get_pending_tasks(limit=10) table_data = [] for task in pending_tasks: created = datetime.fromisoformat(task.created_at) age_mins = int((datetime.now() - created).total_seconds() / 60) table_data.append([ task.id[:20], task.project, task.priority.name, task.status.value, f"{age_mins}m", task.task_description[:40], ]) print(tabulate( table_data, headers=["ID", "Project", "Priority", "Status", "Age", "Description"], tablefmt="simple", )) print("\n" + "="*70 + "\n") return 0 def queue_add(self, project: str, task: str, priority: str = "normal", metadata: Optional[str] = None) -> int: """ Add task to queue. Args: project: Project name task: Task description priority: Priority level (critical, high, normal, low) metadata: Optional JSON metadata Returns: Exit code """ # Validate priority priority_map = { "critical": TaskPriority.CRITICAL, "high": TaskPriority.HIGH, "normal": TaskPriority.NORMAL, "low": TaskPriority.LOW, } if priority.lower() not in priority_map: print(f"Error: Invalid priority '{priority}'. Must be one of: critical, high, normal, low") return 1 # Parse metadata if provided meta = None if metadata: try: meta = json.loads(metadata) except json.JSONDecodeError: print(f"Error: Invalid JSON metadata: {metadata}") return 1 # Enqueue task try: task_id = self.queue_manager.enqueue_task( project=project, task=task, priority=priority_map[priority.lower()], metadata=meta, ) print(f"Task added: {task_id}") return 0 except Exception as e: print(f"Error: Failed to add task: {e}") return 1 def queue_flush(self, dry_run: bool = False) -> int: """ Process all pending requests. Migrates pending requests from pending-requests.json to queue with appropriate priorities. Args: dry_run: If True, show what would be done without doing it Returns: Exit code """ pending_file = Path("/opt/server-agents/state/pending-requests.json") if not pending_file.exists(): print("Error: pending-requests.json not found") return 1 try: with open(pending_file) as f: data = json.load(f) except json.JSONDecodeError: print("Error: Invalid JSON in pending-requests.json") return 1 pending_list = data.get("pending", []) if not pending_list: print("No pending requests to process") return 0 # Process pending requests count = 0 for req in pending_list: req_id = req.get("id") req_type = req.get("type") user = req.get("user", "unknown") reason = req.get("reason", req.get("parameter", "No description")) # Determine priority based on request type and content priority = TaskPriority.NORMAL if "URGENT" in reason.upper(): priority = TaskPriority.HIGH elif req.get("status") == "approved": priority = TaskPriority.HIGH # Create task description task_desc = f"{req_type} from {user}: {reason[:100]}" # Create metadata metadata = { "original_request_id": req_id, "request_type": req_type, "user": user, "request_status": req.get("status"), "parameter": req.get("parameter"), } if dry_run: print(f"Would add: {task_desc[:60]}... (priority={priority.name})") else: task_id = self.queue_manager.enqueue_task( project=user, task=task_desc, priority=priority, metadata=metadata, ) print(f"Added: {task_id}") count += 1 if dry_run: print(f"\n(dry-run) Would add {len(pending_list)} tasks") else: print(f"\nSuccessfully added {count} tasks to queue") return 0 def agents_status(self, sort_by: str = "load") -> int: """ Show agent load distribution. Args: sort_by: Sort key (load, cpu, memory, tasks, health) Returns: Exit code """ cluster_info = self.load_balancer.get_cluster_load() print("\n" + "="*90) print("AGENT STATUS".center(90)) print("="*90 + "\n") print(f"Total Agents: {cluster_info.get('total_agents', 0)}") print(f"Healthy Agents: {cluster_info.get('healthy_agents', 0)}") print(f"Average Utilization: {cluster_info.get('average_utilization', 0):.1%}") print(f"Cluster Load Level: {cluster_info.get('cluster_load_level', 'unknown').upper()}") recommendation = cluster_info.get("recommendation", "") if recommendation: print(f"Recommendation: {recommendation}") # Agent details table agents = cluster_info.get("agents", []) if agents: print("\n" + "-"*90) print("AGENT DETAILS:") print("-"*90) # Sort agents if sort_by == "cpu": agents = sorted(agents, key=lambda x: x.get("cpu", 0), reverse=True) elif sort_by == "memory": agents = sorted(agents, key=lambda x: x.get("memory", 0), reverse=True) elif sort_by == "tasks": agents = sorted(agents, key=lambda x: x.get("tasks", 0), reverse=True) elif sort_by == "health": agents = sorted(agents, key=lambda x: not x.get("healthy", True)) # else: keep default sort by utilization table_data = [] for agent in agents: healthy_str = "YES" if agent.get("healthy") else "NO" table_data.append([ agent.get("id", "unknown")[:20], f"{agent.get('cpu', 0):.1f}%", f"{agent.get('memory', 0):.1f}%", agent.get("tasks", 0), healthy_str, f"{agent.get('utilization', 0):.1%}", agent.get("level", "unknown").upper(), ]) print(tabulate( table_data, headers=["Agent ID", "CPU", "Memory", "Tasks", "Healthy", "Util.", "Level"], tablefmt="simple", )) print("\n" + "="*90 + "\n") return 0 def agents_allocate(self) -> int: """ Trigger rebalancing and show recommendations. Returns: Exit code """ recommendations = self.load_balancer.get_recommendations() print("\n" + "="*70) print("LOAD BALANCER RECOMMENDATIONS".center(70)) print("="*70 + "\n") backpressured = recommendations.get("backpressured", False) if backpressured: print(f"BACKPRESSURE: {recommendations.get('backpressure_reason', 'Unknown')}") else: print("Backpressure: No") should_add = recommendations.get("should_add_agent", False) should_remove = recommendations.get("should_remove_agent", False) if should_add: print("Scale Action: ADD AGENTS") elif should_remove: print(f"Scale Action: REMOVE AGENT {should_remove}") else: print("Scale Action: No action required") print("\nRecommendations:") for rec in recommendations.get("recommendations", []): print(f" - {rec}") # Show cluster stats cluster = recommendations.get("cluster", {}) print(f"\nCluster Utilization: {cluster.get('average_utilization', 0):.1%}") print(f"Cluster Load Level: {cluster.get('cluster_load_level', 'unknown').upper()}") print("\n" + "="*70 + "\n") return 0 def main(): """CLI entry point""" if len(sys.argv) < 2: print("Usage: luzia-queue [options]") print("\nCommands:") print(" queue status [--verbose] Show queue state") print(" queue add [--priority LEVEL] [--metadata JSON]") print(" queue flush [--dry-run] Migrate pending requests to queue") print(" agents status [--sort-by KEY] Show agent load distribution") print(" agents allocate Show rebalancing recommendations") return 1 cli = QueueCLI() command = sys.argv[1] try: if command == "queue": if len(sys.argv) < 3: print("Usage: luzia-queue queue ") return 1 subcommand = sys.argv[2] if subcommand == "status": verbose = "--verbose" in sys.argv or "-v" in sys.argv return cli.queue_status(verbose=verbose) elif subcommand == "add": if len(sys.argv) < 5: print("Usage: luzia-queue queue add [--priority LEVEL] [--metadata JSON]") return 1 project = sys.argv[3] task = sys.argv[4] priority = "normal" metadata = None # Parse optional arguments i = 5 while i < len(sys.argv): if sys.argv[i] == "--priority" and i + 1 < len(sys.argv): priority = sys.argv[i + 1] i += 2 elif sys.argv[i] == "--metadata" and i + 1 < len(sys.argv): metadata = sys.argv[i + 1] i += 2 else: i += 1 return cli.queue_add(project, task, priority, metadata) elif subcommand == "flush": dry_run = "--dry-run" in sys.argv return cli.queue_flush(dry_run=dry_run) else: print(f"Unknown queue subcommand: {subcommand}") return 1 elif command == "agents": if len(sys.argv) < 3: print("Usage: luzia-queue agents ") return 1 subcommand = sys.argv[2] if subcommand == "status": sort_by = "load" if "--sort-by" in sys.argv: idx = sys.argv.index("--sort-by") if idx + 1 < len(sys.argv): sort_by = sys.argv[idx + 1] return cli.agents_status(sort_by=sort_by) elif subcommand == "allocate": return cli.agents_allocate() else: print(f"Unknown agents subcommand: {subcommand}") return 1 else: print(f"Unknown command: {command}") return 1 except Exception as e: print(f"Error: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": sys.exit(main())