#!/usr/bin/env python3
"""
Queue Controller - Load-Aware Task Queue for Luzia

Implements:
- File-based task queue with priority tiers (high/normal)
- Load-aware scheduling (CPU, memory, slot limits)
- Fair share across projects (prevents starvation)
- Atomic file operations (write to .tmp, fsync, rename)
- File locking for capacity.json (fcntl.flock)

Usage:
    from queue_controller import QueueController
    qc = QueueController()
    task_id, position = qc.enqueue("musica", "fix the bug", priority=5)

    # Or run as daemon
    qc.run_loop()
"""

import fcntl
import json
import os
import re
import subprocess
import sys
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

# A project name is a letter followed by letters, digits, '-' or '_'.
_PROJECT_NAME_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*")
_MAX_PROJECT_NAME_LEN = 32

# Task ids are used as a directory name, so keep them to a safe alphabet.
_TASK_ID_RE = re.compile(r"[A-Za-z0-9-]+")

# Queue tiers in dispatch order.
_TIERS = ("high", "normal")


def validate_project_name(project: str) -> bool:
    """
    Validate project name to prevent path traversal attacks.

    Rules:
    - Must be a string of 1-32 characters
    - Must start with a letter
    - May contain only letters, digits, hyphens and underscores
      (so no path separators, dots or whitespace)

    Returns:
        True if the name is acceptable, False otherwise.
    """
    if not isinstance(project, str):
        return False
    if not project or len(project) > _MAX_PROJECT_NAME_LEN:
        return False
    # fullmatch, not match with '$': '$' also matches just before a trailing
    # newline, which would let "name\n" through.
    return _PROJECT_NAME_RE.fullmatch(project) is not None


def _parse_task_filename(name: str) -> Optional[Tuple[int, int, str, str]]:
    """
    Split a queue file name into (priority, timestamp, project, task_id).

    Expected format: {priority}_{timestamp}_{project}_{task-id}.json
    The project may itself contain underscores; the task id may not.

    Returns:
        The parsed fields, or None if the name does not follow the format.
    """
    stem = name[: -len(".json")] if name.endswith(".json") else name
    head = stem.split("_", 2)
    if len(head) != 3:
        return None
    project, sep, task_id = head[2].rpartition("_")
    if not sep or not project or not task_id:
        return None
    try:
        return int(head[0]), int(head[1]), project, task_id
    except ValueError:
        return None


def _queue_sort_key(path: Path) -> Tuple[int, int, int, str]:
    """
    Sort key giving queue order: numeric priority, then enqueue timestamp.

    A plain string sort of the file names would place priority "10" ahead of
    priority "4". Files whose names cannot be parsed sort last, by name.
    """
    parsed = _parse_task_filename(path.name)
    if parsed is None:
        return (1, 0, 0, path.name)
    priority, timestamp, _project, _task_id = parsed
    return (0, priority, timestamp, path.name)


def _task_file_project(path: Path) -> Optional[str]:
    """Return the project encoded in a queue file name, or None if unparsable."""
    parsed = _parse_task_filename(path.name)
    return parsed[2] if parsed is not None else None


