#!/usr/bin/env python3
"""
Luzia Queue Manager - Task Queue Management with Load Awareness

Implements:
- Priority queue for task management
- Request -> Queue -> Agent workflow
- Load tracking per agent
- Health checks for overloaded agents
- Automatic queue prioritization

Features:
1. SQLite-backed task queue with status tracking
2. Priority levels: critical, high, normal, low
3. Per-agent load tracking and health monitoring
4. Queue statistics and reporting
5. Graceful backpressure handling
"""

import sqlite3
import json
import time
import uuid
from contextlib import contextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple, Any
from enum import Enum
from dataclasses import dataclass, asdict
from threading import Lock
import logging

logger = logging.getLogger(__name__)


# Column order shared by every SELECT that feeds _row_to_task() or
# _archive_task_history(); both helpers index rows positionally.
_QUEUE_COLUMNS = (
    "id, project, task_description, priority, status, created_at, "
    "assigned_agent, assigned_at, started_at, completed_at, result, "
    "retry_count, max_retries, metadata"
)

# Column order consumed by LuziaQueueManager._agent_row_to_dict().
_AGENT_COLUMNS = (
    "agent_id, total_tasks, active_tasks, completed_tasks, failed_tasks, "
    "cpu_percent, memory_percent, last_heartbeat, is_healthy"
)

