Files
luzia/lib/luzia_load_balancer.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Adds DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

460 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Luzia Load Balancer - Intelligent Task Distribution and Load Management
Implements:
- CPU and memory tracking per agent
- Task queue depth monitoring
- Least-loaded agent selection
- Load threshold enforcement
- Auto-scaling decisions
- Backpressure handling
- Queue saturation detection
Features:
1. Multi-dimensional load tracking (CPU, memory, queue depth)
2. Weighted scoring for agent selection
3. Load threshold enforcement (80% max utilization)
4. Backpressure handling when overloaded
5. Auto-scale recommendations
6. Health-based agent exclusion
"""
import psutil
import os
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from enum import Enum
import logging
import json
# Module-level logger, named after this module so applications can tune it.
logger: logging.Logger = logging.getLogger(__name__)
class LoadLevel(Enum):
    """Load level classification.

    The bands mirror how LuziaLoadBalancer classifies a weighted utilization
    score (0-1) using its MODERATE_LOAD_THRESHOLD (0.40),
    HIGH_LOAD_THRESHOLD (0.70) and LOAD_THRESHOLD (0.80) constants.
    """

    LOW = "low"  # < 40% utilization
    MODERATE = "moderate"  # 40% up to (not including) 70% utilization
    HIGH = "high"  # 70% up to (not including) 80% utilization
    CRITICAL = "critical"  # >= 80% utilization (at or above LOAD_THRESHOLD)
@dataclass
class AgentLoad:
    """Point-in-time load snapshot for a single agent.

    Instances are built by LuziaLoadBalancer.get_agent_load. Field order
    defines the positional constructor, so it must stay stable.
    """

    agent_id: str  # agent the metrics belong to
    cpu_percent: float  # CPU usage as reported in the agent's stats (percent)
    memory_percent: float  # memory usage as reported in the agent's stats (percent)
    task_count: int  # active tasks reported for the agent
    is_healthy: bool  # judged from heartbeat age, or the reported flag if no heartbeat
    last_heartbeat: Optional[datetime]  # parsed heartbeat time, None if not reported
    load_level: LoadLevel  # band derived from utilization_score
    utilization_score: float  # weighted CPU/memory/queue score (nominally 0-1)
class LuziaLoadBalancer:
    """Intelligent load balancing for agent tasks.

    Per-agent metrics (CPU %, memory %, active task count) are read from a
    queue manager, normalized to 0-1 and combined with CPU_WEIGHT,
    MEMORY_WEIGHT and QUEUE_WEIGHT into a single utilization score. That
    score drives agent selection, cluster summaries, backpressure detection
    and scale-up/scale-down recommendations.

    All thresholds are read through ``self``, so they can be overridden per
    instance or per subclass.
    """

    # Configuration constants
    LOAD_THRESHOLD = 0.80  # 80% max utilization; at/above this an agent is CRITICAL
    HIGH_LOAD_THRESHOLD = 0.70
    MODERATE_LOAD_THRESHOLD = 0.40
    HEALTH_TIMEOUT_SECONDS = 60

    # Weights for load calculation (they sum to 1.0)
    CPU_WEIGHT = 0.40
    MEMORY_WEIGHT = 0.35
    QUEUE_WEIGHT = 0.25

    # Active task count at which an agent's queue component saturates at 1.0
    MAX_TASKS_PER_AGENT = 10
    # Backpressure: pending-queue depth considered too deep
    MAX_PENDING_TASKS = 50
    # Backpressure: fewer healthy agents than this cannot absorb an overload
    MIN_HEALTHY_AGENTS = 2
    # Scale-up is suggested only while the healthy agent count is below this cap
    MAX_AGENTS = 10
    # Scale-down: cluster average must be below SCALE_DOWN_UTILIZATION and the
    # candidate agent below IDLE_AGENT_UTILIZATION
    SCALE_DOWN_UTILIZATION = 0.30
    IDLE_AGENT_UTILIZATION = 0.10
    # Scale-down advice is only given when MORE than this many agents exist
    SCALE_DOWN_MIN_AGENTS = 2
    # Advisory bands used in recommendation messages
    WARNING_UTILIZATION = 0.85
    LOW_UTILIZATION = 0.20

    def __init__(self, queue_manager=None):
        """
        Initialize load balancer.

        Args:
            queue_manager: LuziaQueueManager instance for accessing task counts.
                This class calls its get_agent_stats(agent_id),
                get_all_agent_stats() and get_queue_stats() methods.
        """
        self.queue_manager = queue_manager
        self.load_cache = {}  # agent_id -> {"load": AgentLoad, "timestamp": datetime}
        self.cache_ttl = 5  # seconds a cached AgentLoad stays valid

    def select_agent(
        self,
        available_agents: List[str],
        exclude_unhealthy: bool = True,
    ) -> Optional[str]:
        """
        Select the best agent based on current load.

        Args:
            available_agents: List of agent IDs to consider.
            exclude_unhealthy: Skip agents whose load snapshot is unhealthy.

        Returns:
            ID of the least-utilized eligible agent, or None if no agent has
            stats, passes the health filter, and is at or below LOAD_THRESHOLD.
        """
        if not available_agents:
            return None

        candidates = []
        for agent_id in available_agents:
            load = self.get_agent_load(agent_id)
            if load is None:
                continue
            if exclude_unhealthy and not load.is_healthy:
                logger.info(f"Agent {agent_id} unhealthy, skipping")
                continue
            if load.utilization_score > self.LOAD_THRESHOLD:
                logger.info(f"Agent {agent_id} over threshold ({load.utilization_score:.1%}), skipping")
                continue
            candidates.append(load)

        if not candidates:
            logger.warning(f"No suitable agents found for {available_agents}")
            return None

        # Least-utilized wins; on ties min() keeps the earliest candidate.
        best = min(candidates, key=lambda x: x.utilization_score)
        logger.info(
            f"Selected agent {best.agent_id} with {best.utilization_score:.1%} utilization"
        )
        return best.agent_id

    def get_agent_load(self, agent_id: str) -> Optional[AgentLoad]:
        """
        Get current load metrics for an agent.

        Results are cached per agent for ``cache_ttl`` seconds.

        Health is judged from the ``last_heartbeat`` stat when it is present
        and parseable (ISO-8601 string or datetime; naive and timezone-aware
        values are both accepted). Otherwise the reported ``is_healthy`` flag
        is used, defaulting to unhealthy.

        Args:
            agent_id: Agent identifier.

        Returns:
            AgentLoad object, or None if there is no queue manager or the queue
            manager has no stats for the agent.
        """
        cached = self.load_cache.get(agent_id)
        if cached is not None and (datetime.now() - cached["timestamp"]).total_seconds() < self.cache_ttl:
            return cached["load"]

        if not self.queue_manager:
            return None
        stats = self.queue_manager.get_agent_stats(agent_id)
        if not stats:
            return None

        last_heartbeat = self._parse_heartbeat(stats.get("last_heartbeat"), agent_id)
        if last_heartbeat is not None:
            # Take "now" in the heartbeat's own timezone: subtracting a naive
            # datetime from an aware one (e.g. "...+00:00") raises TypeError.
            age = datetime.now(last_heartbeat.tzinfo) - last_heartbeat
            is_healthy = age.total_seconds() < self.HEALTH_TIMEOUT_SECONDS
        else:
            is_healthy = bool(stats.get("is_healthy", False))

        # `or` also covers keys that are present but explicitly None.
        cpu_percent = stats.get("cpu_percent") or 0.0
        memory_percent = stats.get("memory_percent") or 0.0
        task_count = stats.get("active_tasks") or 0

        # Each component is normalized and clamped to [0, 1] before weighting.
        utilization = (
            self._clamp_unit(cpu_percent / 100.0) * self.CPU_WEIGHT
            + self._clamp_unit(memory_percent / 100.0) * self.MEMORY_WEIGHT
            + self._clamp_unit(task_count / self.MAX_TASKS_PER_AGENT) * self.QUEUE_WEIGHT
        )

        load = AgentLoad(
            agent_id=agent_id,
            cpu_percent=cpu_percent,
            memory_percent=memory_percent,
            task_count=task_count,
            is_healthy=is_healthy,
            last_heartbeat=last_heartbeat,
            load_level=self._classify_load(utilization),
            utilization_score=utilization,
        )

        self.load_cache[agent_id] = {
            "load": load,
            "timestamp": datetime.now(),
        }
        return load

    def get_cluster_load(self) -> Dict[str, Any]:
        """
        Get overall cluster load metrics.

        Returns:
            Dict with total/healthy agent counts, average utilization, the
            cluster LoadLevel value, a per-agent breakdown under ``agents``
            (most-loaded first) and a scaling recommendation. Returns
            ``{"error": ...}`` when no queue manager is configured.
        """
        if not self.queue_manager:
            return {"error": "Queue manager not initialized"}

        all_agents = self.queue_manager.get_all_agent_stats()
        if not all_agents:
            return {
                "total_agents": 0,
                "healthy_agents": 0,
                "average_utilization": 0.0,
                "cluster_load_level": LoadLevel.LOW.value,
                "recommendation": "No agents available",
            }

        loads = []
        for agent_stat in all_agents:
            load = self.get_agent_load(agent_stat["agent_id"])
            if load is not None:
                loads.append(load)
        healthy_count = sum(1 for load in loads if load.is_healthy)

        if not loads:
            # NOTE(review): reached when no per-agent stats could be loaded,
            # which is not necessarily the same as every agent being unhealthy.
            return {
                "total_agents": len(all_agents),
                "healthy_agents": 0,
                "average_utilization": 0.0,
                "cluster_load_level": LoadLevel.CRITICAL.value,
                "recommendation": "All agents unhealthy",
            }

        avg_util = sum(load.utilization_score for load in loads) / len(loads)
        cluster_level = self._classify_load(avg_util)
        recommendation = self._get_scaling_recommendation(len(loads), healthy_count, avg_util)

        return {
            "total_agents": len(loads),
            "healthy_agents": healthy_count,
            "average_utilization": avg_util,
            "cluster_load_level": cluster_level.value,
            "agents": [
                {
                    "id": load.agent_id,
                    "cpu": load.cpu_percent,
                    "memory": load.memory_percent,
                    "tasks": load.task_count,
                    "healthy": load.is_healthy,
                    "utilization": load.utilization_score,
                    "level": load.load_level.value,
                }
                for load in sorted(loads, key=lambda x: x.utilization_score, reverse=True)
            ],
            "recommendation": recommendation,
        }

    def check_backpressure(
        self, cluster: Optional[Dict[str, Any]] = None
    ) -> Tuple[bool, str]:
        """
        Check if system is under backpressure (overloaded).

        Args:
            cluster: Optional get_cluster_load() result to reuse; computed on
                demand when omitted.

        Returns:
            (is_backpressured, reason); reason is "" when not backpressured.
        """
        if not self.queue_manager:
            return False, ""

        queue_stats = self.queue_manager.get_queue_stats() or {}
        # `or 0` also covers an explicit None in the stats payload.
        pending_count = queue_stats.get("pending_count") or 0
        if pending_count > self.MAX_PENDING_TASKS:
            return True, f"Queue depth too high ({pending_count} pending)"

        if cluster is None:
            cluster = self.get_cluster_load()
        cluster_util = cluster.get("average_utilization", 0)
        if cluster_util > self.LOAD_THRESHOLD:
            healthy = cluster.get("healthy_agents", 0)
            if healthy < self.MIN_HEALTHY_AGENTS:
                return True, f"Insufficient healthy agents ({healthy}) with {cluster_util:.1%} utilization"
        return False, ""

    def should_add_agent(self, cluster: Optional[Dict[str, Any]] = None) -> bool:
        """
        Determine if cluster should add more agents.

        Args:
            cluster: Optional get_cluster_load() result to reuse.

        Returns:
            True when average utilization exceeds HIGH_LOAD_THRESHOLD and
            there are fewer than MAX_AGENTS healthy agents.
        """
        if cluster is None:
            cluster = self.get_cluster_load()
        avg_util = cluster.get("average_utilization", 0)
        healthy_count = cluster.get("healthy_agents", 0)
        return avg_util > self.HIGH_LOAD_THRESHOLD and healthy_count < self.MAX_AGENTS

    def should_remove_agent(self, cluster: Optional[Dict[str, Any]] = None) -> Optional[str]:
        """
        Determine if any agent should be removed.

        Args:
            cluster: Optional get_cluster_load() result to reuse.

        Returns:
            ID of the least-utilized agent when overall load is low, more than
            SCALE_DOWN_MIN_AGENTS agents exist and that agent is nearly idle;
            otherwise None.
        """
        if cluster is None:
            cluster = self.get_cluster_load()
        avg_util = cluster.get("average_utilization", 0)
        agents = cluster.get("agents", [])

        if avg_util < self.SCALE_DOWN_UTILIZATION and len(agents) > self.SCALE_DOWN_MIN_AGENTS:
            lowest = min(agents, key=lambda agent: agent["utilization"])
            if lowest["utilization"] < self.IDLE_AGENT_UTILIZATION:
                return lowest["id"]
        return None

    def get_recommendations(self) -> Dict[str, Any]:
        """
        Get comprehensive system recommendations.

        The cluster snapshot is computed once and shared by every check, so
        all recommendations describe the same point in time.
        """
        cluster = self.get_cluster_load()
        is_backpressured, backpressure_reason = self.check_backpressure(cluster)
        should_add = self.should_add_agent(cluster)
        should_remove = self.should_remove_agent(cluster)

        recommendations = []
        if is_backpressured:
            recommendations.append(f"URGENT: Backpressure detected: {backpressure_reason}")
        if should_add:
            recommendations.append("SCALE UP: Consider adding more agents to handle load")
        if should_remove is not None:
            recommendations.append(f"SCALE DOWN: Consider removing agent {should_remove} (idle)")

        avg_util = cluster.get("average_utilization", 0)
        if avg_util > self.WARNING_UTILIZATION:
            recommendations.append("WARNING: High cluster utilization, monitor for bottlenecks")
        elif avg_util < self.LOW_UTILIZATION:
            recommendations.append("INFO: Low cluster utilization, system is underutilized")

        return {
            "backpressured": is_backpressured,
            "backpressure_reason": backpressure_reason,
            "should_add_agent": should_add,
            "should_remove_agent": should_remove,
            "recommendations": recommendations,
            "cluster": cluster,
        }

    # Helper methods
    def _classify_load(self, utilization: float) -> LoadLevel:
        """Map a utilization score onto a LoadLevel band using this instance's thresholds."""
        if utilization < self.MODERATE_LOAD_THRESHOLD:
            return LoadLevel.LOW
        if utilization < self.HIGH_LOAD_THRESHOLD:
            return LoadLevel.MODERATE
        if utilization < self.LOAD_THRESHOLD:
            return LoadLevel.HIGH
        return LoadLevel.CRITICAL

    @staticmethod
    def _clamp_unit(value: float) -> float:
        """Clamp a normalized metric into the closed range [0, 1]."""
        return max(0.0, min(1.0, value))

    @staticmethod
    def _parse_heartbeat(raw: Any, agent_id: str) -> Optional[datetime]:
        """Return the heartbeat as a datetime, or None if it is missing or unparseable."""
        if not raw:
            return None
        if isinstance(raw, datetime):
            return raw
        try:
            return datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            logger.warning(f"Agent {agent_id} has unparseable last_heartbeat {raw!r}; ignoring it")
            return None

    def _get_scaling_recommendation(
        self,
        total_agents: int,
        healthy_agents: int,
        avg_utilization: float
    ) -> str:
        """Get scaling recommendation based on metrics."""
        if healthy_agents < total_agents * 0.5:
            return "ALERT: >50% of agents are unhealthy"
        if avg_utilization > self.LOAD_THRESHOLD:
            return "SCALE UP: Add agents to handle high load"
        if avg_utilization < self.LOW_UTILIZATION and total_agents > self.SCALE_DOWN_MIN_AGENTS:
            return "SCALE DOWN: Remove idle agents to save resources"
        return "Cluster is operating normally"

    def clear_cache(self):
        """Clear load cache to force refresh on next call."""
        self.load_cache.clear()