class QueueController:
    """Load-aware task queue controller with fair share scheduling."""

    QUEUE_BASE = Path("/var/lib/luzia/queue")
    CONFIG_FILE = QUEUE_BASE / "config.json"
    CAPACITY_FILE = QUEUE_BASE / "capacity.json"

    def __init__(self):
        self.config = self._load_config()
        self._ensure_dirs()

    def _ensure_dirs(self):
        """Create queue directory structure if needed."""
        for tier in _TIERS:
            (self.QUEUE_BASE / "pending" / tier).mkdir(parents=True, exist_ok=True)

    def _load_config(self) -> Dict[str, Any]:
        """
        Load queue configuration.

        Values from CONFIG_FILE are layered over the built-in defaults, so a
        config file that sets only some keys still yields a complete config.
        """
        defaults: Dict[str, Any] = {
            "max_concurrent_slots": 4,
            "max_cpu_load": 0.8,
            "max_memory_pct": 85,
            "fair_share": {"enabled": True, "max_per_project": 2},
            "poll_interval_ms": 1000,
        }
        if not self.CONFIG_FILE.exists():
            return defaults

        loaded = json.loads(self.CONFIG_FILE.read_text())
        config = {**defaults, **loaded}
        # "fair_share" is nested, so merge it one level deeper.
        fair_share = loaded.get("fair_share")
        if isinstance(fair_share, dict):
            config["fair_share"] = {**defaults["fair_share"], **fair_share}
        else:
            config["fair_share"] = defaults["fair_share"]
        return config

    # --- Atomic File Operations ---

    def _atomic_write_json(self, path: Path, data: Dict) -> None:
        """Write JSON atomically: write to .tmp, fsync, rename."""
        tmp_path = path.with_suffix(".json.tmp")
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)

    def _read_json_safe(self, path: Path, default: Optional[Dict] = None) -> Dict:
        """Read JSON with fallback to default (or {}) when missing or unreadable."""
        try:
            return json.loads(path.read_text())
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError: invalid JSON or
            # undecodable bytes (JSONDecodeError/UnicodeDecodeError).
            return default or {}

    def _pending_files(self, tier: str) -> List[Path]:
        """Return the pending task files of one tier in queue order."""
        return sorted(
            (self.QUEUE_BASE / "pending" / tier).glob("*.json"),
            key=_queue_sort_key,
        )

    # --- Capacity Management (with locking) ---

    def _read_capacity(self) -> Dict[str, Any]:
        """Read capacity.json under a shared lock, creating it if missing."""
        if not self.CAPACITY_FILE.exists():
            return self._init_capacity()

        with open(self.CAPACITY_FILE, "r") as f:
            fcntl.flock(f.fileno(), fcntl.LOCK_SH)
            try:
                return json.load(f)
            finally:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)

    def _update_capacity(
        self,
        updates: Dict[str, Any],
        mutate: Optional[Callable[[Dict[str, Any]], None]] = None,
    ) -> Dict[str, Any]:
        """
        Update capacity.json while holding an exclusive lock.

        Args:
            updates: Values to apply. "slots" and "by_project" are merged into
                the existing dicts; any other key replaces the stored value.
            mutate: Optional callback invoked with the capacity dict after
                ``updates`` are applied and before it is written. Use it for
                read-modify-write changes (e.g. incrementing a counter) that
                must happen under the lock.

        Returns:
            The capacity dict as written to disk.
        """
        # Get system stats
        load_1m, load_5m, _ = os.getloadavg()
        mem_info = self._get_memory_info()

        # "r+" requires the file to exist; create it on first use.
        if not self.CAPACITY_FILE.exists():
            self._init_capacity()

        with open(self.CAPACITY_FILE, "r+") as f:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            try:
                capacity = json.load(f)

                # Update system stats
                system = capacity["system"]
                capacity["updated_at"] = datetime.now().isoformat()
                system["load_1m"] = round(load_1m, 2)
                system["load_5m"] = round(load_5m, 2)
                system["memory_used_pct"] = mem_info["used_pct"]
                system["memory_available_mb"] = mem_info["available_mb"]

                # Apply updates
                for key, value in updates.items():
                    if key in ("slots", "by_project"):
                        capacity.setdefault(key, {}).update(value)
                    else:
                        capacity[key] = value

                if mutate is not None:
                    mutate(capacity)

                # Recalculate available slots
                slots = capacity["slots"]
                slots["available"] = slots["max"] - slots["used"]

                # Rewrite in place (not a rename): the lock is tied to this
                # file, so the file must stay the same for other lock holders.
                f.seek(0)
                f.truncate()
                json.dump(capacity, f, indent=2)
                f.flush()
                os.fsync(f.fileno())

                return capacity
            finally:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)

    def _init_capacity(self) -> Dict[str, Any]:
        """Initialize capacity.json with system info and return its content."""
        cpu_count = os.cpu_count() or 4
        mem_info = self._get_memory_info()
        max_slots = self.config.get("max_concurrent_slots", 4)

        capacity = {
            "updated_at": datetime.now().isoformat(),
            "system": {
                "cpu_count": cpu_count,
                "load_1m": 0.0,
                "load_5m": 0.0,
                "memory_total_mb": mem_info["total_mb"],
                "memory_used_pct": mem_info["used_pct"],
                "memory_available_mb": mem_info["available_mb"],
            },
            "slots": {
                "max": max_slots,
                "used": 0,
                "available": max_slots,
            },
            "by_project": {},
        }

        self._atomic_write_json(self.CAPACITY_FILE, capacity)
        return capacity

    def _get_memory_info(self) -> Dict[str, int]:
        """
        Get memory info from /proc/meminfo.

        Returns:
            Dict with "total_mb", "available_mb" and "used_pct". Fixed
            fallback values are returned when /proc/meminfo cannot be read
            or parsed.
        """
        try:
            mem: Dict[str, int] = {}
            with open("/proc/meminfo") as f:
                for line in f:
                    parts = line.split()
                    if len(parts) >= 2:
                        mem[parts[0].rstrip(":")] = int(parts[1])  # kB
        except (OSError, ValueError):
            return {"total_mb": 8192, "available_mb": 4096, "used_pct": 50}

        total_mb = mem.get("MemTotal", 0) // 1024
        available_mb = mem.get("MemAvailable", mem.get("MemFree", 0)) // 1024
        used_pct = int(100 * (total_mb - available_mb) / total_mb) if total_mb else 0

        return {
            "total_mb": total_mb,
            "available_mb": available_mb,
            "used_pct": used_pct,
        }

    # --- Enqueue ---

    def enqueue(
        self,
        project: str,
        prompt: str,
        priority: int = 5,
        skill_match: Optional[str] = None,
        enqueued_by: Optional[str] = None,
    ) -> Tuple[str, int]:
        """
        Add task to queue.

        Args:
            project: Project name
            prompt: Task prompt
            priority: 1-10 (1-3 = high, 4-10 = normal); lower runs first
            skill_match: Matched skill name (optional)
            enqueued_by: User who enqueued (optional, defaults to $USER)

        Returns:
            Tuple of (task_id, queue_position)

        Raises:
            ValueError: If project name is invalid
        """
        # SECURITY: Validate project name to prevent path traversal
        if not validate_project_name(project):
            raise ValueError(f"Invalid project name: {project}")

        task_id = str(uuid.uuid4())[:8]
        tier = "high" if priority <= 3 else "normal"

        entry = {
            "id": task_id,
            "project": project,
            "priority": priority,
            "prompt": prompt,
            "skill_match": skill_match,
            "enqueued_at": datetime.now().isoformat(),
            "enqueued_by": enqueued_by or os.environ.get("USER", "unknown"),
            "status": "pending",
        }

        # Filename format: {priority}_{timestamp}_{project}_{task-id}.json
        filename = f"{priority}_{int(time.time())}_{project}_{task_id}.json"
        path = self.QUEUE_BASE / "pending" / tier / filename

        self._atomic_write_json(path, entry)

        position = self._get_queue_position(task_id, tier)
        return task_id, position

    def _get_queue_position(self, task_id: str, tier: str) -> int:
        """
        Get the 1-based queue position of a task.

        High tier tasks are counted first; normal tier tasks are only counted
        when the task itself is in the normal tier. If the task is not found,
        the position just past the counted tasks is returned.
        """
        position = 1
        tiers = _TIERS if tier == "normal" else ("high",)
        for current in tiers:
            for f in self._pending_files(current):
                parsed = _parse_task_filename(f.name)
                # Compare the id field exactly rather than by substring.
                if parsed is not None and parsed[3] == task_id:
                    return position
                position += 1
        return position

    # --- Capacity Check ---

    def _has_capacity(self, capacity: Dict) -> bool:
        """Check if system has capacity for new task."""
        cpu_count = capacity["system"].get("cpu_count", 4)
        max_load = self.config["max_cpu_load"] * cpu_count

        return (
            capacity["slots"]["available"] > 0
            and capacity["system"]["load_5m"] < max_load
            and capacity["system"]["memory_used_pct"] < self.config["max_memory_pct"]
        )

    # --- Fair Share Selection ---

    def _get_pending_tasks(self) -> List[Dict]:
        """
        Get all pending tasks in queue order (tier, priority, timestamp).

        Each returned task carries a "_path" key with its queue file path.
        Files that cannot be read or lack "id", "project" or "prompt" are
        skipped so that one bad file cannot block the whole queue.
        """
        tasks = []
        for tier in _TIERS:
            for f in self._pending_files(tier):
                task = self._read_json_safe(f)
                if not isinstance(task, dict):
                    continue
                if not all(key in task for key in ("id", "project", "prompt")):
                    continue
                task["_path"] = str(f)
                tasks.append(task)
        return tasks

    def _select_next_task(self, capacity: Dict) -> Optional[Dict]:
        """
        Fair share task selection across projects.

        With fair share disabled, the first pending task is returned. With it
        enabled, projects already running ``max_per_project`` tasks are
        skipped, the project with the fewest active tasks is chosen, and that
        project's first task in queue order is returned.

        Returns:
            The selected task, or None if nothing is pending or eligible.
        """
        pending = self._get_pending_tasks()
        if not pending:
            return None

        fair_share = self.config["fair_share"]
        if not fair_share["enabled"]:
            # No fair share: just take the first pending task
            return pending[0]

        max_per_project = fair_share["max_per_project"]
        active_by_project = capacity.get("by_project", {})

        # Group by project, filter those at limit
        eligible: Dict[str, List[Dict]] = {}
        for task in pending:
            project = task["project"]
            if active_by_project.get(project, 0) < max_per_project:
                eligible.setdefault(project, []).append(task)

        if not eligible:
            return None

        # Pick project with fewest active tasks. Its task list is already in
        # queue order, so the head respects priority tiers.
        project = min(eligible, key=lambda p: active_by_project.get(p, 0))
        return eligible[project][0]

    # --- Dispatch ---

    def _unclaim(self, dispatching_path: Path, task_path: Path) -> None:
        """Best-effort: put a claimed task file back into the pending queue."""
        try:
            os.rename(dispatching_path, task_path)
        except OSError as e:
            print(f"[queue] Failed to return task to queue ({dispatching_path}): {e}")

    def _prepare_conductor_dir(self, conductor_dir: Path, task: Dict) -> Dict[str, Any]:
        """
        Create the conductor directory for a task and write its initial files.

        Writes meta.json, heartbeat.json and progress.md and creates the
        dialogue/ directory.

        Returns:
            The meta dict that was written.

        Raises:
            OSError: If the directory or any file cannot be created.
        """
        task_id = task["id"]
        conductor_dir.mkdir(parents=True, exist_ok=True)

        meta = {
            "id": task_id,
            "prompt": task["prompt"],
            "started": datetime.now().isoformat(),
            "status": "running",
            "skill": task.get("skill_match"),
            "zen_continuation_id": None,
            "dispatched_by": task.get("enqueued_by", "queue"),
            "priority": task.get("priority", 5),
        }
        self._atomic_write_json(conductor_dir / "meta.json", meta)

        # Write initial heartbeat
        heartbeat = {"ts": time.time(), "step": "Starting task"}
        self._atomic_write_json(conductor_dir / "heartbeat.json", heartbeat)

        # Write initial progress
        progress_md = "\n".join([
            f"# Progress: {task_id}",
            "",
            "## Milestones",
            "- [ ] Task started",
            "- [ ] Implementation in progress",
            "- [ ] Testing",
            "- [ ] Completed",
            "",
            "## Current Status",
            "Task dispatched from queue.",
            "",
            f"Last update: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            "",
        ])
        (conductor_dir / "progress.md").write_text(progress_md)

        # Create dialogue directory
        (conductor_dir / "dialogue").mkdir(exist_ok=True)
        return meta

    def _dispatch(self, task: Dict) -> bool:
        """
        Dispatch task to conductor and spawn container.

        Uses atomic task claiming to prevent race conditions:
        1. Try to rename task file to .dispatching (atomic claim)
        2. If rename fails, another controller claimed it
        3. Only proceed with dispatch if claim succeeded

        If preparing the conductor directory fails, the claim is undone so
        the task returns to the pending queue.

        Returns True if dispatch succeeded.
        """
        project = task["project"]
        task_id = task["id"]

        # SECURITY: Validate project name before using in path
        if not validate_project_name(project):
            print(f"[queue] Invalid project name: {project}")
            return False

        # SECURITY: the task id also becomes a directory name.
        if not isinstance(task_id, str) or not _TASK_ID_RE.fullmatch(task_id):
            print(f"[queue] Invalid task id: {task_id}")
            return False

        raw_path = task.get("_path")
        if not raw_path:
            print(f"[queue] Task {task_id} has no queue file path")
            return False
        task_path = Path(raw_path)
        if not task_path.exists():
            print(f"[queue] Task file not found: {task_path}")
            return False

        # Atomic claim: try to rename task file to .dispatching
        # This prevents race conditions where two controllers try to dispatch same task
        dispatching_path = task_path.with_suffix(".json.dispatching")
        try:
            os.rename(task_path, dispatching_path)
        except FileNotFoundError:
            # Another controller already claimed this task
            print(f"[queue] Task {task_id} already claimed by another controller")
            return False
        except OSError as e:
            print(f"[queue] Failed to claim task {task_id}: {e}")
            return False

        # Create conductor directory and its initial files
        conductor_dir = Path(f"/home/{project}/conductor/active/{task_id}")
        try:
            meta = self._prepare_conductor_dir(conductor_dir, task)
        except PermissionError:
            self._unclaim(dispatching_path, task_path)
            print(f"[queue] Cannot create conductor dir for {project}: permission denied")
            return False
        except OSError as e:
            self._unclaim(dispatching_path, task_path)
            print(f"[queue] Cannot prepare conductor dir for {task_id}: {e}")
            return False

        # Remove from queue (delete the .dispatching file we claimed earlier)
        try:
            dispatching_path.unlink()
        except FileNotFoundError:
            pass  # Already cleaned up

        # Update capacity. The increments run inside _update_capacity's
        # exclusive lock so concurrent controllers cannot lose an update.
        def _occupy_slot(capacity: Dict[str, Any]) -> None:
            capacity["slots"]["used"] = capacity["slots"]["used"] + 1
            by_project = capacity.setdefault("by_project", {})
            by_project[project] = by_project.get(project, 0) + 1

        self._update_capacity({}, mutate=_occupy_slot)

        print(f"[queue] Dispatched {task_id} to {project}")

        # Spawn the actual agent via luzia's spawn_claude_agent
        job_id = self._spawn_agent(project, task, conductor_dir)
        if job_id:
            # Update conductor meta with job linkage
            meta["job_id"] = job_id
            meta["status"] = "running"
            self._atomic_write_json(conductor_dir / "meta.json", meta)
            print(f"[queue] Spawned agent job {job_id} for task {task_id}")
        else:
            print(f"[queue] Warning: Agent spawn failed for {task_id}")
            # Task is dispatched to conductor but agent didn't start
            # Watchdog will detect this via missing heartbeat updates

        return True

    def _spawn_agent(self, project: str, task: Dict, conductor_dir: Path) -> Optional[str]:
        """
        Spawn Claude agent for the task using luzia infrastructure.

        NON-BLOCKING: Uses Popen to spawn agent in background instead of
        blocking with run(). This allows the dispatcher to spawn multiple
        tasks in quick succession.

        The child's stdout/stderr go to DEVNULL: nothing here ever reads them,
        and an unread pipe would block the child once its buffer fills.

        Returns job_id if spawn started successfully, None otherwise.
        """
        prompt = task.get("prompt", "")

        # luzia is a script, not an importable module, so run it as a
        # subprocess: 'luzia PROJECT PROMPT' routes through spawn_claude_agent.
        cmd = [
            "/opt/server-agents/orchestrator/bin/luzia",
            project,
            prompt,
        ]
        env = {
            **os.environ,
            "LUZIA_QUEUE_DISPATCH": "1",  # Signal this is from queue
            "LUZIA_CONDUCTOR_DIR": str(conductor_dir),
        }

        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                env=env,
            )
        except (OSError, ValueError, TypeError, subprocess.SubprocessError) as e:
            print(f"[queue] Spawn error: {e}")
            return None

        # Generate job_id immediately instead of waiting for subprocess output
        # Format: agent:project:HHMMSS-xxxx
        job_id = f"agent:{project}:{time.strftime('%H%M%S')}-{task.get('id', 'unknown')[:8]}"

        print(f"[queue] Spawned agent {job_id} in background (PID {proc.pid})")
        return job_id

    # --- Daemon Loop ---

    def run_loop(self):
        """Main daemon loop - poll and dispatch until interrupted."""
        print("[queue] Starting queue controller daemon")
        print(f"[queue] Config: max_slots={self.config['max_concurrent_slots']}, "
              f"max_load={self.config['max_cpu_load']}, "
              f"max_mem={self.config['max_memory_pct']}%")

        poll_interval = self.config["poll_interval_ms"] / 1000
        backpressure = self.config.get("backpressure", {})
        backpressure_sleep = backpressure.get("sleep_ms", 5000) / 1000

        while True:
            try:
                capacity = self._update_capacity({})  # Refresh system stats

                if self._has_capacity(capacity):
                    task = self._select_next_task(capacity)
                    if task:
                        self._dispatch(task)
                elif backpressure.get("enabled"):
                    # Backpressure: sleep longer when overloaded
                    time.sleep(backpressure_sleep)
                    continue

                time.sleep(poll_interval)

            except KeyboardInterrupt:
                print("\n[queue] Shutting down...")
                break
            except Exception as e:
                # Top-level daemon boundary: log and keep running.
                print(f"[queue] Error in loop: {e}")
                time.sleep(poll_interval * 5)  # Back off on errors

    # --- Queue Status ---

    def get_queue_status(self, project: Optional[str] = None) -> Dict[str, Any]:
        """
        Get queue status for display.

        Args:
            project: If given, only pending tasks of exactly this project
                are listed.

        Returns:
            Dict with "pending" counts, "active" slot usage, "system" load
            and the pending "tasks" per tier in queue order.
        """
        capacity = self._read_capacity()

        tasks_by_tier: Dict[str, List[Dict]] = {}
        for tier in _TIERS:
            files = self._pending_files(tier)
            if project:
                files = [f for f in files if _task_file_project(f) == project]
            tasks_by_tier[tier] = [self._read_json_safe(f) for f in files]

        high_tasks = tasks_by_tier["high"]
        normal_tasks = tasks_by_tier["normal"]

        return {
            "pending": {
                "high": len(high_tasks),
                "normal": len(normal_tasks),
                "total": len(high_tasks) + len(normal_tasks),
            },
            "active": {
                "slots_used": capacity["slots"]["used"],
                "slots_max": capacity["slots"]["max"],
                "by_project": capacity.get("by_project", {}),
            },
            "system": {
                "load": capacity["system"]["load_5m"],
                "memory_pct": capacity["system"]["memory_used_pct"],
            },
            "tasks": {
                "high": high_tasks,
                "normal": normal_tasks,
            },
        }

    def clear_queue(self, project: Optional[str] = None) -> int:
        """
        Clear pending tasks.

        Args:
            project: If given, only tasks of exactly this project are removed.

        Returns:
            Count of cleared tasks.
        """
        count = 0
        for tier in _TIERS:
            for f in (self.QUEUE_BASE / "pending" / tier).glob("*.json"):
                if project and _task_file_project(f) != project:
                    continue
                try:
                    f.unlink()
                except FileNotFoundError:
                    # A running controller claimed it in the meantime.
                    continue
                count += 1
        return count


