#!/usr/bin/env python3
"""
Luzia Load Balancer - Intelligent Task Distribution and Load Management

Implements:
- CPU and memory tracking per agent
- Task queue depth monitoring
- Least-loaded agent selection
- Load threshold enforcement
- Auto-scaling decisions
- Backpressure handling
- Queue saturation detection

Features:
1. Multi-dimensional load tracking (CPU, memory, queue depth)
2. Weighted scoring for agent selection
3. Load threshold enforcement (80% max utilization)
4. Backpressure handling when overloaded
5. Auto-scale recommendations
6. Health-based agent exclusion
"""

try:
    import psutil
except ImportError:  # only the module-level utility functions need psutil
    psutil = None
import os
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from enum import Enum
import logging
import json

logger = logging.getLogger(__name__)


class LoadLevel(Enum):
    """Load level classification.

    The boundaries mirror LuziaLoadBalancer.MODERATE_LOAD_THRESHOLD (0.40),
    HIGH_LOAD_THRESHOLD (0.70) and LOAD_THRESHOLD (0.80).
    """

    LOW = "low"  # < 40% utilization
    MODERATE = "moderate"  # 40-70% utilization
    HIGH = "high"  # 70-80% utilization
    CRITICAL = "critical"  # >= 80% utilization


@dataclass
class AgentLoad:
    """Agent load metrics"""

    agent_id: str
    cpu_percent: float  # raw CPU percentage as reported (0-100 expected)
    memory_percent: float  # raw memory percentage as reported (0-100 expected)
    task_count: int  # number of active tasks reported for the agent
    is_healthy: bool  # heartbeat within HEALTH_TIMEOUT_SECONDS, or the reported flag
    last_heartbeat: Optional[datetime]  # parsed heartbeat, None if absent/unparseable
    load_level: LoadLevel
    utilization_score: float  # weighted score in [0, 1]


