diff --git a/lib/__pycache__/cockpit.cpython-310.pyc b/lib/__pycache__/cockpit.cpython-310.pyc
index 90774eb..fecb15c 100644
Binary files a/lib/__pycache__/cockpit.cpython-310.pyc and b/lib/__pycache__/cockpit.cpython-310.pyc differ
diff --git a/lib/__pycache__/cockpit_queue_dispatcher.cpython-310.pyc b/lib/__pycache__/cockpit_queue_dispatcher.cpython-310.pyc
new file mode 100644
index 0000000..e3ac014
Binary files /dev/null and b/lib/__pycache__/cockpit_queue_dispatcher.cpython-310.pyc differ
diff --git a/lib/cockpit.py b/lib/cockpit.py
index ed6fb20..13fd461 100644
--- a/lib/cockpit.py
+++ b/lib/cockpit.py
@@ -695,6 +695,71 @@ def cockpit_attach_cmd(project: str) -> str:
     return f"docker exec -it {container_name} tmux attach-session -t agent"
 
 
+def cockpit_queue_task(project: str, task: str, context: str = "",
+                       priority: str = "normal") -> Dict:
+    """
+    Queue a task for background dispatch.
+
+    Tasks are queued per-project and dispatched serially within each project,
+    but in parallel across projects (with load awareness).
+
+    Args:
+        project: Target project name
+        task: Task description
+        context: Project context
+        priority: "high" or "normal"
+
+    Returns: {"success": bool, "task_id": str, "message": str}
+    """
+    try:
+        from cockpit_queue_dispatcher import CockpitQueueDispatcher
+        import yaml
+
+        config_path = Path("/opt/server-agents/orchestrator/config/luzia.yaml")
+        if config_path.exists():
+            config = yaml.safe_load(config_path.read_text())
+        else:
+            config = {"projects": {}}
+
+        dispatcher = CockpitQueueDispatcher(config)
+        task_id = dispatcher.enqueue_task(project, task, context, priority)
+
+        return {
+            "success": True,
+            "task_id": task_id,
+            "message": f"Task queued for {project}",
+            "queue_position": len(dispatcher.get_pending_tasks(project))
+        }
+    except ImportError:
+        return {"success": False, "message": "Queue dispatcher not available"}
+    except Exception as e:
+        return {"success": False, "message": str(e)}
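+
+# Illustrative usage only; "website" is a hypothetical project name that
+# would have to exist in luzia.yaml:
+#
+#     result = cockpit_queue_task("website", "Refresh the deployment docs")
+#     if result["success"]:
+#         print(result["task_id"], "position:", result.get("queue_position"))
+#     else:
+#         print(result["message"])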
+
+
+def cockpit_queue_status() -> Dict:
+    """
+    Get status of the task queue and dispatcher.
+
+    Returns: {"success": bool, "status": dict}
+    """
+    try:
+        from cockpit_queue_dispatcher import CockpitQueueDispatcher
+        import yaml
+
+        config_path = Path("/opt/server-agents/orchestrator/config/luzia.yaml")
+        if config_path.exists():
+            config = yaml.safe_load(config_path.read_text())
+        else:
+            config = {"projects": {}}
+
+        dispatcher = CockpitQueueDispatcher(config)
+        return {"success": True, "status": dispatcher.get_status()}
+    except ImportError:
+        return {"success": False, "message": "Queue dispatcher not available"}
+    except Exception as e:
+        return {"success": False, "message": str(e)}
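+
+# Illustrative usage only; the "status" payload mirrors
+# CockpitQueueDispatcher.get_status():
+#
+#     status = cockpit_queue_status()
+#     if status["success"]:
+#         print(json.dumps(status["status"], indent=2))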
+
+
 def cockpit_dispatch_task(project: str, task: str, context: str, config: dict,
                           show_output: bool = True, timeout: int = 600) -> Dict:
     """
@@ -1057,6 +1122,11 @@ def route_cockpit(config: dict, args: list, kwargs: dict) -> int:
         print("  output <project>        Get recent output")
         print("  status [project]        Show cockpit status")
         print("  attach <project>        Show attach command")
+        print("")
+        print("Queue commands (per-project serialized, parallel across projects):")
+        print("  queue <project> <task>  Queue task for background dispatch")
+        print("  queue --status          Show dispatcher status")
+        print("  dispatch                Run one dispatch cycle")
         return 0
 
     subcommand = args[0]
@@ -1161,5 +1231,49 @@ def route_cockpit(config: dict, args: list, kwargs: dict) -> int:
             print(f"  {cmd}")
         return 0
 
+    if subcommand == "queue":
+        # Handle --status before the arity check; otherwise the single-arg
+        # "queue --status" form is unreachable.
+        if subargs and subargs[0] == "--status":
+            result = cockpit_queue_status()
+            if result["success"]:
+                print(json.dumps(result["status"], indent=2))
+                return 0
+            print(f"Error: {result['message']}")
+            return 1
+        if len(subargs) < 2:
+            print("Usage: luzia cockpit queue <project> <task>")
+            print("       luzia cockpit queue --status")
+            return 1
+
+        project = subargs[0]
+        task = " ".join(subargs[1:])
+        result = cockpit_queue_task(project, task)
+        if result["success"]:
+            print(f"OK: {result['message']}")
+            print(f"  Task ID: {result['task_id']}")
+            print(f"  Queue position: {result.get('queue_position', 'unknown')}")
+            return 0
+        print(f"Error: {result['message']}")
+        return 1
+
+    if subcommand == "dispatch":
+        # Run one dispatch cycle
+        try:
+            from cockpit_queue_dispatcher import CockpitQueueDispatcher
+            import yaml
+
+            config_path = Path("/opt/server-agents/orchestrator/config/luzia.yaml")
+            if config_path.exists():
+                cfg = yaml.safe_load(config_path.read_text())
+            else:
+                cfg = config
+
+            dispatcher = CockpitQueueDispatcher(cfg)
+            result = dispatcher.run_dispatch_cycle()
+            print(json.dumps(result, indent=2))
+            # Dispatched tasks run on daemon threads, which die with the
+            # process; wait for them so a one-shot cycle actually completes.
+            for thread in list(dispatcher.running_projects.values()):
+                thread.join()
+            return 0
+        except Exception as e:
+            print(f"Error: {e}")
+            return 1
+
     print(f"Unknown subcommand: {subcommand}")
     return 1
diff --git a/lib/cockpit_queue_dispatcher.py b/lib/cockpit_queue_dispatcher.py
new file mode 100644
index 0000000..2acbe7f
--- /dev/null
+++ b/lib/cockpit_queue_dispatcher.py
@@ -0,0 +1,427 @@
+#!/usr/bin/env python3
+"""
+Cockpit Queue Dispatcher - Load-aware background task dispatcher
+
+Integrates:
+- ProjectQueueScheduler: Per-project sequential, cross-project parallel
+- Cockpit: Docker-based Claude sessions
+- Load monitoring: Check system resources before dispatching
+
+Architecture:
+    TaskQueue (per-project)
+        ↓
+    CockpitQueueDispatcher
+        ├─ Check system load
+        ├─ Select next task (FIFO per project)
+        └─ Dispatch to cockpit (non-blocking)
+"""
+
+import json
+import os
+import psutil
+import subprocess
+import time
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Any
+import threading
+import queue
+
+# Import scheduler and cockpit
+try:
+    from project_queue_scheduler import ProjectQueueScheduler
+    from cockpit import (
+        cockpit_start, cockpit_dispatch_task, cockpit_status,
+        load_state, save_state, container_running, get_container_name
+    )
+except ImportError as e:
+    raise ImportError(f"Required module not found: {e}") from e
+
+
+class LoadMonitor:
+    """Monitor system load for dispatch decisions."""
+
+    def __init__(self, max_load: float = 4.0, max_memory_pct: float = 85.0):
+        """
+        Initialize load monitor.
+
+        Args:
+            max_load: Maximum 1-minute load average (default: 4.0)
+            max_memory_pct: Maximum memory usage percent (default: 85%)
+        """
+        self.max_load = max_load
+        self.max_memory_pct = max_memory_pct
+
+    def can_dispatch(self) -> tuple[bool, str]:
+        """
+        Check if the system can handle a new task dispatch.
+
+        Returns:
+            Tuple of (can_dispatch: bool, reason: str)
+        """
+        # Check load average
+        load1, load5, load15 = os.getloadavg()
+        if load1 > self.max_load:
+            return False, f"Load too high: {load1:.1f} (max: {self.max_load})"
+
+        # Check memory
+        mem = psutil.virtual_memory()
+        if mem.percent > self.max_memory_pct:
+            return False, f"Memory too high: {mem.percent:.1f}% (max: {self.max_memory_pct}%)"
+
+        return True, "OK"
+
+    def get_stats(self) -> Dict[str, Any]:
+        """Get current system stats."""
+        load1, load5, load15 = os.getloadavg()
+        mem = psutil.virtual_memory()
+
+        return {
+            "load_1m": load1,
+            "load_5m": load5,
+            "load_15m": load15,
+            "memory_used_pct": mem.percent,
+            "memory_available_gb": mem.available / (1024**3),
+            "can_dispatch": self.can_dispatch()[0],
+            "timestamp": datetime.now().isoformat()
+        }
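+
+# Illustrative usage only; the thresholds here are arbitrary:
+#
+#     monitor = LoadMonitor(max_load=8.0, max_memory_pct=90.0)
+#     ok, reason = monitor.can_dispatch()
+#     if not ok:
+#         print(f"Holding dispatch: {reason}")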
+
+
+class CockpitQueueDispatcher:
+    """
+    Background dispatcher for cockpit tasks.
+
+    Features:
+    - Per-project task queues (serialized)
+    - Cross-project parallelism
+    - Load-aware dispatching
+    - Non-blocking dispatch with result tracking
+    """
+
+    STATE_DIR = Path("/var/lib/luz-orchestrator/dispatcher")
+    QUEUE_DIR = Path("/var/lib/luz-orchestrator/task_queue")
+
+    def __init__(self, config: dict, max_concurrent_projects: int = 4):
+        """
+        Initialize dispatcher.
+
+        Args:
+            config: Luzia config dict
+            max_concurrent_projects: Max projects running simultaneously
+        """
+        self.config = config
+        self.max_concurrent = max_concurrent_projects
+        self.scheduler = ProjectQueueScheduler()
+        self.load_monitor = LoadMonitor()
+
+        # Ensure directories exist
+        self.STATE_DIR.mkdir(parents=True, exist_ok=True)
+        self.QUEUE_DIR.mkdir(parents=True, exist_ok=True)
+
+        # In-memory task queues per project
+        self.project_queues: Dict[str, queue.Queue] = {}
+
+        # Running dispatch threads per project
+        self.running_projects: Dict[str, threading.Thread] = {}
+
+        # Results storage
+        self.results: Dict[str, Dict] = {}
+
+    def enqueue_task(self, project: str, task: str, context: str = "",
+                     priority: str = "normal") -> str:
+        """
+        Add a task to the project queue.
+
+        Args:
+            project: Target project name
+            task: Task description
+            context: Project context
+            priority: "high" or "normal"
+
+        Returns:
+            task_id for tracking
+        """
+        # Time-prefixed ID keeps same-day tasks FIFO-sortable; the hash
+        # suffix disambiguates tasks queued within the same second.
+        task_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(task) & 0xffff)[2:]
+
+        task_data = {
+            "id": task_id,
+            "project": project,
+            "task": task,
+            "context": context,
+            "priority": priority,
+            "queued_at": datetime.now().isoformat(),
+            "status": "pending"
+        }
+
+        # Persist to the on-disk queue
+        queue_file = self.QUEUE_DIR / project / f"{task_id}.json"
+        queue_file.parent.mkdir(parents=True, exist_ok=True)
+        queue_file.write_text(json.dumps(task_data, indent=2))
+
+        # Mirror into the in-memory queue (created on demand)
+        if project not in self.project_queues:
+            self.project_queues[project] = queue.Queue()
+        self.project_queues[project].put(task_data)
+
+        return task_id
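+
+    # Each queued task lives in its own JSON file under QUEUE_DIR, e.g.
+    # (illustrative values):
+    #
+    #     /var/lib/luz-orchestrator/task_queue/website/142233-3f2a.json
+    #     {
+    #       "id": "142233-3f2a",
+    #       "project": "website",
+    #       "task": "Refresh the deployment docs",
+    #       "context": "",
+    #       "priority": "normal",
+    #       "queued_at": "2025-01-01T14:22:33",
+    #       "status": "pending"
+    #     }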
+
+    def get_pending_tasks(self, project: Optional[str] = None) -> List[Dict]:
+        """Get pending tasks, optionally filtered by project."""
+        tasks = []
+
+        if project:
+            project_dir = self.QUEUE_DIR / project
+            if project_dir.exists():
+                for task_file in sorted(project_dir.glob("*.json")):
+                    try:
+                        task = json.loads(task_file.read_text())
+                        if task.get("status") == "pending":
+                            tasks.append(task)
+                    except (json.JSONDecodeError, IOError):
+                        pass
+        else:
+            # All projects
+            for project_dir in self.QUEUE_DIR.iterdir():
+                if project_dir.is_dir():
+                    for task_file in sorted(project_dir.glob("*.json")):
+                        try:
+                            task = json.loads(task_file.read_text())
+                            if task.get("status") == "pending":
+                                tasks.append(task)
+                        except (json.JSONDecodeError, IOError):
+                            pass
+
+        return tasks
+
+    def dispatch_task_async(self, project: str, task_data: Dict) -> threading.Thread:
+        """
+        Dispatch a task to cockpit asynchronously.
+
+        Args:
+            project: Project name
+            task_data: Task dict with id, task, context
+
+        Returns:
+            Thread running the dispatch
+        """
+        def _dispatch():
+            task_id = task_data["id"]
+            task = task_data["task"]
+            context = task_data.get("context", "")
+
+            try:
+                # Update task status
+                self._update_task_status(project, task_id, "running")
+
+                # Dispatch to cockpit (blocking within this thread)
+                result = cockpit_dispatch_task(
+                    project=project,
+                    task=task,
+                    context=context,
+                    config=self.config,
+                    show_output=False,  # Don't print; we're async
+                    timeout=600
+                )
+
+                # Store result
+                self.results[task_id] = result
+
+                # Update status based on result
+                if result.get("awaiting_response"):
+                    self._update_task_status(project, task_id, "awaiting_human")
+                elif result.get("timed_out"):
+                    # The cockpit session may still be working; leave the
+                    # task marked as running.
+                    self._update_task_status(project, task_id, "running")
+                else:
+                    self._update_task_status(project, task_id, "completed")
+                    # Release project slot
+                    self.scheduler.release_task(project)
+
+            except Exception as e:
+                self._update_task_status(project, task_id, "failed", str(e))
+                self.scheduler.release_task(project)
+
+            finally:
+                # Clean up running tracker
+                if project in self.running_projects:
+                    del self.running_projects[project]
+
+        thread = threading.Thread(target=_dispatch, daemon=True)
+        # Register before starting so a fast-finishing thread cannot reach
+        # the finally block before the tracker entry exists.
+        self.running_projects[project] = thread
+        thread.start()
+
+        return thread
+
+    def _update_task_status(self, project: str, task_id: str,
+                            status: str, error: Optional[str] = None) -> None:
+        """Update task status in the queue file."""
+        task_file = self.QUEUE_DIR / project / f"{task_id}.json"
+        if task_file.exists():
+            try:
+                task = json.loads(task_file.read_text())
+                task["status"] = status
+                task["updated_at"] = datetime.now().isoformat()
+                if error:
+                    task["error"] = error
+                task_file.write_text(json.dumps(task, indent=2))
+            except (json.JSONDecodeError, IOError):
+                pass
+
+    def run_dispatch_cycle(self) -> Dict[str, Any]:
+        """
+        Run one dispatch cycle.
+
+        Checks load, selects available projects, dispatches tasks.
+
+        Returns:
+            Dict with cycle results
+        """
+        cycle_start = datetime.now()
+        dispatched = []
+        skipped = []
+
+        # Check system load
+        can_dispatch, load_reason = self.load_monitor.can_dispatch()
+        if not can_dispatch:
+            return {
+                "cycle_time": cycle_start.isoformat(),
+                "dispatched": [],
+                "skipped": [],
+                "reason": f"Load check failed: {load_reason}"
+            }
+
+        # Get current running projects
+        running_count = len(self.running_projects)
+        available_slots = self.max_concurrent - running_count
+
+        if available_slots <= 0:
+            return {
+                "cycle_time": cycle_start.isoformat(),
+                "dispatched": [],
+                "skipped": [],
+                "reason": f"No slots available ({running_count}/{self.max_concurrent} running)"
+            }
+
+        # Get pending tasks by project
+        pending = self.get_pending_tasks()
+        tasks_by_project: Dict[str, List[Dict]] = {}
+        for task in pending:
+            proj = task["project"]
+            if proj not in tasks_by_project:
+                tasks_by_project[proj] = []
+            tasks_by_project[proj].append(task)
+
+        # Select projects to dispatch (alphabetical scan, at most one task
+        # per project per cycle)
+        for project in sorted(tasks_by_project.keys()):
+            if available_slots <= 0:
+                break
+
+            # Skip if project already running
+            if project in self.running_projects:
+                skipped.append({
+                    "project": project,
+                    "reason": "already running"
+                })
+                continue
+
+            # Check if cockpit is running
+            if not container_running(project):
+                # Start cockpit first
+                start_result = cockpit_start(project, self.config)
+                if not start_result["success"]:
+                    skipped.append({
+                        "project": project,
+                        "reason": f"failed to start cockpit: {start_result['message']}"
+                    })
+                    continue
+
+            # Get first pending task for this project
+            task_data = tasks_by_project[project][0]
+
+            # Claim slot via scheduler
+            if not self.scheduler.claim_task(task_data["id"], project):
+                skipped.append({
+                    "project": project,
+                    "reason": "failed to claim scheduler slot"
+                })
+                continue
+
+            # Dispatch async
+            self.dispatch_task_async(project, task_data)
+            dispatched.append({
+                "project": project,
+                "task_id": task_data["id"],
+                "task": task_data["task"][:50] + "..."
+            })
+
+            available_slots -= 1
+
+        return {
+            "cycle_time": cycle_start.isoformat(),
+            "dispatched": dispatched,
+            "skipped": skipped,
+            "running_count": len(self.running_projects),
+            "pending_count": len(pending),
+            "load_stats": self.load_monitor.get_stats()
+        }
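+
+    # A cycle that dispatched work returns a summary shaped like this
+    # (illustrative values):
+    #
+    #     {
+    #       "cycle_time": "2025-01-01T14:22:35",
+    #       "dispatched": [{"project": "website", "task_id": "142233-3f2a",
+    #                       "task": "Refresh the deployment docs..."}],
+    #       "skipped": [{"project": "api", "reason": "already running"}],
+    #       "running_count": 1,
+    #       "pending_count": 3,
+    #       "load_stats": {"load_1m": 0.8, "memory_used_pct": 41.2, ...}
+    #     }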
+
+    def get_status(self) -> Dict[str, Any]:
+        """Get dispatcher status."""
+        return {
+            "running_projects": list(self.running_projects.keys()),
+            "running_count": len(self.running_projects),
+            "max_concurrent": self.max_concurrent,
+            "pending_tasks": len(self.get_pending_tasks()),
+            "load_stats": self.load_monitor.get_stats(),
+            "scheduler_status": self.scheduler.get_scheduling_status()
+        }
+
+
+def run_dispatcher_daemon(config: dict, interval: int = 10) -> None:
+    """
+    Run the dispatcher as a daemon.
+
+    Args:
+        config: Luzia config dict
+        interval: Seconds between dispatch cycles
+    """
+    dispatcher = CockpitQueueDispatcher(config)
+
+    print(f"[CockpitQueueDispatcher] Started (interval: {interval}s)")
+
+    while True:
+        try:
+            result = dispatcher.run_dispatch_cycle()
+
+            if result.get("dispatched"):
+                for d in result["dispatched"]:
+                    print(f"[DISPATCHED] {d['project']}: {d['task']}")
+
+            if result.get("reason"):
+                print(f"[CYCLE] {result['reason']}")
+
+        except Exception as e:
+            print(f"[ERROR] Dispatch cycle failed: {e}")
+
+        time.sleep(interval)
+
+
+def main():
+    """Print dispatcher status (smoke test)."""
+    import yaml
+
+    config_path = Path("/opt/server-agents/orchestrator/config/luzia.yaml")
+    if config_path.exists():
+        config = yaml.safe_load(config_path.read_text())
+    else:
+        config = {"projects": {}}
+
+    dispatcher = CockpitQueueDispatcher(config)
+
+    print("=" * 60)
+    print("COCKPIT QUEUE DISPATCHER STATUS")
+    print("=" * 60)
+    print(json.dumps(dispatcher.get_status(), indent=2))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/__pycache__/test_integrations.cpython-310-pytest-9.0.2.pyc b/tests/__pycache__/test_integrations.cpython-310-pytest-9.0.2.pyc
new file mode 100644
index 0000000..d0bf641
Binary files /dev/null and b/tests/__pycache__/test_integrations.cpython-310-pytest-9.0.2.pyc differ