Based on claude-code-tools TmuxCLIController, this refactor:
- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
460 lines
14 KiB
Python
460 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Luzia Load Balancer - Intelligent Task Distribution and Load Management
|
|
|
|
Implements:
|
|
- CPU and memory tracking per agent
|
|
- Task queue depth monitoring
|
|
- Least-loaded agent selection
|
|
- Load threshold enforcement
|
|
- Auto-scaling decisions
|
|
- Backpressure handling
|
|
- Queue saturation detection
|
|
|
|
Features:
|
|
1. Multi-dimensional load tracking (CPU, memory, queue depth)
|
|
2. Weighted scoring for agent selection
|
|
3. Load threshold enforcement (80% max utilization)
|
|
4. Backpressure handling when overloaded
|
|
5. Auto-scale recommendations
|
|
6. Health-based agent exclusion
|
|
"""
|
|
|
|
import psutil
|
|
import os
|
|
import subprocess
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
import logging
|
|
import json
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LoadLevel(Enum):
    """Load level classification.

    The bands below mirror the thresholds LuziaLoadBalancer applies when it
    classifies a utilization score (MODERATE_LOAD_THRESHOLD = 0.40,
    HIGH_LOAD_THRESHOLD = 0.70, LOAD_THRESHOLD = 0.80). Each band includes
    its lower bound and excludes its upper bound.
    """

    LOW = "low"  # < 40% utilization
    MODERATE = "moderate"  # 40% to < 70% utilization
    HIGH = "high"  # 70% to < 80% utilization
    CRITICAL = "critical"  # >= 80% utilization
|
|
|
|
|
|
@dataclass
class AgentLoad:
    """Agent load metrics: a snapshot of one agent's load and health."""

    # Agent identifier, as passed to LuziaLoadBalancer.get_agent_load().
    agent_id: str
    # CPU usage as a percentage; divided by 100 when the score is computed.
    cpu_percent: float
    # Memory usage as a percentage; divided by 100 when the score is computed.
    memory_percent: float
    # Number of active tasks reported for the agent.
    task_count: int
    # Whether the agent is considered healthy (fresh heartbeat, or the
    # reported health flag when no heartbeat is available).
    is_healthy: bool
    # Parsed time of the last heartbeat, or None when none was reported.
    last_heartbeat: Optional[datetime]
    # Band the utilization score falls into.
    load_level: LoadLevel
    # Weighted combination of normalized CPU, memory and task count (0-1).
    utilization_score: float
