# Changelog:
# - New CockpitQueueDispatcher: per-project serialized task queues
# - LoadMonitor: checks system load/memory before dispatching
# - Parallel execution across projects with round-robin fairness
# - CLI commands: cockpit queue, cockpit dispatch
# Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
#!/usr/bin/env python3
"""
Cockpit Queue Dispatcher - Load-aware background task dispatcher

Integrates:
- ProjectQueueScheduler: Per-project sequential, cross-project parallel
- Cockpit: Docker-based Claude sessions
- Load monitoring: Check system resources before dispatching

Architecture:
    TaskQueue (per-project)
        ↓
    CockpitQueueDispatcher
        ├─ Check system load
        ├─ Select next task (round-robin)
        └─ Dispatch to cockpit (non-blocking)
"""
|
|
import json
import os
import queue
import subprocess
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import psutil

# Import scheduler and cockpit
try:
    from project_queue_scheduler import ProjectQueueScheduler
    from cockpit import (
        cockpit_start, cockpit_dispatch_task, cockpit_status,
        load_state, save_state, container_running, get_container_name
    )
except ImportError as e:
    raise ImportError(f"Required module not found: {e}")
|
|
|
|
|
|
class LoadMonitor:
    """Monitor system load for dispatch decisions."""

    def __init__(self, max_load: float = 4.0, max_memory_pct: float = 85.0):
        """
        Initialize load monitor.

        Args:
            max_load: Maximum 1-minute load average (default: 4.0)
            max_memory_pct: Maximum memory usage percent (default: 85%)
        """
        self.max_load = max_load
        self.max_memory_pct = max_memory_pct

    def can_dispatch(self) -> tuple[bool, str]:
        """
        Check if the system can handle a new task dispatch.

        Returns:
            Tuple of (can_dispatch: bool, reason: str)
        """
        # Gate on the 1-minute load average first.
        one_min = os.getloadavg()[0]
        if one_min > self.max_load:
            return False, f"Load too high: {one_min:.1f} (max: {self.max_load})"

        # Then gate on overall memory pressure.
        memory = psutil.virtual_memory()
        if memory.percent > self.max_memory_pct:
            return False, f"Memory too high: {memory.percent:.1f}% (max: {self.max_memory_pct}%)"

        return True, "OK"

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of load averages, memory use, and dispatchability."""
        one_min, five_min, fifteen_min = os.getloadavg()
        memory = psutil.virtual_memory()

        return {
            "load_1m": one_min,
            "load_5m": five_min,
            "load_15m": fifteen_min,
            "memory_used_pct": memory.percent,
            "memory_available_gb": memory.available / (1024 ** 3),
            "can_dispatch": self.can_dispatch()[0],
            "timestamp": datetime.now().isoformat()
        }
|
|
|
class CockpitQueueDispatcher:
    """
    Background dispatcher for cockpit tasks.

    Features:
    - Per-project task queues (serialized)
    - Cross-project parallelism
    - Load-aware dispatching
    - Non-blocking dispatch with result tracking
    """

    # On-disk roots; QUEUE_DIR holds one sub-directory of task JSON files
    # per project.
    STATE_DIR = Path("/var/lib/luz-orchestrator/dispatcher")
    QUEUE_DIR = Path("/var/lib/luz-orchestrator/task_queue")

    def __init__(self, config: dict, max_concurrent_projects: int = 4):
        """
        Initialize dispatcher.

        Args:
            config: Luzia config dict
            max_concurrent_projects: Max projects running simultaneously
        """
        self.config = config
        self.max_concurrent = max_concurrent_projects
        self.scheduler = ProjectQueueScheduler()
        self.load_monitor = LoadMonitor()

        # Ensure directories exist
        self.STATE_DIR.mkdir(parents=True, exist_ok=True)
        self.QUEUE_DIR.mkdir(parents=True, exist_ok=True)

        # In-memory task queues per project
        self.project_queues: Dict[str, queue.Queue] = {}

        # Dispatch threads currently running, keyed by project
        self.running_projects: Dict[str, threading.Thread] = {}

        # Task results keyed by task_id
        self.results: Dict[str, Dict] = {}

    def enqueue_task(self, project: str, task: str, context: str = "",
                     priority: str = "normal") -> str:
        """
        Add task to project queue.

        Args:
            project: Target project name
            task: Task description
            context: Project context
            priority: "high" or "normal"

        Returns:
            task_id for tracking
        """
        # BUG FIX: the id was previously derived from hash(task), which is
        # randomized per process (PYTHONHASHSEED) and truncated to 16 bits,
        # so identical tasks collided and ids were not reproducible across
        # runs. A random suffix from os.urandom avoids both problems.
        task_id = datetime.now().strftime("%H%M%S") + "-" + os.urandom(2).hex()

        task_data = {
            "id": task_id,
            "project": project,
            "task": task,
            "context": context,
            "priority": priority,
            "queued_at": datetime.now().isoformat(),
            "status": "pending"
        }

        # Persist to the on-disk queue so tasks survive restarts.
        queue_file = self.QUEUE_DIR / project / f"{task_id}.json"
        queue_file.parent.mkdir(parents=True, exist_ok=True)
        queue_file.write_text(json.dumps(task_data, indent=2))

        # Mirror into the in-memory queue, creating it on first use.
        if project not in self.project_queues:
            self.project_queues[project] = queue.Queue()
        self.project_queues[project].put(task_data)

        return task_id

    @staticmethod
    def _pending_in(project_dir: Path) -> List[Dict]:
        """Read pending task dicts from one project's queue directory,
        skipping unreadable or malformed files (best-effort)."""
        pending = []
        for task_file in sorted(project_dir.glob("*.json")):
            try:
                task = json.loads(task_file.read_text())
            except (json.JSONDecodeError, IOError):
                continue  # ignore corrupt queue entries
            if task.get("status") == "pending":
                pending.append(task)
        return pending

    def get_pending_tasks(self, project: Optional[str] = None) -> List[Dict]:
        """
        Get pending tasks, optionally filtered by project.

        Args:
            project: If given, only this project's queue is scanned.

        Returns:
            List of task dicts whose status is "pending".
        """
        if project:
            project_dir = self.QUEUE_DIR / project
            return self._pending_in(project_dir) if project_dir.exists() else []

        # All projects
        tasks: List[Dict] = []
        for project_dir in self.QUEUE_DIR.iterdir():
            if project_dir.is_dir():
                tasks.extend(self._pending_in(project_dir))
        return tasks

    def dispatch_task_async(self, project: str, task_data: Dict) -> threading.Thread:
        """
        Dispatch a task to cockpit asynchronously.

        Args:
            project: Project name
            task_data: Task dict with id, task, context

        Returns:
            Thread running the dispatch
        """
        def _dispatch():
            task_id = task_data["id"]
            task = task_data["task"]
            context = task_data.get("context", "")

            try:
                self._update_task_status(project, task_id, "running")

                # Blocking call, but we are inside a worker thread.
                result = cockpit_dispatch_task(
                    project=project,
                    task=task,
                    context=context,
                    config=self.config,
                    show_output=False,  # Don't print, we're async
                    timeout=600
                )

                # Store result for later retrieval via self.results
                self.results[task_id] = result

                # Map the cockpit outcome onto a queue status.
                if result.get("awaiting_response"):
                    self._update_task_status(project, task_id, "awaiting_human")
                elif result.get("timed_out"):
                    # Presumably the cockpit session keeps working after our
                    # wait expires, so the task stays "running" — TODO confirm.
                    self._update_task_status(project, task_id, "running")
                else:
                    self._update_task_status(project, task_id, "completed")
                # Release project slot
                self.scheduler.release_task(project)

            except Exception as e:
                self._update_task_status(project, task_id, "failed", str(e))
                self.scheduler.release_task(project)

            finally:
                # Free the project's dispatch slot.
                if project in self.running_projects:
                    del self.running_projects[project]

        thread = threading.Thread(target=_dispatch, daemon=True)
        # BUG FIX: register the thread *before* starting it. Previously the
        # assignment happened after start(), so a fast-finishing worker could
        # run its finally-cleanup first and the late assignment would leave a
        # stale entry that permanently blocked the project.
        self.running_projects[project] = thread
        thread.start()

        return thread

    def _update_task_status(self, project: str, task_id: str,
                            status: str, error: str = None) -> None:
        """Update a task's status (and optional error) in its queue file.

        Missing or unreadable files are silently ignored — status updates
        are best-effort.
        """
        task_file = self.QUEUE_DIR / project / f"{task_id}.json"
        if task_file.exists():
            try:
                task = json.loads(task_file.read_text())
                task["status"] = status
                task["updated_at"] = datetime.now().isoformat()
                if error:
                    task["error"] = error
                task_file.write_text(json.dumps(task, indent=2))
            except (json.JSONDecodeError, IOError):
                pass

    def run_dispatch_cycle(self) -> Dict[str, Any]:
        """
        Run one dispatch cycle.

        Checks load, selects available projects, dispatches tasks.

        Returns:
            Dict with cycle results
        """
        cycle_start = datetime.now()
        dispatched = []
        skipped = []

        # Back off entirely when the host is under pressure.
        can_dispatch, load_reason = self.load_monitor.can_dispatch()
        if not can_dispatch:
            return {
                "cycle_time": cycle_start.isoformat(),
                "dispatched": [],
                "skipped": [],
                "reason": f"Load check failed: {load_reason}"
            }

        # Respect the concurrency budget.
        running_count = len(self.running_projects)
        available_slots = self.max_concurrent - running_count

        if available_slots <= 0:
            return {
                "cycle_time": cycle_start.isoformat(),
                "dispatched": [],
                "skipped": [],
                "reason": f"No slots available ({running_count}/{self.max_concurrent} running)"
            }

        # Group pending tasks by project.
        pending = self.get_pending_tasks()
        tasks_by_project: Dict[str, List[Dict]] = {}
        for task in pending:
            tasks_by_project.setdefault(task["project"], []).append(task)

        # NOTE(review): projects are visited in sorted name order every
        # cycle, which favors alphabetically-early projects rather than true
        # round-robin — confirm whether that bias is acceptable.
        for project in sorted(tasks_by_project.keys()):
            if available_slots <= 0:
                break

            # One task at a time per project (serialized execution).
            if project in self.running_projects:
                skipped.append({
                    "project": project,
                    "reason": "already running"
                })
                continue

            # Make sure the project's cockpit container is up.
            if not container_running(project):
                start_result = cockpit_start(project, self.config)
                if not start_result["success"]:
                    skipped.append({
                        "project": project,
                        "reason": f"failed to start cockpit: {start_result['message']}"
                    })
                    continue

            # Oldest pending task first (files are scanned in sorted order).
            task_data = tasks_by_project[project][0]

            # Claim slot via scheduler
            if not self.scheduler.claim_task(task_data["id"], project):
                skipped.append({
                    "project": project,
                    "reason": "failed to claim scheduler slot"
                })
                continue

            # Dispatch async
            self.dispatch_task_async(project, task_data)
            summary = task_data["task"]
            dispatched.append({
                "project": project,
                "task_id": task_data["id"],
                # FIX: append "..." only when the description was truncated.
                "task": (summary[:50] + "...") if len(summary) > 50 else summary
            })

            available_slots -= 1

        return {
            "cycle_time": cycle_start.isoformat(),
            "dispatched": dispatched,
            "skipped": skipped,
            "running_count": len(self.running_projects),
            "pending_count": len(pending),
            "load_stats": self.load_monitor.get_stats()
        }

    def get_status(self) -> Dict[str, Any]:
        """Get dispatcher status."""
        return {
            "running_projects": list(self.running_projects.keys()),
            "running_count": len(self.running_projects),
            "max_concurrent": self.max_concurrent,
            "pending_tasks": len(self.get_pending_tasks()),
            "load_stats": self.load_monitor.get_stats(),
            "scheduler_status": self.scheduler.get_scheduling_status()
        }
|
|
|
def run_dispatcher_daemon(config: dict, interval: int = 10) -> None:
    """
    Run dispatcher as a daemon.

    Args:
        config: Luzia config dict
        interval: Seconds between dispatch cycles
    """
    dispatcher = CockpitQueueDispatcher(config)

    print(f"[CockpitQueueDispatcher] Started (interval: {interval}s)")

    # Loop forever: one dispatch cycle, log outcomes, sleep, repeat.
    while True:
        try:
            cycle = dispatcher.run_dispatch_cycle()

            for entry in cycle.get("dispatched") or []:
                print(f"[DISPATCHED] {entry['project']}: {entry['task']}")

            reason = cycle.get("reason")
            if reason:
                print(f"[CYCLE] {reason}")

        except Exception as exc:
            # Keep the daemon alive even if a single cycle blows up.
            print(f"[ERROR] Dispatch cycle failed: {exc}")

        time.sleep(interval)
|
|
|
def main():
    """Test dispatcher."""
    import yaml

    # Load the orchestrator config if present, else fall back to an
    # empty project map.
    config_path = Path("/opt/server-agents/orchestrator/config/luzia.yaml")
    config = (
        yaml.safe_load(config_path.read_text())
        if config_path.exists()
        else {"projects": {}}
    )

    dispatcher = CockpitQueueDispatcher(config)

    banner = "=" * 60
    print(banner)
    print("COCKPIT QUEUE DISPATCHER STATUS")
    print(banner)
    print(json.dumps(dispatcher.get_status(), indent=2))


if __name__ == "__main__":
    main()