#!/usr/bin/env python3
"""
Project Queue Scheduler - Per-project sequential task execution

Extends QueueController to implement:
- Per-project sequential execution (max 1 task per project at a time)
- Parallel execution across different projects
- Fair project rotation (prevents starvation)
- Project-aware task selection

Architecture:
    QueueController: Global queue management (priority, capacity, fair share)
        ↓
    ProjectQueueScheduler: Project-based sequencing layer
        ├─ Track active tasks per project
        ├─ Fair rotation among projects
        └─ Sequential selection logic
"""

import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

# QueueController is expected to live alongside this module (lib directory)
try:
    from queue_controller import QueueController
except ImportError as exc:
    raise ImportError("queue_controller.py not found in lib directory") from exc


class ProjectQueueScheduler(QueueController):
    """
    Scheduler that ensures per-project sequential task execution
    while allowing parallel execution across projects.
    """

    PROJECT_STATE_FILE = Path("/var/lib/luzia/queue/projects_state.json")

    def __init__(self):
        """Initialize scheduler with project state tracking."""
        super().__init__()
        self.project_state = self._load_project_state()

    def _load_project_state(self) -> Dict[str, Any]:
        """Load or initialize project state tracking."""
        if self.PROJECT_STATE_FILE.exists():
            try:
                return json.loads(self.PROJECT_STATE_FILE.read_text())
            except (json.JSONDecodeError, IOError):
                pass  # Fall through to a fresh state if the file is unreadable
        return {
            "active_tasks": {},   # project → task_id
            "last_served": None,  # Last project to get a task (for rotation)
            "updated_at": datetime.now().isoformat(),
        }

    def _save_project_state(self) -> None:
        """Save project state atomically (write temp file, then rename over)."""
        self.project_state["updated_at"] = datetime.now().isoformat()
        tmp_path = self.PROJECT_STATE_FILE.with_suffix(".json.tmp")
        self.PROJECT_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(tmp_path, "w") as f:
            json.dump(self.project_state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, self.PROJECT_STATE_FILE)  # atomic on POSIX
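
    # Shape of the persisted state file, for reference. The project name and
    # task id below are illustrative placeholders, not real values:
    #
    #   {
    #     "active_tasks": {"project-a": "task_20250101_abc123"},
    #     "last_served": "project-a",
    #     "updated_at": "2025-01-01T12:00:00"
    #   }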

    def get_project_queue_status(self, project: str) -> Dict[str, Any]:
        """Get queue status for a specific project."""
        # Collect all pending tasks for this project across priority tiers
        pending_tasks = []
        for tier in ["high", "normal"]:
            tier_dir = self.QUEUE_BASE / "pending" / tier
            if not tier_dir.exists():
                continue
            for task_file in sorted(tier_dir.glob("*.json")):
                # Cheap filename filter first, then confirm via the payload
                if f"_{project}_" not in task_file.name:
                    continue
                try:
                    task = json.loads(task_file.read_text())
                except (json.JSONDecodeError, IOError):
                    continue
                if task.get("project") == project:
                    pending_tasks.append(task)

        # Check whether the project currently has an active task
        active_task_id = self.project_state["active_tasks"].get(project)
        active_task = None
        if active_task_id:
            active_task = {"id": active_task_id, "status": "running"}

        return {
            "project": project,
            "pending_count": len(pending_tasks),
            "pending_tasks": pending_tasks[:10],  # First 10 only
            "active_task": active_task,
            "is_running": active_task_id is not None,
            "queued_at": datetime.now().isoformat(),
        }

    def select_next_executable_task(self) -> Optional[Dict[str, Any]]:
        """
        Select the next task respecting per-project sequencing.

        Algorithm:
        1. Get all pending tasks (by priority).
        2. Get the set of projects with active tasks.
        3. For each pending task:
           - If its project has NO active task → CAN_RUN
           - If its project has an active task → SKIP (wait for completion)
        4. Round-robin project selection for fairness.
        5. Return the first available task respecting the round-robin order.

        Returns:
            Task dict with all details, or None if no task can run.
        """
        # Refresh project state (clean up completed tasks if needed)
        self._cleanup_completed_tasks()

        # All pending tasks: high priority first, then normal, then by timestamp
        all_tasks = self._get_pending_tasks_ordered()
        if not all_tasks:
            return None  # No pending tasks

        # Projects that already have a task running
        active_projects = set(self.project_state["active_tasks"].keys())

        # Round-robin pointer: the last project to receive a task
        last_served = self.project_state.get("last_served")

        # Group pending tasks by project (rotation candidates)
        projects_with_tasks: Dict[str, List[Dict[str, Any]]] = {}
        for task in all_tasks:
            project = task.get("project", "unknown")
            projects_with_tasks.setdefault(project, []).append(task)

        # Sort projects, then rotate so iteration starts after last_served
        project_list = sorted(projects_with_tasks.keys())
        if last_served and last_served in project_list:
            idx = project_list.index(last_served)
            project_list = project_list[idx + 1:] + project_list[:idx + 1]

        # Pick the first project without an active task
        for project in project_list:
            if project not in active_projects:
                task = projects_with_tasks[project][0]
                # Advance the round-robin pointer
                self.project_state["last_served"] = project
                self._save_project_state()
                return task

        # Every project with pending work already has an active task
        return None

    def _cleanup_completed_tasks(self) -> None:
        """
        Clean up completed tasks from active_tasks tracking.

        Completion is currently signalled explicitly: the dispatcher calls
        release_task() after a task finishes, which removes the entry. This
        hook is the place to add reconciliation against the completed/failed
        queues (e.g. after a dispatcher crash); for now it is a no-op.
        """
        return

    def _get_pending_tasks_ordered(self) -> List[Dict[str, Any]]:
        """Get all pending tasks ordered by priority and timestamp."""
        tasks = []
        # High priority first, then normal; within a tier, sorted filenames
        # are relied on for timestamp order.
        for tier in ["high", "normal"]:
            tier_dir = self.QUEUE_BASE / "pending" / tier
            if not tier_dir.exists():
                continue
            for task_file in sorted(tier_dir.glob("*.json")):
                try:
                    tasks.append(json.loads(task_file.read_text()))
                except (json.JSONDecodeError, IOError):
                    pass  # Skip unreadable or corrupt task files
        return tasks

    def claim_task(self, task_id: str, project: str) -> bool:
        """
        Mark a task as claimed (now running) for this project.

        Args:
            task_id: Task identifier
            project: Project name

        Returns:
            True if successfully claimed, False if the project already has
            an active task.

        See example_dispatch_once() below for how claim/release pair up in a
        dispatcher loop.
        """
        if project in self.project_state["active_tasks"]:
            return False  # Project already has an active task

        self.project_state["active_tasks"][project] = task_id
        self._save_project_state()
        return True

    def release_task(self, project: str) -> bool:
        """
        Mark a task as complete for this project (release the slot).

        Args:
            project: Project name

        Returns:
            True if successfully released, False if no active task.
        """
        if project not in self.project_state["active_tasks"]:
            return False

        del self.project_state["active_tasks"][project]
        self._save_project_state()
        return True

    def get_scheduling_status(self) -> Dict[str, Any]:
        """Get current scheduling status across all projects."""
        all_tasks = self._get_pending_tasks_ordered()

        # Count pending tasks per project
        pending_by_project: Dict[str, int] = {}
        for task in all_tasks:
            project = task.get("project", "unknown")
            pending_by_project[project] = pending_by_project.get(project, 0) + 1

        active_projects = self.project_state.get("active_tasks", {})

        return {
            "timestamp": datetime.now().isoformat(),
            "total_pending": len(all_tasks),
            "pending_by_project": pending_by_project,
            "active_tasks": active_projects,
            "active_count": len(active_projects),
            "last_served": self.project_state.get("last_served"),
            "scheduling_algorithm": "per-project-sequential",
            "max_concurrent_slots": self.config.get("max_concurrent_slots", 4),
        }
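

# --- Example dispatcher integration (sketch) ---------------------------------
# A minimal sketch of how a dispatcher loop might drive the scheduler:
# select -> claim -> execute -> release. The `run_task` callable is
# hypothetical (not part of this module) and stands in for whatever actually
# executes a claimed task; error handling is deliberately elided.
def example_dispatch_once(scheduler: ProjectQueueScheduler, run_task) -> bool:
    """Claim and run at most one task; return True if a task was dispatched."""
    task = scheduler.select_next_executable_task()
    if task is None:
        return False  # Nothing runnable right now

    project = task["project"]
    if not scheduler.claim_task(task["id"], project):
        return False  # Project already has an active task (e.g. lost a race)

    try:
        run_task(task)  # Hypothetical executor
    finally:
        # Always release the slot so the project's next task can run
        scheduler.release_task(project)
    return True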


def main():
    """Test harness for ProjectQueueScheduler."""
    scheduler = ProjectQueueScheduler()

    print("=" * 60)
    print("PROJECT QUEUE SCHEDULER STATUS")
    print("=" * 60)
    status = scheduler.get_scheduling_status()
    print(json.dumps(status, indent=2))

    print("\n" + "=" * 60)
    print("PROJECT-SPECIFIC STATUSES")
    print("=" * 60)
    for project in status["pending_by_project"]:
        proj_status = scheduler.get_project_queue_status(project)
        print(f"\n{project}:")
        print(f"  Pending: {proj_status['pending_count']}")
        print(f"  Active: {proj_status['is_running']}")

    print("\n" + "=" * 60)
    print("NEXT EXECUTABLE TASK")
    print("=" * 60)
    next_task = scheduler.select_next_executable_task()
    if next_task:
        print(f"Project: {next_task['project']}")
        print(f"Task ID: {next_task['id']}")
        print(f"Prompt: {next_task['prompt'][:100]}...")
    else:
        print("No tasks available to execute")


if __name__ == "__main__":
    main()
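

# Running this file directly exercises the read-only status paths plus one
# call to select_next_executable_task(), which advances the round-robin
# pointer and rewrites the state file. Example invocation (the path below is
# an assumption about where this module is installed):
#
#   python3 lib/project_queue_scheduler.py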