Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
657 lines
20 KiB
Python
657 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Luzia Queue Manager - Task Queue Management with Load Awareness
|
|
|
|
Implements:
|
|
- Priority queue for task management
|
|
- Request -> Queue -> Agent workflow
|
|
- Load tracking per agent
|
|
- Health checks for overloaded agents
|
|
- Automatic queue prioritization
|
|
|
|
Features:
|
|
1. SQLite-backed task queue with status tracking
|
|
2. Priority levels: critical, high, normal, low
|
|
3. Per-agent load tracking and health monitoring
|
|
4. Queue statistics and reporting
|
|
5. Graceful backpressure handling
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from enum import Enum
|
|
from dataclasses import dataclass, asdict
|
|
from threading import Lock
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TaskPriority(Enum):
|
|
"""Task priority levels"""
|
|
CRITICAL = 1
|
|
HIGH = 2
|
|
NORMAL = 3
|
|
LOW = 4
|
|
|
|
|
|
class TaskStatus(Enum):
|
|
"""Task execution status"""
|
|
PENDING = "pending"
|
|
QUEUED = "queued"
|
|
ASSIGNED = "assigned"
|
|
RUNNING = "running"
|
|
COMPLETED = "completed"
|
|
FAILED = "failed"
|
|
CANCELLED = "cancelled"
|
|
|
|
|
|
@dataclass
|
|
class QueuedTask:
|
|
"""Task in queue"""
|
|
id: str
|
|
project: str
|
|
task_description: str
|
|
priority: TaskPriority
|
|
status: TaskStatus
|
|
created_at: datetime
|
|
assigned_agent: Optional[str] = None
|
|
assigned_at: Optional[datetime] = None
|
|
started_at: Optional[datetime] = None
|
|
completed_at: Optional[datetime] = None
|
|
result: Optional[str] = None
|
|
retry_count: int = 0
|
|
max_retries: int = 3
|
|
metadata: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class LuziaQueueManager:
|
|
"""Manages task queue with load awareness"""
|
|
|
|
def __init__(self, db_path: str = "/opt/server-agents/state/task_queue.db"):
|
|
"""
|
|
Initialize queue manager.
|
|
|
|
Args:
|
|
db_path: Path to task queue SQLite database
|
|
"""
|
|
self.db_path = db_path
|
|
self.lock = Lock()
|
|
self._init_db()
|
|
|
|
def _init_db(self):
|
|
"""Initialize database tables if they don't exist"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Queue table for pending/assigned tasks
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS queue (
|
|
id TEXT PRIMARY KEY,
|
|
project TEXT NOT NULL,
|
|
task_description TEXT NOT NULL,
|
|
priority INTEGER NOT NULL,
|
|
status TEXT NOT NULL,
|
|
created_at TEXT NOT NULL,
|
|
assigned_agent TEXT,
|
|
assigned_at TEXT,
|
|
started_at TEXT,
|
|
completed_at TEXT,
|
|
result TEXT,
|
|
retry_count INTEGER DEFAULT 0,
|
|
max_retries INTEGER DEFAULT 3,
|
|
metadata TEXT,
|
|
INDEX idx_priority_status (priority, status),
|
|
INDEX idx_project (project),
|
|
INDEX idx_agent (assigned_agent),
|
|
INDEX idx_created (created_at)
|
|
)
|
|
""")
|
|
|
|
# Agent health/load tracking
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS agent_stats (
|
|
agent_id TEXT PRIMARY KEY,
|
|
total_tasks INTEGER DEFAULT 0,
|
|
active_tasks INTEGER DEFAULT 0,
|
|
completed_tasks INTEGER DEFAULT 0,
|
|
failed_tasks INTEGER DEFAULT 0,
|
|
cpu_percent REAL DEFAULT 0.0,
|
|
memory_percent REAL DEFAULT 0.0,
|
|
last_heartbeat TEXT,
|
|
is_healthy INTEGER DEFAULT 1,
|
|
last_updated TEXT NOT NULL
|
|
)
|
|
""")
|
|
|
|
# Task history for analytics
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS task_history (
|
|
id TEXT PRIMARY KEY,
|
|
project TEXT NOT NULL,
|
|
task_description TEXT NOT NULL,
|
|
priority INTEGER NOT NULL,
|
|
assigned_agent TEXT,
|
|
created_at TEXT NOT NULL,
|
|
started_at TEXT,
|
|
completed_at TEXT,
|
|
duration_seconds REAL,
|
|
status TEXT NOT NULL,
|
|
result TEXT,
|
|
exit_code INTEGER,
|
|
INDEX idx_project_date (project, created_at),
|
|
INDEX idx_agent_date (assigned_agent, completed_at)
|
|
)
|
|
""")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def enqueue_task(
|
|
self,
|
|
project: str,
|
|
task: str,
|
|
priority: TaskPriority = TaskPriority.NORMAL,
|
|
metadata: Optional[Dict] = None,
|
|
) -> str:
|
|
"""
|
|
Add task to queue.
|
|
|
|
Args:
|
|
project: Project name
|
|
task: Task description
|
|
priority: Task priority level
|
|
metadata: Optional metadata dict
|
|
|
|
Returns:
|
|
Task ID
|
|
"""
|
|
task_id = self._generate_task_id(project)
|
|
now = datetime.now().isoformat()
|
|
|
|
with self.lock:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
INSERT INTO queue (
|
|
id, project, task_description, priority, status,
|
|
created_at, metadata
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
task_id,
|
|
project,
|
|
task,
|
|
priority.value,
|
|
TaskStatus.PENDING.value,
|
|
now,
|
|
json.dumps(metadata) if metadata else None
|
|
))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
logger.info(f"Task {task_id} enqueued for {project} with priority {priority.name}")
|
|
return task_id
|
|
|
|
def get_pending_tasks(self, limit: int = 10) -> List[QueuedTask]:
|
|
"""
|
|
Get pending tasks ordered by priority and creation time.
|
|
|
|
Args:
|
|
limit: Max tasks to return
|
|
|
|
Returns:
|
|
List of pending tasks
|
|
"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT id, project, task_description, priority, status,
|
|
created_at, assigned_agent, assigned_at, started_at,
|
|
completed_at, result, retry_count, max_retries, metadata
|
|
FROM queue
|
|
WHERE status = ?
|
|
ORDER BY priority ASC, created_at ASC
|
|
LIMIT ?
|
|
""", (TaskStatus.PENDING.value, limit))
|
|
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
|
|
return [self._row_to_task(row) for row in rows]
|
|
|
|
def assign_to_agent(self, task_id: str, agent_id: str) -> bool:
|
|
"""
|
|
Assign task to agent.
|
|
|
|
Args:
|
|
task_id: Task ID
|
|
agent_id: Agent identifier
|
|
|
|
Returns:
|
|
True if successful
|
|
"""
|
|
now = datetime.now().isoformat()
|
|
|
|
with self.lock:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
UPDATE queue
|
|
SET status = ?, assigned_agent = ?, assigned_at = ?
|
|
WHERE id = ? AND status = ?
|
|
""", (
|
|
TaskStatus.ASSIGNED.value,
|
|
agent_id,
|
|
now,
|
|
task_id,
|
|
TaskStatus.PENDING.value
|
|
))
|
|
|
|
success = cursor.rowcount > 0
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
if success:
|
|
logger.info(f"Task {task_id} assigned to agent {agent_id}")
|
|
self._update_agent_stats(agent_id, increment_active=1)
|
|
else:
|
|
logger.warning(f"Failed to assign task {task_id} to {agent_id}")
|
|
|
|
return success
|
|
|
|
def mark_running(self, task_id: str) -> bool:
|
|
"""Mark task as running."""
|
|
now = datetime.now().isoformat()
|
|
|
|
with self.lock:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
UPDATE queue
|
|
SET status = ?, started_at = ?
|
|
WHERE id = ?
|
|
""", (TaskStatus.RUNNING.value, now, task_id))
|
|
|
|
success = cursor.rowcount > 0
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return success
|
|
|
|
def mark_completed(self, task_id: str, result: Optional[str] = None) -> bool:
|
|
"""Mark task as completed."""
|
|
now = datetime.now().isoformat()
|
|
|
|
with self.lock:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Get task details for history
|
|
cursor.execute("SELECT * FROM queue WHERE id = ?", (task_id,))
|
|
row = cursor.fetchone()
|
|
|
|
if not row:
|
|
conn.close()
|
|
return False
|
|
|
|
cursor.execute("""
|
|
UPDATE queue
|
|
SET status = ?, completed_at = ?, result = ?
|
|
WHERE id = ?
|
|
""", (TaskStatus.COMPLETED.value, now, result, task_id))
|
|
|
|
# Archive to history
|
|
self._archive_task_history(cursor, row, TaskStatus.COMPLETED.value)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
logger.info(f"Task {task_id} marked as completed")
|
|
return True
|
|
|
|
def mark_failed(self, task_id: str, reason: str = "") -> bool:
|
|
"""Mark task as failed and update retry count."""
|
|
now = datetime.now().isoformat()
|
|
|
|
with self.lock:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT retry_count, max_retries, assigned_agent
|
|
FROM queue WHERE id = ?
|
|
""", (task_id,))
|
|
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
conn.close()
|
|
return False
|
|
|
|
retry_count, max_retries, agent = row
|
|
|
|
# Check if we should retry
|
|
if retry_count < max_retries:
|
|
# Requeue for retry
|
|
cursor.execute("""
|
|
UPDATE queue
|
|
SET status = ?, retry_count = ?, assigned_agent = NULL,
|
|
assigned_at = NULL
|
|
WHERE id = ?
|
|
""", (TaskStatus.PENDING.value, retry_count + 1, task_id))
|
|
|
|
logger.info(f"Task {task_id} failed, requeuing (retry {retry_count + 1}/{max_retries})")
|
|
else:
|
|
# Mark as permanently failed
|
|
cursor.execute("""
|
|
UPDATE queue
|
|
SET status = ?, completed_at = ?, result = ?
|
|
WHERE id = ?
|
|
""", (TaskStatus.FAILED.value, now, reason, task_id))
|
|
|
|
logger.warning(f"Task {task_id} failed after {max_retries} retries")
|
|
|
|
if agent:
|
|
self._update_agent_stats(agent, increment_active=-1, increment_failed=1)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return True
|
|
|
|
def get_queue_stats(self) -> Dict[str, Any]:
|
|
"""Get queue statistics."""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Count by status
|
|
cursor.execute("""
|
|
SELECT status, COUNT(*) as count
|
|
FROM queue
|
|
GROUP BY status
|
|
""")
|
|
status_counts = {row[0]: row[1] for row in cursor.fetchall()}
|
|
|
|
# Count by priority
|
|
cursor.execute("""
|
|
SELECT priority, COUNT(*) as count
|
|
FROM queue
|
|
WHERE status IN (?, ?)
|
|
GROUP BY priority
|
|
""", (TaskStatus.PENDING.value, TaskStatus.QUEUED.value))
|
|
priority_counts = {row[0]: row[1] for row in cursor.fetchall()}
|
|
|
|
# Count by project
|
|
cursor.execute("""
|
|
SELECT project, COUNT(*) as count
|
|
FROM queue
|
|
WHERE status NOT IN (?, ?)
|
|
GROUP BY project
|
|
""", (TaskStatus.COMPLETED.value, TaskStatus.FAILED.value))
|
|
project_counts = {row[0]: row[1] for row in cursor.fetchall()}
|
|
|
|
# Age of oldest pending task
|
|
cursor.execute("""
|
|
SELECT created_at FROM queue
|
|
WHERE status = ?
|
|
ORDER BY created_at ASC
|
|
LIMIT 1
|
|
""", (TaskStatus.PENDING.value,))
|
|
|
|
oldest = cursor.fetchone()
|
|
oldest_age = None
|
|
if oldest:
|
|
created = datetime.fromisoformat(oldest[0])
|
|
oldest_age = (datetime.now() - created).total_seconds()
|
|
|
|
conn.close()
|
|
|
|
return {
|
|
"by_status": status_counts,
|
|
"by_priority": priority_counts,
|
|
"by_project": project_counts,
|
|
"oldest_pending_age_seconds": oldest_age,
|
|
"total_tasks": sum(status_counts.values()),
|
|
"pending_count": status_counts.get(TaskStatus.PENDING.value, 0),
|
|
"active_count": status_counts.get(TaskStatus.RUNNING.value, 0),
|
|
}
|
|
|
|
def update_agent_health(
|
|
self,
|
|
agent_id: str,
|
|
cpu_percent: float,
|
|
memory_percent: float,
|
|
active_tasks: int,
|
|
) -> None:
|
|
"""Update agent health metrics."""
|
|
now = datetime.now().isoformat()
|
|
|
|
with self.lock:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Check if agent exists
|
|
cursor.execute("SELECT 1 FROM agent_stats WHERE agent_id = ?", (agent_id,))
|
|
exists = cursor.fetchone()
|
|
|
|
if exists:
|
|
cursor.execute("""
|
|
UPDATE agent_stats
|
|
SET cpu_percent = ?, memory_percent = ?, active_tasks = ?,
|
|
last_heartbeat = ?, last_updated = ?
|
|
WHERE agent_id = ?
|
|
""", (cpu_percent, memory_percent, active_tasks, now, now, agent_id))
|
|
else:
|
|
cursor.execute("""
|
|
INSERT INTO agent_stats
|
|
(agent_id, cpu_percent, memory_percent, active_tasks,
|
|
last_heartbeat, last_updated)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""", (agent_id, cpu_percent, memory_percent, active_tasks, now, now))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def get_agent_stats(self, agent_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get agent statistics."""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT agent_id, total_tasks, active_tasks, completed_tasks,
|
|
failed_tasks, cpu_percent, memory_percent, last_heartbeat,
|
|
is_healthy
|
|
FROM agent_stats
|
|
WHERE agent_id = ?
|
|
""", (agent_id,))
|
|
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if not row:
|
|
return None
|
|
|
|
return {
|
|
"agent_id": row[0],
|
|
"total_tasks": row[1],
|
|
"active_tasks": row[2],
|
|
"completed_tasks": row[3],
|
|
"failed_tasks": row[4],
|
|
"cpu_percent": row[5],
|
|
"memory_percent": row[6],
|
|
"last_heartbeat": row[7],
|
|
"is_healthy": bool(row[8]),
|
|
}
|
|
|
|
def get_all_agent_stats(self) -> List[Dict[str, Any]]:
|
|
"""Get statistics for all agents."""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT agent_id, total_tasks, active_tasks, completed_tasks,
|
|
failed_tasks, cpu_percent, memory_percent, last_heartbeat,
|
|
is_healthy
|
|
FROM agent_stats
|
|
ORDER BY active_tasks DESC
|
|
""")
|
|
|
|
stats = []
|
|
for row in cursor.fetchall():
|
|
stats.append({
|
|
"agent_id": row[0],
|
|
"total_tasks": row[1],
|
|
"active_tasks": row[2],
|
|
"completed_tasks": row[3],
|
|
"failed_tasks": row[4],
|
|
"cpu_percent": row[5],
|
|
"memory_percent": row[6],
|
|
"last_heartbeat": row[7],
|
|
"is_healthy": bool(row[8]),
|
|
})
|
|
|
|
conn.close()
|
|
return stats
|
|
|
|
def check_agent_health(self, timeout_seconds: int = 60) -> Dict[str, bool]:
|
|
"""Check health of all agents based on heartbeat."""
|
|
now = datetime.now()
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT agent_id, last_heartbeat FROM agent_stats")
|
|
health_status = {}
|
|
|
|
for agent_id, last_hb in cursor.fetchall():
|
|
if not last_hb:
|
|
health_status[agent_id] = False
|
|
else:
|
|
last_beat = datetime.fromisoformat(last_hb)
|
|
is_healthy = (now - last_beat).total_seconds() < timeout_seconds
|
|
health_status[agent_id] = is_healthy
|
|
|
|
# Update is_healthy flag
|
|
cursor.execute(
|
|
"UPDATE agent_stats SET is_healthy = ? WHERE agent_id = ?",
|
|
(int(is_healthy), agent_id)
|
|
)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
return health_status
|
|
|
|
# Helper methods
|
|
|
|
@staticmethod
|
|
def _generate_task_id(project: str) -> str:
|
|
"""Generate unique task ID."""
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
import uuid
|
|
unique = str(uuid.uuid4())[:8]
|
|
return f"{project}_{timestamp}_{unique}"
|
|
|
|
def _row_to_task(self, row: Tuple) -> QueuedTask:
|
|
"""Convert database row to QueuedTask object."""
|
|
return QueuedTask(
|
|
id=row[0],
|
|
project=row[1],
|
|
task_description=row[2],
|
|
priority=TaskPriority(row[3]),
|
|
status=TaskStatus(row[4]),
|
|
created_at=datetime.fromisoformat(row[5]),
|
|
assigned_agent=row[6],
|
|
assigned_at=datetime.fromisoformat(row[7]) if row[7] else None,
|
|
started_at=datetime.fromisoformat(row[8]) if row[8] else None,
|
|
completed_at=datetime.fromisoformat(row[9]) if row[9] else None,
|
|
result=row[10],
|
|
retry_count=row[11],
|
|
max_retries=row[12],
|
|
metadata=json.loads(row[13]) if row[13] else None,
|
|
)
|
|
|
|
def _update_agent_stats(
|
|
self,
|
|
agent_id: str,
|
|
increment_active: int = 0,
|
|
increment_completed: int = 0,
|
|
increment_failed: int = 0,
|
|
):
|
|
"""Update agent statistics counters."""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
UPDATE agent_stats
|
|
SET active_tasks = MAX(0, active_tasks + ?),
|
|
completed_tasks = completed_tasks + ?,
|
|
failed_tasks = failed_tasks + ?,
|
|
last_updated = ?
|
|
WHERE agent_id = ?
|
|
""", (
|
|
increment_active,
|
|
increment_completed,
|
|
increment_failed,
|
|
datetime.now().isoformat(),
|
|
agent_id,
|
|
))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def _archive_task_history(self, cursor, row: Tuple, final_status: str):
|
|
"""Archive completed task to history."""
|
|
# row format: id, project, task_description, priority, status,
|
|
# created_at, assigned_agent, assigned_at, started_at,
|
|
# completed_at, result, retry_count, max_retries, metadata
|
|
|
|
task_id = row[0]
|
|
project = row[1]
|
|
task_desc = row[2]
|
|
priority = row[3]
|
|
agent = row[6]
|
|
created_at = row[5]
|
|
started_at = row[8]
|
|
completed_at = row[9]
|
|
|
|
duration = None
|
|
if started_at and completed_at:
|
|
start = datetime.fromisoformat(started_at)
|
|
end = datetime.fromisoformat(completed_at)
|
|
duration = (end - start).total_seconds()
|
|
|
|
cursor.execute("""
|
|
INSERT INTO task_history
|
|
(id, project, task_description, priority, assigned_agent,
|
|
created_at, started_at, completed_at, duration_seconds, status, result)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
task_id,
|
|
project,
|
|
task_desc,
|
|
priority,
|
|
agent,
|
|
created_at,
|
|
started_at,
|
|
completed_at,
|
|
duration,
|
|
final_status,
|
|
row[10], # result
|
|
))
|
|
|
|
|
|
# Module exports
|
|
__all__ = [
|
|
"LuziaQueueManager",
|
|
"QueuedTask",
|
|
"TaskPriority",
|
|
"TaskStatus",
|
|
]
|