# SQLite does not accept MySQL-style "INDEX name (cols)" clauses inside
# CREATE TABLE, so every index is created with its own statement.
_SCHEMA_STATEMENTS = (
    # Queue table for pending/assigned tasks
    """
    CREATE TABLE IF NOT EXISTS queue (
        id TEXT PRIMARY KEY,
        project TEXT NOT NULL,
        task_description TEXT NOT NULL,
        priority INTEGER NOT NULL,
        status TEXT NOT NULL,
        created_at TEXT NOT NULL,
        assigned_agent TEXT,
        assigned_at TEXT,
        started_at TEXT,
        completed_at TEXT,
        result TEXT,
        retry_count INTEGER DEFAULT 0,
        max_retries INTEGER DEFAULT 3,
        metadata TEXT
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_priority_status ON queue (priority, status)",
    "CREATE INDEX IF NOT EXISTS idx_project ON queue (project)",
    "CREATE INDEX IF NOT EXISTS idx_agent ON queue (assigned_agent)",
    "CREATE INDEX IF NOT EXISTS idx_created ON queue (created_at)",
    # Agent health/load tracking
    """
    CREATE TABLE IF NOT EXISTS agent_stats (
        agent_id TEXT PRIMARY KEY,
        total_tasks INTEGER DEFAULT 0,
        active_tasks INTEGER DEFAULT 0,
        completed_tasks INTEGER DEFAULT 0,
        failed_tasks INTEGER DEFAULT 0,
        cpu_percent REAL DEFAULT 0.0,
        memory_percent REAL DEFAULT 0.0,
        last_heartbeat TEXT,
        is_healthy INTEGER DEFAULT 1,
        last_updated TEXT NOT NULL
    )
    """,
    # Task history for analytics
    """
    CREATE TABLE IF NOT EXISTS task_history (
        id TEXT PRIMARY KEY,
        project TEXT NOT NULL,
        task_description TEXT NOT NULL,
        priority INTEGER NOT NULL,
        assigned_agent TEXT,
        created_at TEXT NOT NULL,
        started_at TEXT,
        completed_at TEXT,
        duration_seconds REAL,
        status TEXT NOT NULL,
        result TEXT,
        exit_code INTEGER
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_project_date ON task_history (project, created_at)",
    "CREATE INDEX IF NOT EXISTS idx_agent_date ON task_history (assigned_agent, completed_at)",
)


class TaskPriority(Enum):
    """Task priority levels (a lower value is dequeued first)"""

    CRITICAL = 1
    HIGH = 2
    NORMAL = 3
    LOW = 4


class TaskStatus(Enum):
    """Task execution status"""

    PENDING = "pending"
    QUEUED = "queued"
    ASSIGNED = "assigned"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class QueuedTask:
    """Task in queue"""

    id: str
    project: str
    task_description: str
    priority: TaskPriority
    status: TaskStatus
    created_at: datetime
    assigned_agent: Optional[str] = None
    assigned_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    result: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    metadata: Optional[Dict[str, Any]] = None


class LuziaQueueManager:
    """Manages task queue with load awareness"""

    def __init__(self, db_path: str = "/opt/server-agents/state/task_queue.db"):
        """
        Initialize queue manager.

        Args:
            db_path: Path to task queue SQLite database
        """
        self.db_path = db_path
        self.lock = Lock()
        self._init_db()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """
        Open a connection to the queue database for one unit of work.

        Commits when the ``with`` body finishes normally (including an early
        ``return``), rolls back when it raises, and always closes the
        connection so that a failing statement cannot leak it.

        Yields:
            An open ``sqlite3.Connection`` to ``self.db_path``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def _init_db(self):
        """Initialize database tables and indexes if they don't exist"""
        with self._connect() as conn:
            for statement in _SCHEMA_STATEMENTS:
                conn.execute(statement)

    def enqueue_task(
        self,
        project: str,
        task: str,
        priority: TaskPriority = TaskPriority.NORMAL,
        metadata: Optional[Dict] = None,
    ) -> str:
        """
        Add task to queue.

        Args:
            project: Project name
            task: Task description
            priority: Task priority level
            metadata: Optional metadata dict (stored as JSON; an empty or
                missing dict is stored as NULL)

        Returns:
            Task ID
        """
        task_id = self._generate_task_id(project)
        now = datetime.now().isoformat()

        with self.lock, self._connect() as conn:
            conn.execute(
                """
                INSERT INTO queue (
                    id, project, task_description, priority, status,
                    created_at, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    task_id,
                    project,
                    task,
                    priority.value,
                    TaskStatus.PENDING.value,
                    now,
                    json.dumps(metadata) if metadata else None,
                ),
            )

        logger.info(f"Task {task_id} enqueued for {project} with priority {priority.name}")
        return task_id

    def get_pending_tasks(self, limit: int = 10) -> List[QueuedTask]:
        """
        Get pending tasks ordered by priority and creation time.

        Args:
            limit: Max tasks to return

        Returns:
            List of pending tasks
        """
        with self._connect() as conn:
            rows = conn.execute(
                f"""
                SELECT {_QUEUE_COLUMNS}
                FROM queue
                WHERE status = ?
                ORDER BY priority ASC, created_at ASC
                LIMIT ?
                """,
                (TaskStatus.PENDING.value, limit),
            ).fetchall()

        return [self._row_to_task(row) for row in rows]

    def assign_to_agent(self, task_id: str, agent_id: str) -> bool:
        """
        Assign task to agent.

        Only a task that is still pending can be assigned. On success the
        agent's active-task counter is incremented in the same transaction.

        Args:
            task_id: Task ID
            agent_id: Agent identifier

        Returns:
            True if successful
        """
        now = datetime.now().isoformat()

        with self.lock, self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                UPDATE queue
                SET status = ?, assigned_agent = ?, assigned_at = ?
                WHERE id = ? AND status = ?
                """,
                (
                    TaskStatus.ASSIGNED.value,
                    agent_id,
                    now,
                    task_id,
                    TaskStatus.PENDING.value,
                ),
            )
            success = cursor.rowcount > 0
            if success:
                self._update_agent_stats(agent_id, increment_active=1, cursor=cursor)

        if success:
            logger.info(f"Task {task_id} assigned to agent {agent_id}")
        else:
            logger.warning(f"Failed to assign task {task_id} to {agent_id}")

        return success

    def mark_running(self, task_id: str) -> bool:
        """
        Mark task as running.

        Args:
            task_id: Task ID

        Returns:
            True if a row with this id was updated
        """
        now = datetime.now().isoformat()

        with self.lock, self._connect() as conn:
            cursor = conn.execute(
                """
                UPDATE queue
                SET status = ?, started_at = ?
                WHERE id = ?
                """,
                (TaskStatus.RUNNING.value, now, task_id),
            )
            return cursor.rowcount > 0

    def mark_completed(self, task_id: str, result: Optional[str] = None) -> bool:
        """
        Mark task as completed and archive it to the history table.

        Args:
            task_id: Task ID
            result: Optional result text stored with the task

        Returns:
            True if the task exists, False otherwise
        """
        now = datetime.now().isoformat()

        with self.lock, self._connect() as conn:
            cursor = conn.cursor()

            # Get task details for history
            cursor.execute(
                f"SELECT {_QUEUE_COLUMNS} FROM queue WHERE id = ?", (task_id,)
            )
            row = cursor.fetchone()
            if row is None:
                return False

            cursor.execute(
                """
                UPDATE queue
                SET status = ?, completed_at = ?, result = ?
                WHERE id = ?
                """,
                (TaskStatus.COMPLETED.value, now, result, task_id),
            )

            # The row was read before the UPDATE, so patch in the final
            # completion time and result; otherwise history would record
            # neither a completion time nor a duration.
            final_row = list(row)
            final_row[9] = now
            final_row[10] = result
            self._archive_task_history(
                cursor, tuple(final_row), TaskStatus.COMPLETED.value
            )

            # Release the agent's slot only when the task was actually
            # occupying one, so repeated calls do not skew the counters.
            previous_status, agent = row[4], row[6]
            in_flight = (TaskStatus.ASSIGNED.value, TaskStatus.RUNNING.value)
            if agent and previous_status in in_flight:
                self._update_agent_stats(
                    agent,
                    increment_active=-1,
                    increment_completed=1,
                    cursor=cursor,
                )

        logger.info(f"Task {task_id} marked as completed")
        return True

    def mark_failed(self, task_id: str, reason: str = "") -> bool:
        """
        Mark task as failed and update retry count.

        While retries remain the task goes back to pending with its agent
        assignment cleared; otherwise it is marked permanently failed with
        ``reason`` stored as its result.

        Args:
            task_id: Task ID
            reason: Failure description, stored only on permanent failure

        Returns:
            True if the task exists, False otherwise
        """
        now = datetime.now().isoformat()

        with self.lock, self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT retry_count, max_retries, assigned_agent
                FROM queue
                WHERE id = ?
                """,
                (task_id,),
            )
            row = cursor.fetchone()
            if row is None:
                return False

            retry_count, max_retries, agent = row

            # Check if we should retry
            if retry_count < max_retries:
                # Requeue for retry
                cursor.execute(
                    """
                    UPDATE queue
                    SET status = ?, retry_count = ?,
                        assigned_agent = NULL, assigned_at = NULL
                    WHERE id = ?
                    """,
                    (TaskStatus.PENDING.value, retry_count + 1, task_id),
                )
                logger.info(
                    f"Task {task_id} failed, requeuing (retry {retry_count + 1}/{max_retries})"
                )
            else:
                # Mark as permanently failed
                cursor.execute(
                    """
                    UPDATE queue
                    SET status = ?, completed_at = ?, result = ?
                    WHERE id = ?
                    """,
                    (TaskStatus.FAILED.value, now, reason, task_id),
                )
                logger.warning(f"Task {task_id} failed after {max_retries} retries")

            if agent:
                # Reuse the open cursor: writing through a second connection
                # while this one holds an uncommitted write transaction fails
                # with "database is locked".
                self._update_agent_stats(
                    agent, increment_active=-1, increment_failed=1, cursor=cursor
                )

        return True

    def get_queue_stats(self) -> Dict[str, Any]:
        """
        Get queue statistics.

        Returns:
            Dict with task counts by status, by priority (pending/queued
            tasks only) and by project (excluding completed/failed tasks),
            the age in seconds of the oldest pending task (None when nothing
            is pending), and total/pending/running counts.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            # Count by status
            cursor.execute(
                """
                SELECT status, COUNT(*) as count
                FROM queue
                GROUP BY status
                """
            )
            status_counts = {row[0]: row[1] for row in cursor.fetchall()}

            # Count by priority
            cursor.execute(
                """
                SELECT priority, COUNT(*) as count
                FROM queue
                WHERE status IN (?, ?)
                GROUP BY priority
                """,
                (TaskStatus.PENDING.value, TaskStatus.QUEUED.value),
            )
            priority_counts = {row[0]: row[1] for row in cursor.fetchall()}

            # Count by project
            cursor.execute(
                """
                SELECT project, COUNT(*) as count
                FROM queue
                WHERE status NOT IN (?, ?)
                GROUP BY project
                """,
                (TaskStatus.COMPLETED.value, TaskStatus.FAILED.value),
            )
            project_counts = {row[0]: row[1] for row in cursor.fetchall()}

            # Age of oldest pending task
            cursor.execute(
                """
                SELECT created_at
                FROM queue
                WHERE status = ?
                ORDER BY created_at ASC
                LIMIT 1
                """,
                (TaskStatus.PENDING.value,),
            )
            oldest = cursor.fetchone()

        oldest_age = None
        if oldest:
            created = datetime.fromisoformat(oldest[0])
            oldest_age = (datetime.now() - created).total_seconds()

        return {
            "by_status": status_counts,
            "by_priority": priority_counts,
            "by_project": project_counts,
            "oldest_pending_age_seconds": oldest_age,
            "total_tasks": sum(status_counts.values()),
            "pending_count": status_counts.get(TaskStatus.PENDING.value, 0),
            "active_count": status_counts.get(TaskStatus.RUNNING.value, 0),
        }

    def update_agent_health(
        self,
        agent_id: str,
        cpu_percent: float,
        memory_percent: float,
        active_tasks: int,
    ) -> None:
        """
        Update agent health metrics.

        Creates the agent's row on first report; every call also records the
        current time as the agent's heartbeat.

        Args:
            agent_id: Agent identifier
            cpu_percent: Reported CPU usage
            memory_percent: Reported memory usage
            active_tasks: Reported active task count (overwrites the stored
                counter)
        """
        now = datetime.now().isoformat()

        with self.lock, self._connect() as conn:
            cursor = conn.cursor()

            # Check if agent exists
            cursor.execute("SELECT 1 FROM agent_stats WHERE agent_id = ?", (agent_id,))
            if cursor.fetchone():
                cursor.execute(
                    """
                    UPDATE agent_stats
                    SET cpu_percent = ?, memory_percent = ?, active_tasks = ?,
                        last_heartbeat = ?, last_updated = ?
                    WHERE agent_id = ?
                    """,
                    (cpu_percent, memory_percent, active_tasks, now, now, agent_id),
                )
            else:
                cursor.execute(
                    """
                    INSERT INTO agent_stats
                    (agent_id, cpu_percent, memory_percent, active_tasks,
                     last_heartbeat, last_updated)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (agent_id, cpu_percent, memory_percent, active_tasks, now, now),
                )

    def get_agent_stats(self, agent_id: str) -> Optional[Dict[str, Any]]:
        """
        Get agent statistics.

        Args:
            agent_id: Agent identifier

        Returns:
            Stats dict for the agent, or None if the agent is unknown
        """
        with self._connect() as conn:
            row = conn.execute(
                f"SELECT {_AGENT_COLUMNS} FROM agent_stats WHERE agent_id = ?",
                (agent_id,),
            ).fetchone()

        if not row:
            return None
        return self._agent_row_to_dict(row)

    def get_all_agent_stats(self) -> List[Dict[str, Any]]:
        """
        Get statistics for all agents.

        Returns:
            One stats dict per agent, busiest (most active tasks) first
        """
        with self._connect() as conn:
            rows = conn.execute(
                f"SELECT {_AGENT_COLUMNS} FROM agent_stats ORDER BY active_tasks DESC"
            ).fetchall()

        return [self._agent_row_to_dict(row) for row in rows]

    def check_agent_health(self, timeout_seconds: int = 60) -> Dict[str, bool]:
        """
        Check health of all agents based on heartbeat.

        An agent is healthy when its last heartbeat is less than
        ``timeout_seconds`` old; an agent with no recorded heartbeat is
        unhealthy. The result is also persisted to ``is_healthy``.

        Args:
            timeout_seconds: Maximum heartbeat age to count as healthy

        Returns:
            Mapping of agent id to health flag
        """
        now = datetime.now()
        health_status: Dict[str, bool] = {}

        with self.lock, self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT agent_id, last_heartbeat FROM agent_stats")

            for agent_id, last_hb in cursor.fetchall():
                # Decide the flag on every path so the UPDATE below never
                # reuses a value left over from a previous agent.
                if last_hb:
                    last_beat = datetime.fromisoformat(last_hb)
                    is_healthy = (now - last_beat).total_seconds() < timeout_seconds
                else:
                    is_healthy = False

                health_status[agent_id] = is_healthy

                # Update is_healthy flag
                cursor.execute(
                    "UPDATE agent_stats SET is_healthy = ? WHERE agent_id = ?",
                    (int(is_healthy), agent_id),
                )

        return health_status

    # Helper methods

    @staticmethod
    def _generate_task_id(project: str) -> str:
        """Generate unique task ID of the form ``<project>_<timestamp>_<8 hex chars>``."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        unique = str(uuid.uuid4())[:8]
        return f"{project}_{timestamp}_{unique}"

    @staticmethod
    def _agent_row_to_dict(row: Tuple) -> Dict[str, Any]:
        """Convert an agent_stats row (in ``_AGENT_COLUMNS`` order) to a dict."""
        return {
            "agent_id": row[0],
            "total_tasks": row[1],
            "active_tasks": row[2],
            "completed_tasks": row[3],
            "failed_tasks": row[4],
            "cpu_percent": row[5],
            "memory_percent": row[6],
            "last_heartbeat": row[7],
            "is_healthy": bool(row[8]),
        }

    def _row_to_task(self, row: Tuple) -> QueuedTask:
        """Convert database row (in ``_QUEUE_COLUMNS`` order) to QueuedTask object."""
        return QueuedTask(
            id=row[0],
            project=row[1],
            task_description=row[2],
            priority=TaskPriority(row[3]),
            status=TaskStatus(row[4]),
            created_at=datetime.fromisoformat(row[5]),
            assigned_agent=row[6],
            assigned_at=datetime.fromisoformat(row[7]) if row[7] else None,
            started_at=datetime.fromisoformat(row[8]) if row[8] else None,
            completed_at=datetime.fromisoformat(row[9]) if row[9] else None,
            result=row[10],
            retry_count=row[11],
            max_retries=row[12],
            metadata=json.loads(row[13]) if row[13] else None,
        )

    def _update_agent_stats(
        self,
        agent_id: str,
        increment_active: int = 0,
        increment_completed: int = 0,
        increment_failed: int = 0,
        cursor: Optional[sqlite3.Cursor] = None,
    ):
        """
        Update agent statistics counters.

        Only an existing agent row is updated; nothing is inserted for an
        unknown agent. The active-task counter is clamped at zero.

        Args:
            agent_id: Agent identifier
            increment_active: Delta applied to active_tasks
            increment_completed: Delta applied to completed_tasks
            increment_failed: Delta applied to failed_tasks
            cursor: Optional cursor of an already-open transaction. When
                given, the update joins that transaction and the caller is
                responsible for committing; otherwise a short-lived
                connection is opened and committed here.
        """
        sql = """
            UPDATE agent_stats
            SET active_tasks = MAX(0, active_tasks + ?),
                completed_tasks = completed_tasks + ?,
                failed_tasks = failed_tasks + ?,
                last_updated = ?
            WHERE agent_id = ?
        """
        params = (
            increment_active,
            increment_completed,
            increment_failed,
            datetime.now().isoformat(),
            agent_id,
        )

        if cursor is not None:
            cursor.execute(sql, params)
            return

        with self._connect() as conn:
            conn.execute(sql, params)

    def _archive_task_history(self, cursor, row: Tuple, final_status: str):
        """
        Archive completed task to history.

        Args:
            cursor: Cursor of the caller's open transaction
            row: Queue row in ``_QUEUE_COLUMNS`` order (id, project,
                task_description, priority, status, created_at,
                assigned_agent, assigned_at, started_at, completed_at,
                result, retry_count, max_retries, metadata)
            final_status: Status value recorded in the history row
        """
        (
            task_id,
            project,
            task_desc,
            priority,
            _status,
            created_at,
            agent,
            _assigned_at,
            started_at,
            completed_at,
            result,
        ) = row[:11]

        duration = None
        if started_at and completed_at:
            start = datetime.fromisoformat(started_at)
            end = datetime.fromisoformat(completed_at)
            duration = (end - start).total_seconds()

        # OR REPLACE: the history table is keyed by task id, so archiving the
        # same task again must overwrite instead of raising IntegrityError.
        cursor.execute(
            """
            INSERT OR REPLACE INTO task_history
            (id, project, task_description, priority, assigned_agent,
             created_at, started_at, completed_at, duration_seconds,
             status, result)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                task_id,
                project,
                task_desc,
                priority,
                agent,
                created_at,
                started_at,
                completed_at,
                duration,
                final_status,
                result,
            ),
        )


# Module exports
__all__ = [
    "LuziaQueueManager",
    "QueuedTask",
    "TaskPriority",
    "TaskStatus",
]