#!/usr/bin/env python3
"""
Queue Controller v2 - Load-Aware Task Queue with Per-User Locking

Extends QueueController with per-user queue isolation:
- Enforces single task per user at a time
- Prevents concurrent agent edit conflicts
- Fair scheduling across projects/users
- Atomic locking and capacity tracking

This integrates with PerUserQueueManager to enforce exclusive locks.
"""

import fcntl
import json
import os
import re
import subprocess
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any

# Import the per-user queue manager
import sys
from pathlib import Path as PathlibPath
lib_path = PathlibPath(__file__).parent
if str(lib_path) not in sys.path:
    sys.path.insert(0, str(lib_path))

from per_user_queue_manager import PerUserQueueManager


# Compiled once; used with fullmatch() so a trailing newline cannot slip
# through the way it does with re.match(r'^...$').
_PROJECT_NAME_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*")

# Task ids end up in filesystem paths, so they get the same treatment as
# project names. enqueue() generates 8 hex characters; the pattern is a bit
# more permissive to tolerate ids written by other producers.
_TASK_ID_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9_-]{0,63}")


def validate_project_name(project: str) -> bool:
    """
    Validate project name to prevent path traversal attacks.

    Rules:
    - Must be a string of 1-32 characters
    - Must start with an ASCII letter
    - Remaining characters: ASCII letters, digits, hyphen, underscore
    - Consequently cannot contain path separators, dots or whitespace

    Args:
        project: Candidate project name

    Returns:
        True if the name is safe to embed in a filesystem path
    """
    if not isinstance(project, str) or not project or len(project) > 32:
        return False
    # The character class already excludes '/', '\\' and '.', so no separate
    # path-component check is needed.
    return _PROJECT_NAME_RE.fullmatch(project) is not None