class LuziaLoadBalancer:
    """Intelligent load balancing for agent tasks.

    Metrics are read from a queue manager object through
    ``get_agent_stats(agent_id)``, ``get_all_agent_stats()`` and
    ``get_queue_stats()``.
    """

    # Configuration constants
    LOAD_THRESHOLD = 0.80  # 80% max utilization; at/above this an agent is CRITICAL
    HIGH_LOAD_THRESHOLD = 0.70
    MODERATE_LOAD_THRESHOLD = 0.40
    HEALTH_TIMEOUT_SECONDS = 60

    # Task count treated as a fully saturated queue (assumed max concurrent
    # tasks per agent).
    MAX_TASKS_PER_AGENT = 10

    # Backpressure / scaling policy
    MAX_PENDING_TASKS = 50  # pending queue depth above which we report backpressure
    MIN_HEALTHY_AGENTS = 2  # fewer healthy agents than this under high load = backpressure
    MAX_AGENTS = 10  # never recommend scaling up beyond this many healthy agents

    # Weights for load calculation (sum to 1.0)
    CPU_WEIGHT = 0.40
    MEMORY_WEIGHT = 0.35
    QUEUE_WEIGHT = 0.25

    def __init__(self, queue_manager=None):
        """
        Initialize load balancer.

        Args:
            queue_manager: LuziaQueueManager instance for accessing task counts
        """
        self.queue_manager = queue_manager
        # agent_id -> {"load": AgentLoad, "timestamp": datetime of computation}
        self.load_cache: Dict[str, Dict[str, Any]] = {}
        self.cache_ttl = 5  # seconds

    def select_agent(
        self,
        available_agents: List[str],
        exclude_unhealthy: bool = True,
    ) -> Optional[str]:
        """
        Select the best agent based on current load.

        Args:
            available_agents: List of agent IDs to consider
            exclude_unhealthy: Skip agents that haven't sent heartbeat

        Returns:
            Agent ID with the lowest utilization score among agents that are
            known, (optionally) healthy and not above LOAD_THRESHOLD, or None
            if no suitable agent is found.
        """
        if not available_agents:
            return None

        loads = []
        for agent_id in available_agents:
            load = self.get_agent_load(agent_id)
            if load is None:
                continue

            # Skip unhealthy agents if requested
            if exclude_unhealthy and not load.is_healthy:
                logger.info(f"Agent {agent_id} unhealthy, skipping")
                continue

            # Skip if over threshold
            if load.utilization_score > self.LOAD_THRESHOLD:
                logger.info(f"Agent {agent_id} over threshold ({load.utilization_score:.1%}), skipping")
                continue

            loads.append(load)

        if not loads:
            logger.warning(f"No suitable agents found for {available_agents}")
            return None

        # Least-loaded candidate wins; ties go to the earliest in available_agents.
        best = min(loads, key=lambda x: x.utilization_score)
        logger.info(
            f"Selected agent {best.agent_id} with {best.utilization_score:.1%} utilization"
        )
        return best.agent_id

    def get_agent_load(self, agent_id: str) -> Optional[AgentLoad]:
        """
        Get current load metrics for an agent.

        Results are cached per agent for ``cache_ttl`` seconds.

        Args:
            agent_id: Agent identifier

        Returns:
            AgentLoad object, or None if there is no queue manager or it
            reports no stats for the agent.
        """
        cached = self.load_cache.get(agent_id)
        if cached is not None:
            age = (datetime.now() - cached["timestamp"]).total_seconds()
            # A negative age means the wall clock moved backwards; treat the
            # entry as stale instead of trusting it indefinitely.
            if 0 <= age < self.cache_ttl:
                return cached["load"]

        if not self.queue_manager:
            return None

        stats = self.queue_manager.get_agent_stats(agent_id)
        if not stats:
            return None

        # Health: prefer the heartbeat age; fall back to the reported flag when
        # there is no usable heartbeat.
        last_beat = self._parse_heartbeat(stats.get("last_heartbeat"), agent_id)
        if last_beat is not None:
            # Take "now" in the heartbeat's own timezone so offset-aware ISO
            # strings (e.g. "...+00:00") don't raise a naive/aware TypeError.
            # Naive heartbeats are compared against naive local time, as before.
            elapsed = (datetime.now(last_beat.tzinfo) - last_beat).total_seconds()
            is_healthy = elapsed < self.HEALTH_TIMEOUT_SECONDS
        else:
            is_healthy = stats.get("is_healthy", False)

        # Missing or explicitly-None metrics count as zero load.
        cpu_percent = stats.get("cpu_percent") or 0.0
        memory_percent = stats.get("memory_percent") or 0.0
        task_count = stats.get("active_tasks") or 0

        # Normalize each dimension to [0, 1] before weighting.
        cpu_norm = self._clamp_unit(cpu_percent / 100.0)
        mem_norm = self._clamp_unit(memory_percent / 100.0)
        queue_norm = self._clamp_unit(task_count / self.MAX_TASKS_PER_AGENT)

        utilization = (
            cpu_norm * self.CPU_WEIGHT
            + mem_norm * self.MEMORY_WEIGHT
            + queue_norm * self.QUEUE_WEIGHT
        )

        load = AgentLoad(
            agent_id=agent_id,
            cpu_percent=cpu_percent,
            memory_percent=memory_percent,
            task_count=task_count,
            is_healthy=is_healthy,
            last_heartbeat=last_beat,
            load_level=self._classify_load(utilization),
            utilization_score=utilization,
        )

        self.load_cache[agent_id] = {
            "load": load,
            "timestamp": datetime.now(),
        }

        return load

    def get_cluster_load(self) -> Dict[str, Any]:
        """Get overall cluster load metrics.

        Returns:
            A dict with total/healthy agent counts, the mean utilization over
            agents that have stats, the cluster LoadLevel value, per-agent
            details (most loaded first) and a scaling recommendation. Returns
            ``{"error": ...}`` when no queue manager is configured.
        """
        if not self.queue_manager:
            return {"error": "Queue manager not initialized"}

        all_agents = self.queue_manager.get_all_agent_stats()
        if not all_agents:
            return {
                "total_agents": 0,
                "healthy_agents": 0,
                "average_utilization": 0.0,
                "cluster_load_level": LoadLevel.LOW.value,
                "recommendation": "No agents available",
            }

        loads = []
        healthy_count = 0
        for agent_stat in all_agents:
            agent_id = agent_stat.get("agent_id")
            if agent_id is None:
                logger.warning(f"Skipping agent stats entry without agent_id: {agent_stat!r}")
                continue
            load = self.get_agent_load(agent_id)
            if load is not None:
                loads.append(load)
                if load.is_healthy:
                    healthy_count += 1

        if not loads:
            # No per-agent stats could be loaded for any listed agent.
            return {
                "total_agents": len(all_agents),
                "healthy_agents": 0,
                "average_utilization": 0.0,
                "cluster_load_level": LoadLevel.CRITICAL.value,
                "recommendation": "All agents unhealthy",
            }

        avg_util = sum(l.utilization_score for l in loads) / len(loads)
        cluster_level = self._classify_load(avg_util)

        # Auto-scale recommendations
        recommendation = self._get_scaling_recommendation(len(loads), healthy_count, avg_util)

        return {
            "total_agents": len(loads),
            "healthy_agents": healthy_count,
            "average_utilization": avg_util,
            "cluster_load_level": cluster_level.value,
            "agents": [
                {
                    "id": load.agent_id,
                    "cpu": load.cpu_percent,
                    "memory": load.memory_percent,
                    "tasks": load.task_count,
                    "healthy": load.is_healthy,
                    "utilization": load.utilization_score,
                    "level": load.load_level.value,
                }
                for load in sorted(loads, key=lambda x: x.utilization_score, reverse=True)
            ],
            "recommendation": recommendation,
        }

    def check_backpressure(self) -> Tuple[bool, str]:
        """
        Check if system is under backpressure (overloaded).

        Backpressure is reported when the pending queue exceeds
        MAX_PENDING_TASKS, or when average cluster utilization exceeds
        LOAD_THRESHOLD with fewer than MIN_HEALTHY_AGENTS healthy agents.

        Returns:
            (is_backpressured, reason) — reason is "" when not backpressured.
        """
        if not self.queue_manager:
            return False, ""

        queue_stats = self.queue_manager.get_queue_stats() or {}

        # Check queue depth first; it alone decides the outcome when saturated.
        pending_count = queue_stats.get("pending_count") or 0
        if pending_count > self.MAX_PENDING_TASKS:
            return True, f"Queue depth too high ({pending_count} pending)"

        # Check cluster load
        cluster = self.get_cluster_load()
        cluster_util = cluster.get("average_utilization", 0)
        if cluster_util > self.LOAD_THRESHOLD:
            healthy = cluster.get("healthy_agents", 0)
            if healthy < self.MIN_HEALTHY_AGENTS:  # Not enough agents
                return True, f"Insufficient healthy agents ({healthy}) with {cluster_util:.1%} utilization"

        return False, ""

    def should_add_agent(self) -> bool:
        """Determine if cluster should add more agents.

        True when average utilization exceeds HIGH_LOAD_THRESHOLD and there
        are fewer than MAX_AGENTS healthy agents.
        """
        cluster = self.get_cluster_load()
        avg_util = cluster.get("average_utilization", 0)
        healthy_count = cluster.get("healthy_agents", 0)

        return avg_util > self.HIGH_LOAD_THRESHOLD and healthy_count < self.MAX_AGENTS

    def should_remove_agent(self) -> Optional[str]:
        """
        Determine if any agent should be removed.

        Returns:
            ID of the least-loaded agent when cluster utilization is below 30%,
            more than two agents exist, and that agent is below 10%
            utilization; otherwise None.
        """
        cluster = self.get_cluster_load()
        avg_util = cluster.get("average_utilization", 0)
        agents = cluster.get("agents", [])

        # Only remove if: low overall load AND multiple agents
        if avg_util < 0.30 and len(agents) > 2:
            lowest = min(agents, key=lambda x: x["utilization"])
            if lowest["utilization"] < 0.10:
                return lowest["id"]

        return None

    def get_recommendations(self) -> Dict[str, Any]:
        """Get comprehensive system recommendations.

        Returns:
            Dict with backpressure status/reason, scale-up and scale-down
            decisions, human-readable recommendation strings and the cluster
            snapshot from get_cluster_load().
        """
        is_backpressured, backpressure_reason = self.check_backpressure()
        should_add = self.should_add_agent()
        should_remove = self.should_remove_agent()

        recommendations = []

        if is_backpressured:
            recommendations.append(f"URGENT: Backpressure detected: {backpressure_reason}")

        if should_add:
            recommendations.append("SCALE UP: Consider adding more agents to handle load")

        if should_remove:
            recommendations.append(f"SCALE DOWN: Consider removing agent {should_remove} (idle)")

        cluster = self.get_cluster_load()
        avg_util = cluster.get("average_utilization", 0)

        if avg_util > 0.85:
            recommendations.append("WARNING: High cluster utilization, monitor for bottlenecks")
        elif avg_util < 0.20:
            recommendations.append("INFO: Low cluster utilization, system is underutilized")

        return {
            "backpressured": is_backpressured,
            "backpressure_reason": backpressure_reason,
            "should_add_agent": should_add,
            "should_remove_agent": should_remove,
            "recommendations": recommendations,
            "cluster": cluster,
        }

    # Helper methods

    def _classify_load(self, utilization: float) -> LoadLevel:
        """Map a utilization score in [0, 1] to a LoadLevel (CRITICAL at >= LOAD_THRESHOLD)."""
        if utilization < self.MODERATE_LOAD_THRESHOLD:
            return LoadLevel.LOW
        if utilization < self.HIGH_LOAD_THRESHOLD:
            return LoadLevel.MODERATE
        if utilization < self.LOAD_THRESHOLD:
            return LoadLevel.HIGH
        return LoadLevel.CRITICAL

    @staticmethod
    def _clamp_unit(value: float) -> float:
        """Clamp a normalized metric into the closed range [0, 1]."""
        return min(1.0, max(0.0, value))

    @staticmethod
    def _parse_heartbeat(value: Any, agent_id: str) -> Optional[datetime]:
        """Return the heartbeat as a datetime, or None if absent or unparseable.

        Accepts a datetime or an ISO-8601 string; anything else is logged and
        ignored so a single bad record cannot break agent selection.
        """
        if not value:
            return None
        if isinstance(value, datetime):
            return value
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            logger.warning(f"Agent {agent_id} has unparseable heartbeat {value!r}, ignoring it")
            return None

    def _get_scaling_recommendation(
        self, total_agents: int, healthy_agents: int, avg_utilization: float
    ) -> str:
        """Get scaling recommendation based on metrics."""
        if healthy_agents < total_agents * 0.5:
            return "ALERT: >50% of agents are unhealthy"

        if avg_utilization > self.LOAD_THRESHOLD:
            return "SCALE UP: Add agents to handle high load"

        if avg_utilization < 0.20 and total_agents > 2:
            return "SCALE DOWN: Remove idle agents to save resources"

        return "Cluster is operating normally"

    def clear_cache(self):
        """Clear load cache to force refresh on next call."""
        self.load_cache.clear()


