#!/usr/bin/env python3
"""
Queue Controller - Load-Aware Task Queue for Luzia

Implements:
- File-based task queue with priority tiers (high/normal)
- Load-aware scheduling (CPU, memory, slot limits)
- Fair share across projects (prevents starvation)
- Atomic file operations (write to .tmp, fsync, rename)
- File locking for capacity.json (fcntl.flock)

Usage:
    from queue_controller import QueueController

    qc = QueueController()
    task_id, position = qc.enqueue("musica", "fix the bug", priority=5)

    # Or run as daemon
    qc.run_loop()
"""

import fcntl
import json
import os
import re
import subprocess
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

def validate_project_name(project: str) -> bool:
    """
    Validate project name to prevent path traversal attacks.

    Rules:
    - Must be alphanumeric with hyphens/underscores only
    - Cannot contain path separators or dots
    - Must be 1-32 characters
    - Must start with a letter (no leading digit, hyphen, or underscore)
    """
    if not project or len(project) > 32:
        return False
    # Only allow alphanumeric, hyphen, underscore; must start with a letter
    if not re.match(r'^[a-zA-Z][a-zA-Z0-9_-]*$', project):
        return False
    # Defense in depth: reject path components outright
    if '/' in project or '\\' in project or '..' in project:
        return False
    return True

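# Illustrative checks (hypothetical values, not an exhaustive test):
#   validate_project_name("musica")     -> True
#   validate_project_name("my-proj_2")  -> True
#   validate_project_name("../etc")     -> False  (path traversal)
#   validate_project_name("-dash")      -> False  (must start with a letter)
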
class QueueController:
    """Load-aware task queue controller with fair share scheduling."""

    QUEUE_BASE = Path("/var/lib/luzia/queue")
    CONFIG_FILE = QUEUE_BASE / "config.json"
    CAPACITY_FILE = QUEUE_BASE / "capacity.json"

    def __init__(self):
        self.config = self._load_config()
        self._ensure_dirs()

    def _ensure_dirs(self):
        """Create the queue directory structure if needed."""
        for subdir in ["pending/high", "pending/normal"]:
            (self.QUEUE_BASE / subdir).mkdir(parents=True, exist_ok=True)

    def _load_config(self) -> Dict[str, Any]:
        """Load queue configuration, falling back to built-in defaults."""
        if self.CONFIG_FILE.exists():
            return json.loads(self.CONFIG_FILE.read_text())
        return {
            "max_concurrent_slots": 4,
            "max_cpu_load": 0.8,
            "max_memory_pct": 85,
            "fair_share": {"enabled": True, "max_per_project": 2},
            "poll_interval_ms": 1000,
        }

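    # A config.json matching the defaults above might look like this
    # (the optional "backpressure" block is read by run_loop below):
    #
    #   {
    #     "max_concurrent_slots": 4,
    #     "max_cpu_load": 0.8,
    #     "max_memory_pct": 85,
    #     "fair_share": {"enabled": true, "max_per_project": 2},
    #     "poll_interval_ms": 1000,
    #     "backpressure": {"enabled": true, "sleep_ms": 5000}
    #   }
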
    # --- Atomic File Operations ---

    def _atomic_write_json(self, path: Path, data: Dict) -> None:
        """Write JSON atomically: write to .tmp, fsync, rename."""
        tmp_path = path.with_suffix(".json.tmp")
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.rename(tmp_path, path)

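    # Note: os.rename() is atomic on POSIX when source and destination live
    # on the same filesystem, so readers see either the old file or the new
    # one, never a partial write. The fsync before the rename ensures the new
    # contents are durable before they become visible under the final name.
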
    def _read_json_safe(self, path: Path, default: Optional[Dict] = None) -> Dict:
        """Read JSON with a fallback to `default` on error."""
        if not path.exists():
            return default or {}
        try:
            return json.loads(path.read_text())
        except (json.JSONDecodeError, IOError):
            return default or {}

    # --- Capacity Management (with locking) ---

    def _read_capacity(self) -> Dict[str, Any]:
        """Read capacity.json under a shared lock."""
        if not self.CAPACITY_FILE.exists():
            return self._init_capacity()

        with open(self.CAPACITY_FILE, "r") as f:
            fcntl.flock(f.fileno(), fcntl.LOCK_SH)
            try:
                return json.load(f)
            finally:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)

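    # Note: fcntl.flock() is advisory locking -- it coordinates processes
    # that opt in (this controller and anything else that locks
    # capacity.json), but it does not stop an uncooperative writer.
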
    def _update_capacity(self, updates: Dict[str, Any]) -> Dict[str, Any]:
        """Update capacity.json in place under an exclusive lock."""
        # Get system stats
        load_1m, load_5m, _ = os.getloadavg()
        mem_info = self._get_memory_info()

        # Create the file on first use so open(..., "r+") below cannot fail
        if not self.CAPACITY_FILE.exists():
            self._init_capacity()

        with open(self.CAPACITY_FILE, "r+") as f:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            try:
                capacity = json.load(f)

                # Update system stats
                capacity["updated_at"] = datetime.now().isoformat()
                capacity["system"]["load_1m"] = round(load_1m, 2)
                capacity["system"]["load_5m"] = round(load_5m, 2)
                capacity["system"]["memory_used_pct"] = mem_info["used_pct"]
                capacity["system"]["memory_available_mb"] = mem_info["available_mb"]

                # Apply updates
                for key, value in updates.items():
                    if key == "slots":
                        capacity["slots"].update(value)
                    elif key == "by_project":
                        capacity["by_project"].update(value)
                    else:
                        capacity[key] = value

                # Recalculate available slots
                capacity["slots"]["available"] = (
                    capacity["slots"]["max"] - capacity["slots"]["used"]
                )

                # Write back in place while holding the lock (truncate, rewrite, fsync)
                f.seek(0)
                f.truncate()
                json.dump(capacity, f, indent=2)
                f.flush()
                os.fsync(f.fileno())

                return capacity
            finally:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)

    def _init_capacity(self) -> Dict[str, Any]:
        """Initialize capacity.json with system info."""
        cpu_count = os.cpu_count() or 4
        mem_info = self._get_memory_info()

        capacity = {
            "updated_at": datetime.now().isoformat(),
            "system": {
                "cpu_count": cpu_count,
                "load_1m": 0.0,
                "load_5m": 0.0,
                "memory_total_mb": mem_info["total_mb"],
                "memory_used_pct": mem_info["used_pct"],
                "memory_available_mb": mem_info["available_mb"],
            },
            "slots": {
                "max": self.config.get("max_concurrent_slots", 4),
                "used": 0,
                "available": self.config.get("max_concurrent_slots", 4),
            },
            "by_project": {},
        }
        self._atomic_write_json(self.CAPACITY_FILE, capacity)
        return capacity

    def _get_memory_info(self) -> Dict[str, int]:
        """Get memory info from /proc/meminfo."""
        try:
            with open("/proc/meminfo") as f:
                lines = f.readlines()
            mem = {}
            for line in lines:
                parts = line.split()
                if len(parts) >= 2:
                    key = parts[0].rstrip(":")
                    value = int(parts[1])  # kB
                    mem[key] = value

            total_mb = mem.get("MemTotal", 0) // 1024
            available_mb = mem.get("MemAvailable", mem.get("MemFree", 0)) // 1024
            used_pct = int(100 * (total_mb - available_mb) / total_mb) if total_mb else 0

            return {
                "total_mb": total_mb,
                "available_mb": available_mb,
                "used_pct": used_pct,
            }
        except Exception:
            # Conservative fallback when /proc is unavailable (e.g., non-Linux)
            return {"total_mb": 8192, "available_mb": 4096, "used_pct": 50}

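    # /proc/meminfo lines look like "MemTotal:       16384256 kB"; the parser
    # above keeps the field name and the integer kB value (sizes illustrative):
    #   MemTotal:       16384256 kB   -> mem["MemTotal"] = 16384256
    #   MemAvailable:    9876543 kB   -> mem["MemAvailable"] = 9876543
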
    # --- Enqueue ---

    def enqueue(
        self,
        project: str,
        prompt: str,
        priority: int = 5,
        skill_match: Optional[str] = None,
        enqueued_by: Optional[str] = None,
    ) -> Tuple[str, int]:
        """
        Add a task to the queue.

        Args:
            project: Project name
            prompt: Task prompt
            priority: 1-10 (1-3 = high, 4-10 = normal)
            skill_match: Matched skill name (optional)
            enqueued_by: User who enqueued (optional)

        Returns:
            Tuple of (task_id, queue_position)

        Raises:
            ValueError: If the project name is invalid
        """
        # SECURITY: Validate project name to prevent path traversal
        if not validate_project_name(project):
            raise ValueError(f"Invalid project name: {project}")

        task_id = str(uuid.uuid4())[:8]
        tier = "high" if priority <= 3 else "normal"

        entry = {
            "id": task_id,
            "project": project,
            "priority": priority,
            "prompt": prompt,
            "skill_match": skill_match,
            "enqueued_at": datetime.now().isoformat(),
            "enqueued_by": enqueued_by or os.environ.get("USER", "unknown"),
            "status": "pending",
        }

        # Filename format: {priority:02d}_{timestamp}_{project}_{task-id}.json
        # Priority is zero-padded so lexicographic filename sorting matches
        # numeric order (otherwise "10_" would sort before "4_").
        filename = f"{priority:02d}_{int(time.time())}_{project}_{task_id}.json"
        path = self.QUEUE_BASE / "pending" / tier / filename

        self._atomic_write_json(path, entry)

        position = self._get_queue_position(task_id, tier)
        return task_id, position

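    # For example, enqueue("musica", "fix the bug", priority=3) files the task
    # under pending/high/ with a name like 03_1700000000_musica_a1b2c3d4.json
    # (the timestamp and id shown here are illustrative).
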
    def _get_queue_position(self, task_id: str, tier: str) -> int:
        """Get the queue position for a task."""
        # Count tasks ahead (high priority first, then by timestamp)
        position = 1

        # High priority tasks
        for f in sorted((self.QUEUE_BASE / "pending" / "high").glob("*.json")):
            if task_id in f.name:
                return position
            position += 1

        # Normal priority tasks (only counted if the task is in the normal tier)
        if tier == "normal":
            for f in sorted((self.QUEUE_BASE / "pending" / "normal").glob("*.json")):
                if task_id in f.name:
                    return position
                position += 1

        return position

    # --- Capacity Check ---

    def _has_capacity(self, capacity: Dict) -> bool:
        """Check whether the system has capacity for a new task."""
        cpu_count = capacity["system"].get("cpu_count", 4)
        max_load = self.config["max_cpu_load"] * cpu_count

        return (
            capacity["slots"]["available"] > 0
            and capacity["system"]["load_5m"] < max_load
            and capacity["system"]["memory_used_pct"] < self.config["max_memory_pct"]
        )

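    # Worked example (illustrative numbers): with cpu_count=8 and
    # max_cpu_load=0.8 the load gate is 6.4, so a host at load_5m=5.9 with a
    # free slot and memory_used_pct=70 (< 85) passes all three checks.
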
    # --- Fair Share Selection ---

    def _get_pending_tasks(self) -> List[Dict]:
        """Get all pending tasks sorted by priority and timestamp."""
        tasks = []

        # High priority first
        for f in sorted((self.QUEUE_BASE / "pending" / "high").glob("*.json")):
            task = self._read_json_safe(f)
            if task:
                task["_path"] = str(f)
                tasks.append(task)

        # Then normal priority
        for f in sorted((self.QUEUE_BASE / "pending" / "normal").glob("*.json")):
            task = self._read_json_safe(f)
            if task:
                task["_path"] = str(f)
                tasks.append(task)

        return tasks

    def _select_next_task(self, capacity: Dict) -> Optional[Dict]:
        """Fair-share task selection across projects."""
        pending = self._get_pending_tasks()
        if not pending:
            return None

        active_by_project = capacity.get("by_project", {})
        max_per_project = self.config["fair_share"]["max_per_project"]

        if not self.config["fair_share"]["enabled"]:
            # No fair share: just take the first pending task
            return pending[0]

        # Group by project, filtering out projects already at their limit
        eligible = {}
        for task in pending:
            project = task["project"]
            if active_by_project.get(project, 0) < max_per_project:
                eligible.setdefault(project, []).append(task)

        if not eligible:
            return None

        # Pick the project with the fewest active tasks, then its oldest task
        project = min(eligible.keys(), key=lambda p: active_by_project.get(p, 0))
        return min(eligible[project], key=lambda t: t.get("enqueued_at", ""))

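    # Worked example (illustrative): with max_per_project=2 and active counts
    # {"musica": 2, "blog": 0}, pending "musica" tasks are filtered out at the
    # limit, "blog" has the fewest active tasks, and its oldest pending task
    # (by enqueued_at) is selected next.
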
    # --- Dispatch ---

    def _dispatch(self, task: Dict) -> bool:
        """
        Dispatch a task to the conductor and spawn its container.

        Uses atomic task claiming to prevent race conditions:
        1. Try to rename the task file to .dispatching (atomic claim)
        2. If the rename fails, another controller claimed it
        3. Only proceed with dispatch if the claim succeeded

        Returns True if dispatch succeeded.
        """
        project = task["project"]
        task_id = task["id"]
        task_path = Path(task.get("_path", ""))

        # SECURITY: Validate project name before using it in a path
        if not validate_project_name(project):
            print(f"[queue] Invalid project name: {project}")
            return False

        # Atomic claim: try to rename the task file to .dispatching.
        # This prevents two controllers from dispatching the same task.
        if task_path.exists():
            dispatching_path = task_path.with_suffix(".json.dispatching")
            try:
                os.rename(task_path, dispatching_path)
            except FileNotFoundError:
                # Another controller already claimed this task
                print(f"[queue] Task {task_id} already claimed by another controller")
                return False
            except OSError as e:
                print(f"[queue] Failed to claim task {task_id}: {e}")
                return False
        else:
            print(f"[queue] Task file not found: {task_path}")
            return False

        # Create conductor directory
        conductor_dir = Path(f"/home/{project}/conductor/active/{task_id}")
        try:
            conductor_dir.mkdir(parents=True, exist_ok=True)
        except PermissionError:
            # Unclaim the task on failure
            try:
                os.rename(dispatching_path, task_path)
            except OSError:
                pass
            print(f"[queue] Cannot create conductor dir for {project}: permission denied")
            return False

        # Write meta.json to the conductor
        meta = {
            "id": task_id,
            "prompt": task["prompt"],
            "started": datetime.now().isoformat(),
            "status": "running",
            "skill": task.get("skill_match"),
            "zen_continuation_id": None,
            "dispatched_by": task.get("enqueued_by", "queue"),
            "priority": task.get("priority", 5),
        }
        self._atomic_write_json(conductor_dir / "meta.json", meta)

        # Write initial heartbeat
        heartbeat = {"ts": time.time(), "step": "Starting task"}
        self._atomic_write_json(conductor_dir / "heartbeat.json", heartbeat)

        # Write initial progress
        progress_md = f"""# Progress: {task_id}

## Milestones
- [ ] Task started
- [ ] Implementation in progress
- [ ] Testing
- [ ] Completed

## Current Status
Task dispatched from queue.
Last update: {datetime.now().strftime('%Y-%m-%d %H:%M')}
"""
        (conductor_dir / "progress.md").write_text(progress_md)

        # Create dialogue directory
        (conductor_dir / "dialogue").mkdir(exist_ok=True)

        # Remove from queue (delete the .dispatching file claimed earlier)
        try:
            dispatching_path.unlink()
        except FileNotFoundError:
            pass  # Already cleaned up

        # Update capacity
        capacity = self._read_capacity()
        by_project = capacity.get("by_project", {})
        by_project[project] = by_project.get(project, 0) + 1
        self._update_capacity({
            "slots": {"used": capacity["slots"]["used"] + 1},
            "by_project": by_project,
        })

        print(f"[queue] Dispatched {task_id} to {project}")

        # Spawn the actual agent via luzia's spawn_claude_agent
        job_id = self._spawn_agent(project, task, conductor_dir)
        if job_id:
            # Update conductor meta with the job linkage
            meta["job_id"] = job_id
            self._atomic_write_json(conductor_dir / "meta.json", meta)
            print(f"[queue] Spawned agent job {job_id} for task {task_id}")
        else:
            print(f"[queue] Warning: Agent spawn failed for {task_id}")
            # The task reached the conductor but the agent did not start;
            # the watchdog will detect this via missing heartbeat updates.

        return True

    def _spawn_agent(self, project: str, task: Dict, conductor_dir: Path) -> Optional[str]:
        """
        Spawn a Claude agent for the task using the luzia infrastructure.

        NON-BLOCKING: Uses Popen to spawn the agent in the background instead
        of blocking on run(), so the dispatcher can spawn several tasks in
        quick succession.

        Returns the job_id if the spawn started successfully, None otherwise.
        """
        try:
            prompt = task.get("prompt", "")

            # luzia is a script, not an importable module, so shell out:
            # 'luzia <project> <task>' routes through spawn_claude_agent
            cmd = [
                "/opt/server-agents/orchestrator/bin/luzia",
                project,
                prompt,
            ]

            # NON-BLOCKING: Popen instead of run() so the dispatcher loop
            # does not block on each dispatch. (Note: the pipes are never
            # read, so a very chatty child could block on a full pipe.)
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                env={
                    **os.environ,
                    "LUZIA_QUEUE_DISPATCH": "1",  # Signal that this came from the queue
                    "LUZIA_CONDUCTOR_DIR": str(conductor_dir),
                },
            )

            # Generate the job_id immediately instead of waiting for output.
            # Format: agent:project:HHMMSS-xxxx
            job_id = f"agent:{project}:{time.strftime('%H%M%S')}-{task.get('id', 'unknown')[:8]}"

            print(f"[queue] Spawned agent {job_id} in background (PID {proc.pid})")
            return job_id

        except Exception as e:
            print(f"[queue] Spawn error: {e}")
            return None

    # --- Daemon Loop ---

    def run_loop(self):
        """Main daemon loop: poll and dispatch."""
        print("[queue] Starting queue controller daemon")
        print(f"[queue] Config: max_slots={self.config['max_concurrent_slots']}, "
              f"max_load={self.config['max_cpu_load']}, "
              f"max_mem={self.config['max_memory_pct']}%")

        poll_interval = self.config["poll_interval_ms"] / 1000
        backpressure_sleep = self.config.get("backpressure", {}).get("sleep_ms", 5000) / 1000

        while True:
            try:
                capacity = self._update_capacity({})  # Refresh system stats

                if self._has_capacity(capacity):
                    task = self._select_next_task(capacity)
                    if task:
                        self._dispatch(task)
                else:
                    # Backpressure: sleep longer when overloaded
                    if self.config.get("backpressure", {}).get("enabled"):
                        time.sleep(backpressure_sleep)
                        continue

                time.sleep(poll_interval)

            except KeyboardInterrupt:
                print("\n[queue] Shutting down...")
                break
            except Exception as e:
                print(f"[queue] Error in loop: {e}")
                time.sleep(poll_interval * 5)  # Back off on errors

    # --- Queue Status ---

    def get_queue_status(self, project: Optional[str] = None) -> Dict[str, Any]:
        """Get the queue status for display."""
        capacity = self._read_capacity()
        pending_high = list((self.QUEUE_BASE / "pending" / "high").glob("*.json"))
        pending_normal = list((self.QUEUE_BASE / "pending" / "normal").glob("*.json"))

        # Filter by project if specified
        if project:
            pending_high = [f for f in pending_high if f"_{project}_" in f.name]
            pending_normal = [f for f in pending_normal if f"_{project}_" in f.name]

        # Load task details
        high_tasks = [self._read_json_safe(f) for f in sorted(pending_high)]
        normal_tasks = [self._read_json_safe(f) for f in sorted(pending_normal)]

        return {
            "pending": {
                "high": len(high_tasks),
                "normal": len(normal_tasks),
                "total": len(high_tasks) + len(normal_tasks),
            },
            "active": {
                "slots_used": capacity["slots"]["used"],
                "slots_max": capacity["slots"]["max"],
                "by_project": capacity.get("by_project", {}),
            },
            "system": {
                "load": capacity["system"]["load_5m"],
                "memory_pct": capacity["system"]["memory_used_pct"],
            },
            "tasks": {
                "high": high_tasks,
                "normal": normal_tasks,
            },
        }

    def clear_queue(self, project: Optional[str] = None) -> int:
        """Clear pending tasks. Returns the number of cleared tasks."""
        count = 0
        for tier in ["high", "normal"]:
            for f in (self.QUEUE_BASE / "pending" / tier).glob("*.json"):
                if project and f"_{project}_" not in f.name:
                    continue
                f.unlink()
                count += 1
        return count

# CLI interface
if __name__ == "__main__":
    import sys

    qc = QueueController()

    if len(sys.argv) < 2:
        print("Usage:")
        print("  queue_controller.py daemon                            Run queue daemon")
        print("  queue_controller.py status                            Show queue status")
        print("  queue_controller.py enqueue <project> <prompt> [priority]")
        print("  queue_controller.py clear [project]")
        sys.exit(0)

    cmd = sys.argv[1]

    if cmd == "daemon":
        qc.run_loop()
    elif cmd == "status":
        status = qc.get_queue_status()
        print(json.dumps(status, indent=2))
    elif cmd == "enqueue" and len(sys.argv) >= 4:
        project = sys.argv[2]
        prompt = sys.argv[3]
        priority = int(sys.argv[4]) if len(sys.argv) > 4 else 5
        task_id, position = qc.enqueue(project, prompt, priority)
        print(f"Task {task_id} queued (position {position})")
    elif cmd == "clear":
        project = sys.argv[2] if len(sys.argv) > 2 else None
        count = qc.clear_queue(project)
        print(f"Cleared {count} tasks")
    else:
        print(f"Unknown command or missing arguments: {cmd}")
        sys.exit(1)
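
# Example invocations (illustrative):
#   ./queue_controller.py enqueue musica "fix the bug" 3
#   ./queue_controller.py status
#   ./queue_controller.py clear musica
#   ./queue_controller.py daemon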