class QueueControllerV2:
    """Load-aware task queue controller with per-user queue isolation."""

    QUEUE_BASE = Path("/var/lib/luzia/queue")
    CONFIG_FILE = QUEUE_BASE / "config.json"
    CAPACITY_FILE = QUEUE_BASE / "capacity.json"

    def __init__(self):
        self.config = self._load_config()
        self.user_queue_manager = PerUserQueueManager()
        self._ensure_dirs()

    def _ensure_dirs(self):
        """Create queue directory structure if needed."""
        for subdir in ["pending/high", "pending/normal"]:
            (self.QUEUE_BASE / subdir).mkdir(parents=True, exist_ok=True)

    def _load_config(self) -> Dict[str, Any]:
        """
        Load queue configuration.

        Values from config.json are layered over the built-in defaults, so a
        config file that sets only some keys does not cause KeyError later.
        Dict-valued sections (e.g. "fair_share") are merged one level deep.

        Returns:
            The effective configuration

        Raises:
            json.JSONDecodeError: If config.json exists but is not valid JSON
            ValueError: If config.json does not contain a JSON object
        """
        config: Dict[str, Any] = {
            "max_concurrent_slots": 4,
            "max_cpu_load": 0.8,
            "max_memory_pct": 85,
            "fair_share": {"enabled": True, "max_per_project": 2},
            "per_user_serialization": {"enabled": True, "lock_timeout_seconds": 3600},
            "poll_interval_ms": 1000,
        }
        try:
            loaded = json.loads(self.CONFIG_FILE.read_text())
        except FileNotFoundError:
            return config

        if not isinstance(loaded, dict):
            raise ValueError(f"{self.CONFIG_FILE} must contain a JSON object")

        for key, value in loaded.items():
            if isinstance(value, dict) and isinstance(config.get(key), dict):
                config[key].update(value)
            else:
                config[key] = value
        return config

    # --- Atomic File Operations ---

    def _atomic_write_json(self, path: Path, data: Dict) -> None:
        """Write JSON atomically: write to .tmp, fsync, rename."""
        tmp_path = path.with_suffix(".json.tmp")
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.rename(tmp_path, path)

    def _read_json_safe(self, path: Path, default: Optional[Dict] = None) -> Dict:
        """
        Read a JSON object with fallback to default on error.

        Args:
            path: File to read
            default: Value returned when the file is missing, unreadable,
                not valid JSON, or not a JSON object (an empty dict if omitted)

        Returns:
            The parsed dict, or the default
        """
        fallback = default or {}
        try:
            data = json.loads(path.read_text())
        except (OSError, ValueError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            return fallback
        # Callers index the result like a dict; reject lists/scalars.
        return data if isinstance(data, dict) else fallback

    # --- Per-User Queue Methods ---

    def extract_user_from_project(self, project: str) -> str:
        """
        Extract username from project name.

        For now, project name IS the username. This can be overridden if
        projects are under users (e.g., user/project format).

        Args:
            project: Project name

        Returns:
            Username
        """
        # Current: project name is the username
        return project

    def can_user_execute_task(self, user: str) -> bool:
        """
        Check if a user can execute a task (no active lock).

        Args:
            user: Username

        Returns:
            True if user has no active lock, or if per-user serialization
            is disabled in the configuration
        """
        if not self.config.get("per_user_serialization", {}).get("enabled"):
            return True
        return not self.user_queue_manager.is_user_locked(user)

    def acquire_user_lock(self, user: str, task_id: str) -> Tuple[bool, Optional[str]]:
        """
        Acquire per-user lock for task execution.

        Args:
            user: Username
            task_id: Task ID

        Returns:
            Tuple of (success: bool, lock_id: str or None)
        """
        if not self.config.get("per_user_serialization", {}).get("enabled"):
            # Per-user serialization disabled
            return True, f"disabled_{task_id}"

        acquired, lock_id = self.user_queue_manager.acquire_lock(
            user, task_id, timeout=30
        )
        return acquired, lock_id

    def release_user_lock(self, user: str, lock_id: str) -> bool:
        """
        Release per-user lock after task completion.

        Args:
            user: Username
            lock_id: Lock ID from acquire_user_lock

        Returns:
            True if lock was released
        """
        if not self.config.get("per_user_serialization", {}).get("enabled"):
            return True
        return self.user_queue_manager.release_lock(user, lock_id)

    # --- Capacity Management (with locking) ---

    def _read_capacity(self) -> Dict[str, Any]:
        """
        Read capacity.json under a shared file lock.

        A missing file is initialized. A corrupt file yields a fresh
        in-memory record; the next _update_capacity call repairs the file.
        """
        try:
            with open(self.CAPACITY_FILE, "r") as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_SH)
                try:
                    capacity = json.load(f)
                finally:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        except FileNotFoundError:
            return self._init_capacity()
        except json.JSONDecodeError:
            capacity = None

        if not isinstance(capacity, dict):
            print(f"[queue] {self.CAPACITY_FILE} is corrupt; using fresh capacity state")
            return self._new_capacity()
        return capacity

    def _update_capacity(self, updates: Dict[str, Any]) -> Dict[str, Any]:
        """
        Update capacity.json under an exclusive file lock.

        Refreshes the system load/memory figures, applies ``updates`` and
        recomputes the available slot count.

        Args:
            updates: Keys to change. "slots", "by_project" and "by_user" are
                merged into the existing sub-dicts; any other key replaces
                the stored value.

        Returns:
            The capacity record as written
        """
        # Get system stats
        load_1m, load_5m, _ = os.getloadavg()
        mem_info = self._get_memory_info()

        # The daemon loop calls this before anything else, so the file may
        # not exist yet; "r+" would raise FileNotFoundError.
        if not self.CAPACITY_FILE.exists():
            self._init_capacity()

        with open(self.CAPACITY_FILE, "r+") as f:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            try:
                try:
                    capacity = json.load(f)
                except json.JSONDecodeError:
                    capacity = None
                if not isinstance(capacity, dict):
                    # The file is rewritten in place below, so a crash
                    # mid-write can leave it truncated. Start over rather
                    # than failing on every poll.
                    print(f"[queue] {self.CAPACITY_FILE} is corrupt; reinitializing")
                    capacity = self._new_capacity()

                # Tolerate capacity files that lack some sections (e.g. one
                # without the "by_user" key).
                system = capacity.setdefault("system", {})
                slots = capacity.setdefault("slots", {})
                slots.setdefault("max", self.config.get("max_concurrent_slots", 4))
                slots.setdefault("used", 0)
                capacity.setdefault("by_project", {})
                capacity.setdefault("by_user", {})

                # Update system stats
                capacity["updated_at"] = datetime.now().isoformat()
                system["load_1m"] = round(load_1m, 2)
                system["load_5m"] = round(load_5m, 2)
                system["memory_used_pct"] = mem_info["used_pct"]
                system["memory_available_mb"] = mem_info["available_mb"]

                # Apply updates
                for key, value in updates.items():
                    if key in ("slots", "by_project", "by_user"):
                        capacity[key].update(value)
                    else:
                        capacity[key] = value

                # Recalculate available slots
                slots = capacity["slots"]
                slots["available"] = slots["max"] - slots["used"]

                # Write back in place while still holding the lock
                f.seek(0)
                f.truncate()
                json.dump(capacity, f, indent=2)
                f.flush()
                os.fsync(f.fileno())

                return capacity
            finally:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)

    def _new_capacity(self) -> Dict[str, Any]:
        """Build a fresh capacity record (no slots in use) without writing it."""
        mem_info = self._get_memory_info()
        max_slots = self.config.get("max_concurrent_slots", 4)
        return {
            "updated_at": datetime.now().isoformat(),
            "system": {
                "cpu_count": os.cpu_count() or 4,
                "load_1m": 0.0,
                "load_5m": 0.0,
                "memory_total_mb": mem_info["total_mb"],
                "memory_used_pct": mem_info["used_pct"],
                "memory_available_mb": mem_info["available_mb"],
            },
            "slots": {
                "max": max_slots,
                "used": 0,
                "available": max_slots,
            },
            "by_project": {},
            "by_user": {},  # Track per-user concurrent tasks
        }

    def _init_capacity(self) -> Dict[str, Any]:
        """Initialize capacity.json with system info and return the record."""
        capacity = self._new_capacity()
        self._atomic_write_json(self.CAPACITY_FILE, capacity)
        return capacity

    def _get_memory_info(self) -> Dict[str, int]:
        """
        Get memory info from /proc/meminfo.

        Returns:
            Dict with "total_mb", "available_mb" and "used_pct". If
            /proc/meminfo cannot be read or parsed, fixed placeholder values
            (8192 MB total, 50% used) are returned.
        """
        try:
            mem: Dict[str, int] = {}
            with open("/proc/meminfo") as f:
                for line in f:
                    parts = line.split()
                    if len(parts) >= 2:
                        mem[parts[0].rstrip(":")] = int(parts[1])  # kB
        except (OSError, ValueError):
            return {"total_mb": 8192, "available_mb": 4096, "used_pct": 50}

        total_mb = mem.get("MemTotal", 0) // 1024
        available_mb = mem.get("MemAvailable", mem.get("MemFree", 0)) // 1024
        used_pct = int(100 * (total_mb - available_mb) / total_mb) if total_mb else 0
        return {
            "total_mb": total_mb,
            "available_mb": available_mb,
            "used_pct": used_pct,
        }

    # --- Queue File Helpers ---

    @staticmethod
    def _parse_task_filename(name: str) -> Optional[Tuple[int, int, str, str]]:
        """
        Split a queue filename into (priority, timestamp, project, task_id).

        Filenames are "{priority}_{timestamp}_{project}_{task-id}.json". The
        project may itself contain underscores, so the two numeric fields are
        taken from the left and the task id from the right.

        Returns:
            The parsed tuple, or None if the name does not follow the format
        """
        stem = name[: -len(".json")] if name.endswith(".json") else name
        head = stem.split("_", 2)
        if len(head) != 3:
            return None
        project, sep, task_id = head[2].rpartition("_")
        if not sep or not project or not task_id:
            return None
        try:
            return int(head[0]), int(head[1]), project, task_id
        except ValueError:
            return None

    def _sorted_pending(self, tier: str) -> List[Path]:
        """
        List pending task files of a tier in dispatch order.

        Order is numeric priority, then numeric timestamp. A plain filename
        sort would put "10_..." ahead of "4_...". Files whose names cannot be
        parsed sort last, by name.
        """
        def sort_key(path: Path):
            parsed = self._parse_task_filename(path.name)
            if parsed is None:
                return (1, 0, 0, path.name)
            return (0, parsed[0], parsed[1], path.name)

        return sorted((self.QUEUE_BASE / "pending" / tier).glob("*.json"), key=sort_key)

    def _file_belongs_to_project(self, path: Path, project: str) -> bool:
        """Return True if the queue file's project field is exactly ``project``."""
        parsed = self._parse_task_filename(path.name)
        return parsed is not None and parsed[2] == project

    # --- Enqueue ---

    def enqueue(
        self,
        project: str,
        prompt: str,
        priority: int = 5,
        skill_match: str = None,
        enqueued_by: str = None,
    ) -> Tuple[str, int]:
        """
        Add task to queue.

        Args:
            project: Project name
            prompt: Task prompt
            priority: 1-10 (1-3 = high, 4-10 = normal)
            skill_match: Matched skill name (optional)
            enqueued_by: User who enqueued (optional)

        Returns:
            Tuple of (task_id, queue_position)

        Raises:
            ValueError: If project name is invalid
        """
        # SECURITY: Validate project name to prevent path traversal
        if not validate_project_name(project):
            raise ValueError(f"Invalid project name: {project}")

        task_id = str(uuid.uuid4())[:8]
        tier = "high" if priority <= 3 else "normal"

        entry = {
            "id": task_id,
            "project": project,
            "user": self.extract_user_from_project(project),  # Add user field
            "priority": priority,
            "prompt": prompt,
            "skill_match": skill_match,
            "enqueued_at": datetime.now().isoformat(),
            "enqueued_by": enqueued_by or os.environ.get("USER", "unknown"),
            "status": "pending",
        }

        # Filename format: {priority}_{timestamp}_{project}_{task-id}.json
        filename = f"{priority}_{int(time.time())}_{project}_{task_id}.json"
        path = self.QUEUE_BASE / "pending" / tier / filename

        self._atomic_write_json(path, entry)

        position = self._get_queue_position(task_id, tier)
        return task_id, position

    def _get_queue_position(self, task_id: str, tier: str) -> int:
        """
        Get the 1-based queue position for a task.

        High-tier tasks are counted first; normal-tier tasks are only walked
        when the task itself is in the normal tier. If the task is not found,
        the position after the last counted task is returned.
        """
        tiers = ["high", "normal"] if tier == "normal" else ["high"]
        position = 1
        for pending_tier in tiers:
            for f in self._sorted_pending(pending_tier):
                parsed = self._parse_task_filename(f.name)
                # Compare the parsed id field, not a substring of the name.
                if parsed is not None and parsed[3] == task_id:
                    return position
                position += 1
        return position

    # --- Capacity Check ---

    def _has_capacity(self, capacity: Dict) -> bool:
        """Check if system has capacity for new task."""
        cpu_count = capacity["system"].get("cpu_count", 4)
        max_load = self.config["max_cpu_load"] * cpu_count

        return (
            capacity["slots"]["available"] > 0
            and capacity["system"]["load_5m"] < max_load
            and capacity["system"]["memory_used_pct"] < self.config["max_memory_pct"]
        )

    # --- Fair Share Selection (with Per-User Awareness) ---

    def _get_pending_tasks(self) -> List[Dict]:
        """
        Get all pending tasks: high tier first, then normal tier, each
        ordered by numeric priority and timestamp.

        Each returned dict carries a "_path" key with the task file's path.
        Unreadable or empty task files are skipped.
        """
        tasks = []
        for tier in ("high", "normal"):
            for f in self._sorted_pending(tier):
                task = self._read_json_safe(f)
                if task:
                    task["_path"] = str(f)
                    tasks.append(task)
        return tasks

    def _select_next_task(self, capacity: Dict) -> Optional[Dict]:
        """
        Fair share task selection across projects/users with per-user exclusion.

        Tasks whose user currently holds a lock are skipped, as are task
        files without a "project" field. With fair share enabled, projects at
        their concurrency limit are skipped and the project with the fewest
        active tasks wins; within that project the first task in queue order
        (priority, then age) is returned so high-priority work is not
        overtaken by older normal-priority work.

        Args:
            capacity: Capacity record as returned by _update_capacity

        Returns:
            The selected task dict, or None if nothing is eligible
        """
        pending = self._get_pending_tasks()
        if not pending:
            return None

        # Lock state is looked up once per user for this selection pass.
        user_free: Dict[str, bool] = {}

        def user_can_run(user: str) -> bool:
            if user not in user_free:
                user_free[user] = self.can_user_execute_task(user)
            return user_free[user]

        fair_share = self.config.get("fair_share", {})
        fair_enabled = fair_share.get("enabled")
        max_per_project = fair_share.get("max_per_project", 2)
        active_by_project = capacity.get("by_project", {})

        # First eligible task per project, in queue order.
        eligible: Dict[str, Dict] = {}
        for task in pending:
            project = task.get("project")
            if not project:
                # Malformed task file; it could not be dispatched anyway.
                continue
            user = task.get("user") or self.extract_user_from_project(project)

            # Check per-user lock
            if not user_can_run(user):
                continue

            if not fair_enabled:
                # No fair share: take first available task
                return task

            if active_by_project.get(project, 0) < max_per_project:
                eligible.setdefault(project, task)

        if not eligible:
            return None

        # Pick project with fewest active tasks (ties: earliest in queue order)
        project = min(eligible, key=lambda p: active_by_project.get(p, 0))
        return eligible[project]

    # --- Dispatch ---

    def _dispatch(self, task: Dict) -> bool:
        """
        Dispatch task to conductor and spawn container with per-user locking.

        Uses atomic task claiming and per-user locks to prevent race
        conditions. If setting up the conductor directory fails, the claim is
        undone and the lock released.

        Args:
            task: Task dict from _get_pending_tasks (must carry "_path")

        Returns:
            True if dispatch succeeded.
        """
        project = task.get("project")
        task_id = task.get("id")
        raw_path = task.get("_path")

        # SECURITY: Validate project name before using in path
        if not validate_project_name(project):
            print(f"[queue] Invalid project name: {project}")
            return False

        # SECURITY: The task id is also embedded in the conductor path.
        if not isinstance(task_id, str) or _TASK_ID_RE.fullmatch(task_id) is None:
            print(f"[queue] Invalid task id: {task_id}")
            return False

        if not raw_path:
            print(f"[queue] Task {task_id} has no queue file path")
            return False

        user = task.get("user") or self.extract_user_from_project(project)
        task_path = Path(raw_path)

        # Acquire per-user lock BEFORE atomic claim
        # This prevents another dispatcher from starting a task for the same user
        acquired, lock_id = self.acquire_user_lock(user, task_id)
        if not acquired:
            print(f"[queue] Cannot acquire per-user lock for {user}, another task may be running")
            return False

        # Atomic claim: try to rename task file to .dispatching
        dispatching_path = task_path.with_suffix(".json.dispatching")
        try:
            os.rename(task_path, dispatching_path)
        except FileNotFoundError:
            # Another controller already claimed this task
            self.release_user_lock(user, lock_id)
            print(f"[queue] Task {task_id} already claimed by another controller")
            return False
        except OSError as e:
            # Release lock on failure
            self.release_user_lock(user, lock_id)
            print(f"[queue] Failed to claim task {task_id}: {e}")
            return False

        conductor_dir = Path(f"/home/{project}/conductor/active/{task_id}")
        meta = {
            "id": task_id,
            "prompt": task.get("prompt", ""),
            "started": datetime.now().isoformat(),
            "status": "running",
            "skill": task.get("skill_match"),
            "zen_continuation_id": None,
            "dispatched_by": task.get("enqueued_by", "queue"),
            "priority": task.get("priority", 5),
            "user": user,  # Track user for cleanup
            "lock_id": lock_id,  # Track lock ID for cleanup
        }

        try:
            self._write_conductor_files(conductor_dir, task_id, meta)
        except OSError as e:
            # Unclaim task and release lock on failure, so the task stays
            # in the queue and the user is not left locked.
            try:
                os.rename(dispatching_path, task_path)
            except OSError:
                pass
            self.release_user_lock(user, lock_id)
            print(f"[queue] Cannot set up conductor dir for {project}: {e}")
            return False

        # Remove from queue (delete the .dispatching file we claimed earlier)
        try:
            dispatching_path.unlink()
        except FileNotFoundError:
            pass  # Already cleaned up

        # Update capacity
        # NOTE: the counters are read and written in two separate locked
        # steps, so two dispatchers running at once can lose an increment.
        capacity = self._read_capacity()
        by_project = capacity.get("by_project", {})
        by_user = capacity.get("by_user", {})
        by_project[project] = by_project.get(project, 0) + 1
        by_user[user] = by_user.get(user, 0) + 1
        self._update_capacity({
            "slots": {"used": capacity["slots"]["used"] + 1},
            "by_project": by_project,
            "by_user": by_user,
        })

        print(f"[queue] Dispatched {task_id} to {project} (user: {user}, lock: {lock_id})")

        # Spawn the actual agent via luzia's spawn_claude_agent
        job_id = self._spawn_agent(project, task, conductor_dir)
        if job_id:
            # Update conductor meta with job linkage
            meta["job_id"] = job_id
            meta["status"] = "running"
            self._atomic_write_json(conductor_dir / "meta.json", meta)
            print(f"[queue] Spawned agent job {job_id} for task {task_id}")
        else:
            print(f"[queue] Warning: Agent spawn failed for {task_id}")

        return True

    def _write_conductor_files(self, conductor_dir: Path, task_id: str, meta: Dict) -> None:
        """
        Create the conductor directory and its initial files.

        Writes meta.json, heartbeat.json and progress.md and creates the
        dialogue/ subdirectory.

        Raises:
            OSError: If any directory or file cannot be created
        """
        conductor_dir.mkdir(parents=True, exist_ok=True)

        # Write meta.json to conductor
        self._atomic_write_json(conductor_dir / "meta.json", meta)

        # Write initial heartbeat
        heartbeat = {"ts": time.time(), "step": "Starting task"}
        self._atomic_write_json(conductor_dir / "heartbeat.json", heartbeat)

        # Write initial progress
        progress_md = f"""# Progress: {task_id}

## Milestones
- [ ] Task started
- [ ] Implementation in progress
- [ ] Testing
- [ ] Completed

## Current Status
Task dispatched from queue.
Last update: {datetime.now().strftime('%Y-%m-%d %H:%M')}
"""
        (conductor_dir / "progress.md").write_text(progress_md)

        # Create dialogue directory
        (conductor_dir / "dialogue").mkdir(exist_ok=True)

    def _spawn_agent(self, project: str, task: Dict, conductor_dir: Path) -> Optional[str]:
        """
        Spawn Claude agent for the task using luzia infrastructure.

        Runs the luzia binary with the project and prompt as arguments and
        LUZIA_QUEUE_DISPATCH / LUZIA_CONDUCTOR_DIR set in its environment.

        Args:
            project: Project name (passed as the first luzia argument)
            task: Task dict; "prompt" and "id" are read
            conductor_dir: Conductor directory exported to the child process

        Returns:
            The job id parsed from luzia's output, "queue-<task id>" if luzia
            succeeded without printing a recognizable job id, or None if the
            spawn failed or timed out.
        """
        prompt = task.get("prompt", "")

        # Build the luzia command (argument list, no shell)
        cmd = [
            "/opt/server-agents/orchestrator/bin/luzia",
            project,
            prompt,
        ]

        try:
            # Run luzia to spawn the agent
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30,
                env={
                    **os.environ,
                    "LUZIA_QUEUE_DISPATCH": "1",
                    "LUZIA_CONDUCTOR_DIR": str(conductor_dir),
                },
            )
        except subprocess.TimeoutExpired:
            print("[queue] Luzia spawn timed out")
            return None
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            # OSError: binary missing / not executable; ValueError: e.g. an
            # embedded NUL byte in the prompt.
            print(f"[queue] Spawn error: {e}")
            return None

        if result.returncode != 0:
            print(f"[queue] Luzia spawn failed: {result.stderr}")
            return None

        # Parse job_id from output: the text after the last ':' on a line,
        # if it starts with six digits, a hyphen and hex digits.
        for line in result.stdout.strip().split("\n"):
            line = line.strip()
            if not line:
                continue
            parts = line.split(":")
            if len(parts) >= 2:
                job_id = parts[-1].strip()
                if re.match(r'^\d{6}-[a-f0-9]+', job_id):
                    return job_id

        return f"queue-{task.get('id', 'unknown')}"

    # --- Daemon Loop ---

    def run_loop(self):
        """
        Main daemon loop - poll and dispatch.

        Each iteration cleans up stale user locks, refreshes capacity and, if
        there is capacity, dispatches at most one task. Runs until
        interrupted with Ctrl-C; other exceptions are logged and the loop
        continues after a longer pause.
        """
        print("[queue] Starting queue controller daemon (v2 with per-user locking)")
        print(f"[queue] Config: max_slots={self.config['max_concurrent_slots']}, "
              f"max_load={self.config['max_cpu_load']}, "
              f"max_mem={self.config['max_memory_pct']}%")
        print(f"[queue] Per-user serialization: {self.config['per_user_serialization'].get('enabled', True)}")

        poll_interval = self.config["poll_interval_ms"] / 1000
        backpressure_sleep = self.config.get("backpressure", {}).get("sleep_ms", 5000) / 1000

        while True:
            try:
                # Clean up stale locks periodically
                self.user_queue_manager.cleanup_all_stale_locks()

                capacity = self._update_capacity({})  # Refresh system stats

                if self._has_capacity(capacity):
                    task = self._select_next_task(capacity)
                    if task:
                        self._dispatch(task)
                else:
                    # Backpressure: sleep longer when overloaded
                    if self.config.get("backpressure", {}).get("enabled"):
                        time.sleep(backpressure_sleep)
                        continue

                time.sleep(poll_interval)

            except KeyboardInterrupt:
                print("\n[queue] Shutting down...")
                break
            except Exception as e:
                # Top-level boundary: keep the daemon alive, back off a bit.
                print(f"[queue] Error in loop: {e}")
                time.sleep(poll_interval * 5)

    # --- Queue Status ---

    def get_queue_status(self, project: str = None) -> Dict[str, Any]:
        """
        Get queue status for display.

        Args:
            project: If given, only pending tasks of exactly this project
                are included in the "pending" and "tasks" sections

        Returns:
            Dict with "pending", "active", "user_locks", "system" and
            "tasks" sections
        """
        capacity = self._read_capacity()

        pending_high = self._sorted_pending("high")
        pending_normal = self._sorted_pending("normal")

        # Filter by project if specified (exact match on the filename's
        # project field; a substring test would also match e.g. "x_foo_y"
        # when asked for "foo")
        if project:
            pending_high = [f for f in pending_high if self._file_belongs_to_project(f, project)]
            pending_normal = [f for f in pending_normal if self._file_belongs_to_project(f, project)]

        # Load task details
        high_tasks = [self._read_json_safe(f) for f in pending_high]
        normal_tasks = [self._read_json_safe(f) for f in pending_normal]

        # Get active locks
        active_locks = self.user_queue_manager.get_all_locks()

        return {
            "pending": {
                "high": len(high_tasks),
                "normal": len(normal_tasks),
                "total": len(high_tasks) + len(normal_tasks),
            },
            "active": {
                "slots_used": capacity["slots"]["used"],
                "slots_max": capacity["slots"]["max"],
                "by_project": capacity.get("by_project", {}),
                "by_user": capacity.get("by_user", {}),
            },
            "user_locks": {
                "active": len(active_locks),
                "details": active_locks,
            },
            "system": {
                "load": capacity["system"]["load_5m"],
                "memory_pct": capacity["system"]["memory_used_pct"],
            },
            "tasks": {
                "high": high_tasks,
                "normal": normal_tasks,
            },
        }

    def clear_queue(self, project: str = None) -> int:
        """
        Clear pending tasks.

        Args:
            project: If given, only tasks of exactly this project are
                removed; otherwise every pending task is removed

        Returns:
            Count of cleared tasks
        """
        count = 0
        for tier in ["high", "normal"]:
            for f in (self.QUEUE_BASE / "pending" / tier).glob("*.json"):
                if project and not self._file_belongs_to_project(f, project):
                    continue
                try:
                    f.unlink()
                except FileNotFoundError:
                    # Claimed by a dispatcher between listing and unlinking.
                    continue
                count += 1
        return count