|
|
|
|
|
|
class LuziaLoadBalancer:
    """Intelligent load balancing for agent tasks.

    Each agent receives a utilization score in [0, 1]: a weighted sum of its
    normalized CPU percentage, memory percentage and active-task count. The
    score drives agent selection, cluster-level classification, backpressure
    detection and scale-up / scale-down recommendations.

    All statistics come from the ``queue_manager`` passed to the constructor,
    which is called through ``get_agent_stats(agent_id)``,
    ``get_all_agent_stats()`` and ``get_queue_stats()``.
    """

    # Configuration constants
    LOAD_THRESHOLD = 0.80  # 80% max utilization
    HIGH_LOAD_THRESHOLD = 0.70
    MODERATE_LOAD_THRESHOLD = 0.40
    HEALTH_TIMEOUT_SECONDS = 60

    # Weights for load calculation
    CPU_WEIGHT = 0.40
    MEMORY_WEIGHT = 0.35
    QUEUE_WEIGHT = 0.25

    # Tunables that used to be inline magic numbers
    MAX_TASKS_PER_AGENT = 10  # task count that maps to a full queue score
    MAX_PENDING_TASKS = 50  # pending queue depth that triggers backpressure
    MIN_HEALTHY_AGENTS = 2  # fewer healthy agents under load => backpressure
    MAX_AGENTS = 10  # never recommend scaling up at or beyond this many
    MIN_AGENTS = 2  # never recommend scaling down to fewer than this many
    SCALE_DOWN_CLUSTER_UTILIZATION = 0.30  # cluster average below => may shrink
    IDLE_AGENT_UTILIZATION = 0.10  # agent score below => removal candidate
    HIGH_UTILIZATION_WARNING = 0.85  # cluster average above => warning
    LOW_UTILIZATION = 0.20  # cluster average below => underutilized

    def __init__(self, queue_manager=None):
        """
        Initialize load balancer.

        Args:
            queue_manager: LuziaQueueManager instance for accessing task counts
        """
        self.queue_manager = queue_manager
        # agent_id -> {"load": AgentLoad, "timestamp": datetime}
        self.load_cache: Dict[str, Dict[str, Any]] = {}
        self.cache_ttl = 5  # seconds

    def select_agent(
        self,
        available_agents: List[str],
        exclude_unhealthy: bool = True,
    ) -> Optional[str]:
        """
        Select the best agent based on current load.

        Agents without load data, agents over LOAD_THRESHOLD and (optionally)
        unhealthy agents are skipped; the remaining agent with the lowest
        utilization score wins.

        Args:
            available_agents: List of agent IDs to consider
            exclude_unhealthy: Skip agents that are not currently healthy

        Returns:
            Agent ID or None if no suitable agent found
        """
        if not available_agents:
            return None

        candidates = []
        for agent_id in available_agents:
            load = self.get_agent_load(agent_id)
            if load is None:
                continue

            if exclude_unhealthy and not load.is_healthy:
                logger.info(f"Agent {agent_id} unhealthy, skipping")
                continue

            if load.utilization_score > self.LOAD_THRESHOLD:
                logger.info(f"Agent {agent_id} over threshold ({load.utilization_score:.1%}), skipping")
                continue

            candidates.append(load)

        if not candidates:
            logger.warning(f"No suitable agents found for {available_agents}")
            return None

        best = min(candidates, key=lambda x: x.utilization_score)
        logger.info(
            f"Selected agent {best.agent_id} with {best.utilization_score:.1%} utilization"
        )
        return best.agent_id

    def get_agent_load(self, agent_id: str) -> Optional["AgentLoad"]:
        """
        Get current load metrics for an agent.

        Results are cached per agent for ``cache_ttl`` seconds. Health is
        derived from the heartbeat age when a heartbeat is reported, and from
        the reported ``is_healthy`` flag otherwise. A heartbeat that cannot be
        parsed marks the agent unhealthy instead of raising.

        Args:
            agent_id: Agent identifier

        Returns:
            AgentLoad object or None if agent not found
        """
        cached = self.load_cache.get(agent_id)
        if cached is not None:
            age = (datetime.now() - cached["timestamp"]).total_seconds()
            if age < self.cache_ttl:
                return cached["load"]

        if not self.queue_manager:
            return None

        stats = self.queue_manager.get_agent_stats(agent_id)
        if not stats:
            return None

        raw_heartbeat = stats.get("last_heartbeat")
        last_beat = self._parse_heartbeat(raw_heartbeat)
        if last_beat is not None:
            # Compare in the heartbeat's own timezone: subtracting an aware
            # datetime from a naive one raises TypeError.
            now = datetime.now(last_beat.tzinfo)
            is_healthy = (now - last_beat).total_seconds() < self.HEALTH_TIMEOUT_SECONDS
        elif raw_heartbeat:
            # A heartbeat was reported but is unusable, so liveness cannot be
            # confirmed; fail closed.
            logger.warning(
                f"Agent {agent_id} reported unparseable heartbeat {raw_heartbeat!r}, treating as unhealthy"
            )
            is_healthy = False
        else:
            is_healthy = bool(stats.get("is_healthy", False))

        # `or` also covers keys that are present but set to None.
        cpu_percent = stats.get("cpu_percent") or 0.0
        memory_percent = stats.get("memory_percent") or 0.0
        task_count = stats.get("active_tasks") or 0

        # Normalize every metric to the 0-1 range before weighting.
        cpu_norm = self._clamp_unit(cpu_percent / 100.0)
        mem_norm = self._clamp_unit(memory_percent / 100.0)
        queue_norm = self._clamp_unit(task_count / self.MAX_TASKS_PER_AGENT)

        utilization = (
            cpu_norm * self.CPU_WEIGHT +
            mem_norm * self.MEMORY_WEIGHT +
            queue_norm * self.QUEUE_WEIGHT
        )

        load = AgentLoad(
            agent_id=agent_id,
            cpu_percent=cpu_percent,
            memory_percent=memory_percent,
            task_count=task_count,
            is_healthy=is_healthy,
            last_heartbeat=last_beat,
            load_level=self._classify_load(utilization),
            utilization_score=utilization,
        )

        self.load_cache[agent_id] = {
            "load": load,
            "timestamp": datetime.now(),
        }

        return load

    def get_cluster_load(self) -> Dict[str, Any]:
        """
        Get overall cluster load metrics.

        Returns:
            A dict with ``total_agents``, ``healthy_agents``,
            ``average_utilization``, ``cluster_load_level`` and
            ``recommendation``; when at least one agent load is available it
            also carries ``agents`` (per-agent details, busiest first).
            Without a queue manager the dict only contains ``error``.
        """
        if not self.queue_manager:
            return {"error": "Queue manager not initialized"}

        all_agents = self.queue_manager.get_all_agent_stats()
        if not all_agents:
            return {
                "total_agents": 0,
                "healthy_agents": 0,
                "average_utilization": 0.0,
                "cluster_load_level": LoadLevel.LOW.value,
                "recommendation": "No agents available",
            }

        loads = []
        for agent_stat in all_agents:
            load = self.get_agent_load(agent_stat["agent_id"])
            if load:
                loads.append(load)
        healthy_count = sum(1 for load in loads if load.is_healthy)

        if not loads:
            # Reached when no listed agent yields load data at all.
            return {
                "total_agents": len(all_agents),
                "healthy_agents": 0,
                "average_utilization": 0.0,
                "cluster_load_level": LoadLevel.CRITICAL.value,
                "recommendation": "All agents unhealthy",
            }

        avg_util = sum(load.utilization_score for load in loads) / len(loads)
        cluster_level = self._classify_load(avg_util)
        recommendation = self._get_scaling_recommendation(len(loads), healthy_count, avg_util)

        return {
            "total_agents": len(loads),
            "healthy_agents": healthy_count,
            "average_utilization": avg_util,
            "cluster_load_level": cluster_level.value,
            "agents": [
                {
                    "id": load.agent_id,
                    "cpu": load.cpu_percent,
                    "memory": load.memory_percent,
                    "tasks": load.task_count,
                    "healthy": load.is_healthy,
                    "utilization": load.utilization_score,
                    "level": load.load_level.value,
                }
                for load in sorted(loads, key=lambda x: x.utilization_score, reverse=True)
            ],
            "recommendation": recommendation,
        }

    def check_backpressure(self, cluster: Optional[Dict[str, Any]] = None) -> Tuple[bool, str]:
        """
        Check if system is under backpressure (overloaded).

        Backpressure is reported when the pending queue is deeper than
        MAX_PENDING_TASKS, or when average utilization exceeds LOAD_THRESHOLD
        while fewer than MIN_HEALTHY_AGENTS agents are healthy.

        Args:
            cluster: Optional result of get_cluster_load() to reuse; fetched
                on demand when omitted.

        Returns:
            (is_backpressured, reason)
        """
        if not self.queue_manager:
            return False, ""

        queue_stats = self.queue_manager.get_queue_stats() or {}
        pending_count = queue_stats.get("pending_count", 0)
        if pending_count > self.MAX_PENDING_TASKS:
            return True, f"Queue depth too high ({pending_count} pending)"

        # Cluster metrics are only needed once the queue depth check passes.
        if cluster is None:
            cluster = self.get_cluster_load()

        cluster_util = cluster.get("average_utilization", 0)
        if cluster_util > self.LOAD_THRESHOLD:
            healthy = cluster.get("healthy_agents", 0)
            if healthy < self.MIN_HEALTHY_AGENTS:  # Not enough agents
                return True, f"Insufficient healthy agents ({healthy}) with {cluster_util:.1%} utilization"

        return False, ""

    def should_add_agent(self, cluster: Optional[Dict[str, Any]] = None) -> bool:
        """
        Determine if cluster should add more agents.

        Args:
            cluster: Optional result of get_cluster_load() to reuse; fetched
                on demand when omitted.

        Returns:
            True when average utilization is above HIGH_LOAD_THRESHOLD and
            fewer than MAX_AGENTS agents are healthy.
        """
        if cluster is None:
            cluster = self.get_cluster_load()

        avg_util = cluster.get("average_utilization", 0)
        healthy_count = cluster.get("healthy_agents", 0)

        return avg_util > self.HIGH_LOAD_THRESHOLD and healthy_count < self.MAX_AGENTS

    def should_remove_agent(self, cluster: Optional[Dict[str, Any]] = None) -> Optional[str]:
        """
        Determine if any agent should be removed.

        Args:
            cluster: Optional result of get_cluster_load() to reuse; fetched
                on demand when omitted.

        Returns:
            Agent ID to remove or None
        """
        if cluster is None:
            cluster = self.get_cluster_load()

        avg_util = cluster.get("average_utilization", 0)
        agents = cluster.get("agents", [])

        # Only remove if: low overall load AND more than MIN_AGENTS agents
        if avg_util < self.SCALE_DOWN_CLUSTER_UTILIZATION and len(agents) > self.MIN_AGENTS:
            lowest = min(agents, key=lambda x: x["utilization"])
            if lowest["utilization"] < self.IDLE_AGENT_UTILIZATION:
                return lowest["id"]

        return None

    def get_recommendations(self) -> Dict[str, Any]:
        """
        Get comprehensive system recommendations.

        The cluster load is computed once and shared by every check so that
        all recommendations describe the same snapshot.

        Returns:
            A dict with ``backpressured``, ``backpressure_reason``,
            ``should_add_agent``, ``should_remove_agent``,
            ``recommendations`` (list of messages) and ``cluster``.
        """
        cluster = self.get_cluster_load()
        is_backpressured, backpressure_reason = self.check_backpressure(cluster)
        should_add = self.should_add_agent(cluster)
        should_remove = self.should_remove_agent(cluster)

        recommendations = []

        if is_backpressured:
            recommendations.append(f"URGENT: Backpressure detected: {backpressure_reason}")

        if should_add:
            recommendations.append("SCALE UP: Consider adding more agents to handle load")

        if should_remove:
            recommendations.append(f"SCALE DOWN: Consider removing agent {should_remove} (idle)")

        avg_util = cluster.get("average_utilization", 0)
        healthy_count = cluster.get("healthy_agents", 0)

        if avg_util > self.HIGH_UTILIZATION_WARNING:
            recommendations.append("WARNING: High cluster utilization, monitor for bottlenecks")
        elif avg_util < self.LOW_UTILIZATION and healthy_count > 0:
            # With no healthy agents a 0% average means "no data", not "idle".
            recommendations.append("INFO: Low cluster utilization, system is underutilized")

        return {
            "backpressured": is_backpressured,
            "backpressure_reason": backpressure_reason,
            "should_add_agent": should_add,
            "should_remove_agent": should_remove,
            "recommendations": recommendations,
            "cluster": cluster,
        }

    # Helper methods

    @staticmethod
    def _clamp_unit(value: float) -> float:
        """Clamp a value to the closed interval [0.0, 1.0]."""
        return max(0.0, min(1.0, value))

    @staticmethod
    def _parse_heartbeat(raw: Any) -> Optional[datetime]:
        """Parse a heartbeat into a datetime; return None if absent or invalid.

        Accepts a datetime as-is, or an ISO-8601 string. A trailing ``Z`` is
        rewritten to ``+00:00`` because ``datetime.fromisoformat`` only
        accepts it from Python 3.11 onwards.
        """
        if not raw:
            return None
        if isinstance(raw, datetime):
            return raw
        try:
            text = str(raw)
            if text.endswith("Z"):
                text = text[:-1] + "+00:00"
            return datetime.fromisoformat(text)
        except ValueError:
            return None

    def _classify_load(self, utilization: float) -> "LoadLevel":
        """Map a 0-1 utilization score to its LoadLevel band."""
        if utilization < self.MODERATE_LOAD_THRESHOLD:
            return LoadLevel.LOW
        if utilization < self.HIGH_LOAD_THRESHOLD:
            return LoadLevel.MODERATE
        if utilization < self.LOAD_THRESHOLD:
            return LoadLevel.HIGH
        return LoadLevel.CRITICAL

    def _get_scaling_recommendation(
        self,
        total_agents: int,
        healthy_agents: int,
        avg_utilization: float
    ) -> str:
        """
        Get scaling recommendation based on metrics.

        Checks run in priority order: unhealthy majority, then overload,
        then idle capacity.

        Args:
            total_agents: Number of agents with load data
            healthy_agents: Number of those agents that are healthy
            avg_utilization: Average utilization score (0-1)

        Returns:
            Human-readable recommendation message
        """
        if healthy_agents < total_agents * 0.5:
            return "ALERT: >50% of agents are unhealthy"

        if avg_utilization > self.LOAD_THRESHOLD:
            return "SCALE UP: Add agents to handle high load"

        if avg_utilization < self.LOW_UTILIZATION and total_agents > self.MIN_AGENTS:
            return "SCALE DOWN: Remove idle agents to save resources"

        return "Cluster is operating normally"

    def clear_cache(self):
        """Clear load cache to force refresh on next call."""
        self.load_cache.clear()