# Utility functions for system-level metrics

def get_system_load() -> Tuple[float, float]:
    """
    Get system-level CPU and memory usage.

    Blocks for about 0.1 s while sampling CPU.

    Returns:
        (cpu_percent, memory_percent); (0.0, 0.0) if psutil is unavailable
        or sampling fails.
    """
    if psutil is None:
        logger.error("Error getting system metrics: psutil is not installed")
        return 0.0, 0.0
    try:
        cpu = psutil.cpu_percent(interval=0.1)
        memory = psutil.virtual_memory().percent
        return cpu, memory
    except Exception as e:
        logger.error(f"Error getting system metrics: {e}")
        return 0.0, 0.0


def get_process_load(pid: int) -> Tuple[float, float]:
    """
    Get CPU and memory usage for a specific process.

    Blocks for about 0.1 s while sampling CPU.

    Args:
        pid: Process ID

    Returns:
        (cpu_percent, memory_percent); (0.0, 0.0) if psutil is unavailable,
        the process does not exist, or access is denied.
    """
    if psutil is None:
        logger.error(f"Error getting process {pid} metrics: psutil is not installed")
        return 0.0, 0.0
    try:
        process = psutil.Process(pid)
        cpu = process.cpu_percent(interval=0.1)
        memory = process.memory_percent()
        return cpu, memory
    except Exception as e:
        logger.error(f"Error getting process {pid} metrics: {e}")
        return 0.0, 0.0


def get_active_agent_count() -> int:
    """Get count of active agent processes.

    Counts processes whose name contains "claude" or "agent"
    (case-insensitive). NOTE(review): the "agent" substring also matches
    unrelated daemons such as ssh-agent/gpg-agent — confirm this heuristic.

    Returns:
        The process count, or 0 if psutil is unavailable or listing fails.
    """
    if psutil is None:
        logger.error("Error counting agents: psutil is not installed")
        return 0
    try:
        count = 0
        # With an attrs list, psutil fills proc.info itself and stores None for
        # fields it cannot read, so no per-process exception handling is needed.
        for proc in psutil.process_iter(["name"]):
            name = (proc.info.get("name") or "").lower()
            if "claude" in name or "agent" in name:
                count += 1
        return count
    except Exception as e:
        logger.error(f"Error counting agents: {e}")
        return 0


# Module exports
__all__ = [
    "LuziaLoadBalancer",
    "AgentLoad",
    "LoadLevel",
    "get_system_load",
    "get_process_load",
    "get_active_agent_count",
]