# Utility functions for system-level metrics
def get_system_load(interval: Optional[float] = 0.1) -> Tuple[float, float]:
    """
    Get system-level CPU and memory usage.

    Args:
        interval: Seconds psutil blocks while sampling CPU usage. Defaults to
            0.1, the value previously hard-coded here. Per the psutil docs,
            0 or None makes the call non-blocking and measures against the
            previous call instead (the first such call reports 0.0).

    Returns:
        (cpu_percent, memory_percent), or (0.0, 0.0) if the metrics cannot be
        read (the error is logged).
    """
    try:
        cpu = psutil.cpu_percent(interval=interval)
        memory = psutil.virtual_memory().percent
        return cpu, memory
    except Exception as e:
        # Best-effort helper: a metrics failure must not propagate to callers.
        logger.error("Error getting system metrics: %s", e)
        return 0.0, 0.0
def get_process_load(pid: int) -> Tuple[float, float]:
    """
    Report CPU and memory usage of a single process.

    CPU usage is sampled over a blocking 0.1 second window.

    Args:
        pid: ID of the process to inspect.

    Returns:
        (cpu_percent, memory_percent) as reported by psutil, or (0.0, 0.0)
        when the process cannot be inspected (the error is logged).
    """
    try:
        proc = psutil.Process(pid)
        usage = (proc.cpu_percent(interval=0.1), proc.memory_percent())
    except Exception as err:
        logger.error(f"Error getting process {pid} metrics: {err}")
        return 0.0, 0.0
    return usage
def get_active_agent_count() -> int:
    """
    Get count of active agent processes.

    A process counts when its name contains "claude" or "agent"
    (case-insensitive). Only process names are inspected.

    Returns:
        Number of matching processes, or 0 if process enumeration fails
        (the error is logged).
    """
    try:
        count = 0
        # Pre-fetch only the name into proc.info. Per the psutil docs,
        # process_iter skips processes that disappear mid-iteration and stores
        # None for attributes it is not allowed to read, so no per-process
        # exception handling (and no blanket `except:`) is needed.
        for proc in psutil.process_iter(["name"]):
            name = (proc.info.get("name") or "").lower()
            if "claude" in name or "agent" in name:
                count += 1
        return count
    except Exception as e:
        logger.error(f"Error counting agents: {e}")
        return 0
# Public API of this module: the names exported by `from ... import *`.
__all__ = [
    # Load-balancing core
    "LuziaLoadBalancer",
    "AgentLoad",
    "LoadLevel",
    # Host/process metric helpers
    "get_system_load",
    "get_process_load",
    "get_active_agent_count",
]