|
|
|
|
|
|
# Utility functions for system-level metrics
|
|
|
|
def get_system_load(interval: float = 0.1) -> Tuple[float, float]:
    """
    Get system-level CPU and memory usage.

    Best-effort: any failure while sampling is logged and reported as
    (0.0, 0.0) instead of being raised.

    Args:
        interval: Seconds to sample CPU usage over; the call blocks for this
            long. Defaults to 0.1, the previously hard-coded value.

    Returns:
        (cpu_percent, memory_percent)
    """
    try:
        cpu = psutil.cpu_percent(interval=interval)
        memory = psutil.virtual_memory().percent
    except Exception as e:
        logger.error(f"Error getting system metrics: {e}")
        return 0.0, 0.0
    return cpu, memory
|
|
|
|
|
|
def get_process_load(pid: int, interval: float = 0.1) -> Tuple[float, float]:
    """
    Get CPU and memory usage for a specific process.

    Best-effort: any failure (for example the process no longer exists or
    cannot be inspected) is logged and reported as (0.0, 0.0).

    Args:
        pid: Process ID
        interval: Seconds to sample CPU usage over; the call blocks for this
            long. Defaults to 0.1, the previously hard-coded value.

    Returns:
        (cpu_percent, memory_percent). Per psutil's documentation the
        per-process CPU figure can exceed 100 when the process runs threads
        on several cores.
    """
    try:
        process = psutil.Process(pid)
        cpu = process.cpu_percent(interval=interval)
        memory = process.memory_percent()
    except Exception as e:
        logger.error(f"Error getting process {pid} metrics: {e}")
        return 0.0, 0.0
    return cpu, memory
|
|
|
|
|
|
def get_active_agent_count() -> int:
    """
    Get count of active agent processes.

    A process counts when its name contains "claude" or "agent"
    (case-insensitive). Processes whose name cannot be read are skipped.

    Returns:
        Number of matching processes, or 0 if enumeration fails.
    """
    # NOTE(review): the "agent" substring presumably also matches unrelated
    # processes such as ssh-agent or gpg-agent - confirm this is intended.
    try:
        count = 0
        # Only the name is used, so only the name is pre-fetched. With attrs
        # given, process_iter() fills proc.info and substitutes None for
        # values it is not allowed to read.
        for proc in psutil.process_iter(['name']):
            name = (proc.info.get('name') or '').lower()
            if 'claude' in name or 'agent' in name:
                count += 1
        return count
    except Exception as e:
        logger.error(f"Error counting agents: {e}")
        return 0
|
|
|
|
|
|
# Module exports
|
|
# Public API of this module: the balancer, its data types, and the
# system-level metric helpers.
__all__ = [
    "LuziaLoadBalancer",
    "AgentLoad",
    "LoadLevel",
    "get_system_load",
    "get_process_load",
    "get_active_agent_count",
]
|