Files
luzia/lib/project_queue_scheduler.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

315 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Project Queue Scheduler - Per-project sequential task execution
Extends QueueController to implement:
- Per-project sequential execution (max 1 task per project at a time)
- Parallel execution across different projects
- Fair project rotation (prevents starvation)
- Project-aware task selection
Architecture:
QueueController: Global queue management (priority, capacity, fair share)
ProjectQueueScheduler: Project-based sequencing layer
├─ Track active tasks per project
├─ Fair rotation among projects
└─ Sequential selection logic
"""
import json
import os
import subprocess
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
# Assume QueueController is in the same directory
try:
from queue_controller import QueueController
except ImportError:
raise ImportError("queue_controller.py not found in lib directory")
class ProjectQueueScheduler(QueueController):
    """
    Scheduler that ensures per-project sequential task execution
    while allowing parallel execution across projects.

    Responsibilities layered on top of QueueController:
    - Track at most one active task per project ("active_tasks").
    - Rotate fairly among projects with pending work ("last_served").
    - Persist that tracking state atomically across runs.
    """

    # Persistent scheduler state: active tasks + round-robin cursor.
    PROJECT_STATE_FILE = Path("/var/lib/luzia/queue/projects_state.json")

    def __init__(self):
        """Initialize scheduler with project state tracking."""
        super().__init__()
        self.project_state = self._load_project_state()

    def _load_project_state(self) -> Dict[str, Any]:
        """Load or initialize project state tracking.

        Returns:
            State dict with keys:
            - "active_tasks": project name -> task_id currently running
            - "last_served": last project given a slot (rotation cursor)
            - "updated_at": ISO timestamp of the last save
        """
        if self.PROJECT_STATE_FILE.exists():
            try:
                state = json.loads(self.PROJECT_STATE_FILE.read_text())
                # Tolerate older/truncated files missing expected keys so
                # later ["active_tasks"] accesses cannot raise KeyError.
                state.setdefault("active_tasks", {})
                state.setdefault("last_served", None)
                return state
            except (json.JSONDecodeError, IOError):
                pass  # Corrupt state file: fall through to fresh defaults.
        return {
            "active_tasks": {},   # project -> task_id
            "last_served": None,  # Last project to get a task (for rotation)
            "updated_at": datetime.now().isoformat()
        }

    def _save_project_state(self) -> None:
        """Save project state atomically (temp file, fsync, then replace)."""
        self.project_state["updated_at"] = datetime.now().isoformat()
        tmp_path = self.PROJECT_STATE_FILE.with_suffix(".json.tmp")
        self.PROJECT_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(tmp_path, "w") as f:
            json.dump(self.project_state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        # os.replace is atomic and overwrites on every platform; os.rename
        # raises FileExistsError on Windows when the target already exists.
        os.replace(tmp_path, self.PROJECT_STATE_FILE)

    def _read_tasks_in_dir(self, tier_dir: Path) -> List[Dict[str, Any]]:
        """Read every task JSON file in *tier_dir*, sorted by filename.

        Malformed or unreadable files are skipped: the queue scan is
        best-effort and must not abort on a single broken task file.
        """
        tasks: List[Dict[str, Any]] = []
        if tier_dir.exists():
            for task_file in sorted(tier_dir.glob("*.json")):
                try:
                    tasks.append(json.loads(task_file.read_text()))
                except (json.JSONDecodeError, IOError):
                    pass  # Skip broken task files rather than abort the scan.
        return tasks

    def get_project_queue_status(self, project: str) -> Dict[str, Any]:
        """Get queue status for a specific project.

        Args:
            project: Project name.

        Returns:
            Dict with pending counts, a sample of pending tasks, and the
            project's currently-active task (if any).
        """
        # Filter on the authoritative "project" field of each parsed task;
        # the old filename-substring pre-filter missed tasks whose file
        # names did not follow the "_<project>_" convention.
        pending_tasks = []
        for tier in ["high", "normal"]:
            for task in self._read_tasks_in_dir(self.QUEUE_BASE / "pending" / tier):
                if task.get("project") == project:
                    pending_tasks.append(task)

        # Check if project has an active task (only the id is tracked here;
        # full details live in the task file managed by the dispatcher).
        active_task_id = self.project_state["active_tasks"].get(project)
        active_task = None
        if active_task_id:
            active_task = {"id": active_task_id, "status": "running"}

        return {
            "project": project,
            "pending_count": len(pending_tasks),
            "pending_tasks": pending_tasks[:10],  # First 10
            "active_task": active_task,
            "is_running": active_task_id is not None,
            "queued_at": datetime.now().isoformat()
        }

    def select_next_executable_task(self) -> Optional[Dict[str, Any]]:
        """
        Select next task respecting per-project sequencing.

        Algorithm:
        1. Get all pending tasks (by priority)
        2. Get list of projects with active tasks
        3. For each pending task:
           - If project has NO active task -> CAN_RUN
           - If project has active task -> SKIP (wait for completion)
        4. Round-robin project selection for fairness
        5. Return first available task respecting round-robin

        Returns:
            Task dict with all details, or None if no task can run
        """
        # Refresh project state (clean up completed tasks if needed).
        self._cleanup_completed_tasks()

        # All pending tasks: high tier first, then normal, FIFO within tier.
        all_tasks = self._get_pending_tasks_ordered()
        if not all_tasks:
            return None  # No pending tasks

        active_projects = set(self.project_state["active_tasks"].keys())
        last_served = self.project_state.get("last_served")

        # Group pending tasks by project, preserving priority order.
        # Tasks with a missing/null "project" field are grouped under
        # "unknown" (a None key would make the sort below raise TypeError).
        projects_with_tasks: Dict[str, List[Dict[str, Any]]] = {}
        for task in all_tasks:
            project = task.get("project") or "unknown"
            projects_with_tasks.setdefault(project, []).append(task)

        # Round-robin: rotate the sorted project list so the scan starts
        # just after the project that was served last time.
        project_list = sorted(projects_with_tasks)
        if last_served and last_served in project_list:
            idx = project_list.index(last_served)
            project_list = project_list[idx + 1:] + project_list[:idx + 1]

        # First project without an active task wins its highest-priority task.
        for project in project_list:
            if project not in active_projects:
                task = projects_with_tasks[project][0]
                self.project_state["last_served"] = project
                self._save_project_state()
                return task

        # All projects with pending tasks already have an active task.
        return None

    def _cleanup_completed_tasks(self) -> None:
        """Clean up completed tasks from active_tasks tracking.

        NOTE: completion detection is currently delegated to the dispatcher,
        which calls release_task() when a task finishes. This hook is a
        placeholder for future self-service cleanup (e.g. scanning the
        completed/failed directories for tracked task ids).
        """
        completed: List[str] = []
        for project, task_id in list(self.project_state["active_tasks"].items()):
            # Intentionally a no-op for now -- see docstring.
            pass
        for project in completed:
            del self.project_state["active_tasks"][project]
        if completed:
            self._save_project_state()

    def _get_pending_tasks_ordered(self) -> List[Dict[str, Any]]:
        """Get all pending tasks ordered by priority tier, then filename.

        High-priority tasks come first; within a tier the sorted filename
        order (timestamp-prefixed names) provides FIFO ordering.
        """
        tasks: List[Dict[str, Any]] = []
        for tier in ["high", "normal"]:
            tasks.extend(self._read_tasks_in_dir(self.QUEUE_BASE / "pending" / tier))
        return tasks

    def claim_task(self, task_id: str, project: str) -> bool:
        """
        Mark a task as claimed (now running) for this project.

        Args:
            task_id: Task identifier
            project: Project name

        Returns:
            True if successfully claimed, False if project already has
            an active task.
        """
        if project in self.project_state["active_tasks"]:
            return False  # Project already has active task
        self.project_state["active_tasks"][project] = task_id
        self._save_project_state()
        return True

    def release_task(self, project: str) -> bool:
        """
        Mark a task as complete for this project (release the slot).

        Args:
            project: Project name

        Returns:
            True if successfully released, False if no active task.
        """
        if project not in self.project_state["active_tasks"]:
            return False
        del self.project_state["active_tasks"][project]
        self._save_project_state()
        return True

    def get_scheduling_status(self) -> Dict[str, Any]:
        """Get current scheduling status across all projects."""
        all_tasks = self._get_pending_tasks_ordered()

        # Count pending tasks by project; same "unknown" fallback as the
        # selector so both views bucket project-less tasks identically.
        pending_by_project: Dict[str, int] = {}
        for task in all_tasks:
            project = task.get("project") or "unknown"
            pending_by_project[project] = pending_by_project.get(project, 0) + 1

        active_projects = self.project_state.get("active_tasks", {})
        return {
            "timestamp": datetime.now().isoformat(),
            "total_pending": len(all_tasks),
            "pending_by_project": pending_by_project,
            "active_tasks": active_projects,
            "active_count": len(active_projects),
            "last_served": self.project_state.get("last_served"),
            "scheduling_algorithm": "per-project-sequential",
            "max_concurrent_slots": self.config.get("max_concurrent_slots", 4)
        }
def main():
    """Test harness: print global, per-project, and next-task status."""
    # NOTE: the redundant function-local `import json` was removed; the
    # module-level import (top of file) is already in scope here.
    scheduler = ProjectQueueScheduler()

    print("=" * 60)
    print("PROJECT QUEUE SCHEDULER STATUS")
    print("=" * 60)
    status = scheduler.get_scheduling_status()
    print(json.dumps(status, indent=2))

    print("\n" + "=" * 60)
    print("PROJECT-SPECIFIC STATUSES")
    print("=" * 60)
    for project in status["pending_by_project"]:
        proj_status = scheduler.get_project_queue_status(project)
        print(f"\n{project}:")
        print(f" Pending: {proj_status['pending_count']}")
        print(f" Active: {proj_status['is_running']}")

    print("\n" + "=" * 60)
    print("NEXT EXECUTABLE TASK")
    print("=" * 60)
    next_task = scheduler.select_next_executable_task()
    if next_task:
        print(f"Project: {next_task['project']}")
        print(f"Task ID: {next_task['id']}")
        print(f"Prompt: {next_task['prompt'][:100]}...")
    else:
        print("No tasks available to execute")


if __name__ == "__main__":
    main()