def _print_usage() -> None:
    """Print the command-line usage summary."""
    print("Usage:")
    print("  queue_controller_v2.py daemon                                Run queue daemon")
    print("  queue_controller_v2.py status                                Show queue status")
    print("  queue_controller_v2.py enqueue <project> <prompt> [priority]")
    print("  queue_controller_v2.py clear [project]")


# CLI interface
if __name__ == "__main__":
    if len(sys.argv) < 2:
        _print_usage()
        sys.exit(0)

    cmd = sys.argv[1]

    if cmd not in ("daemon", "status", "enqueue", "clear"):
        print(f"Unknown command: {cmd}")
        sys.exit(1)

    if cmd == "enqueue" and len(sys.argv) < 4:
        _print_usage()
        sys.exit(1)

    # Construct only once the arguments are known to be usable: the
    # constructor creates the queue directories.
    qc = QueueControllerV2()

    if cmd == "daemon":
        qc.run_loop()
    elif cmd == "status":
        status = qc.get_queue_status()
        print(json.dumps(status, indent=2))
    elif cmd == "enqueue":
        project = sys.argv[2]
        prompt = sys.argv[3]
        try:
            priority = int(sys.argv[4]) if len(sys.argv) > 4 else 5
            task_id, position = qc.enqueue(project, prompt, priority)
        except ValueError as e:
            print(f"Error: {e}")
            sys.exit(1)
        print(f"Task {task_id} queued (position {position})")
    elif cmd == "clear":
        project = sys.argv[2] if len(sys.argv) > 2 else None
        count = qc.clear_queue(project)
        print(f"Cleared {count} tasks")