def _print_usage() -> None:
    """Print CLI usage."""
    print("Usage:")
    print("  queue_controller.py daemon                Run queue daemon")
    print("  queue_controller.py status                Show queue status")
    print("  queue_controller.py enqueue PROJECT PROMPT [priority]")
    print("  queue_controller.py clear [project]")


def main(argv: Optional[List[str]] = None) -> int:
    """
    CLI entry point.

    Args:
        argv: Argument list including the program name (defaults to sys.argv).

    Returns:
        Process exit code: 0 on success (or when only usage is shown because
        no command was given), 1 on bad arguments or an unknown command.
    """
    argv = sys.argv if argv is None else argv

    if len(argv) < 2:
        _print_usage()
        return 0

    cmd = argv[1]
    if cmd not in ("daemon", "status", "enqueue", "clear"):
        print(f"Unknown command: {cmd}")
        return 1

    if cmd == "enqueue":
        if len(argv) < 4:
            _print_usage()
            return 1
        try:
            priority = int(argv[4]) if len(argv) > 4 else 5
        except ValueError:
            print(f"Invalid priority: {argv[4]}")
            return 1

    # Only construct the controller (which creates directories) once the
    # arguments are known to be usable.
    qc = QueueController()

    if cmd == "daemon":
        qc.run_loop()
    elif cmd == "status":
        print(json.dumps(qc.get_queue_status(), indent=2))
    elif cmd == "enqueue":
        try:
            task_id, position = qc.enqueue(argv[2], argv[3], priority)
        except ValueError as e:
            print(f"Error: {e}")
            return 1
        print(f"Task {task_id} queued (position {position})")
    elif cmd == "clear":
        project = argv[2] if len(argv) > 2 else None
        count = qc.clear_queue(project)
        print(f"Cleared {count} tasks")

    return 0


# CLI interface
if __name__ == "__main__":
    sys.exit(main())