Files
luzia/tests/test_per_user_queue.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

288 lines
9.6 KiB
Python

#!/usr/bin/env python3
"""
Test Per-User Queue System
Tests:
1. Per-user lock acquisition and release
2. Lock timeout and cleanup
3. Queue controller with per-user serialization
4. Fair scheduling respects per-user locks
5. Conductor lock cleanup
"""
import sys
import json
import time
from pathlib import Path
from datetime import datetime, timedelta
# Add lib to path
lib_path = Path(__file__).parent.parent / "lib"
sys.path.insert(0, str(lib_path))
from per_user_queue_manager import PerUserQueueManager
from queue_controller_v2 import QueueControllerV2
from conductor_lock_cleanup import ConductorLockCleanup
def test_per_user_lock_basic():
    """Test basic lock acquire and release."""
    print("\n=== Test: Basic Lock Acquire/Release ===")
    manager = PerUserQueueManager()
    username = "testuser"
    task = "task_123"

    # Acquire the lock and sanity-check the returned handle.
    ok, handle = manager.acquire_lock(username, task)
    assert ok, f"Failed to acquire lock for {username}"
    assert handle, "Lock ID should not be None"
    print(f"✓ Acquired lock: user={username}, lock_id={handle}")

    # The manager must now report this user as locked.
    assert manager.is_user_locked(username), "User should be locked"
    print("✓ User is locked")

    # Lock metadata should be retrievable and name the right user.
    info = manager.get_lock_info(username)
    assert info, "Should return lock info"
    assert info["user"] == username
    print(f"✓ Lock info retrieved: {info['lock_id']}")

    # Releasing with the matching handle must succeed...
    assert manager.release_lock(username, handle), "Failed to release lock"
    print("✓ Released lock")

    # ...and leave the user unlocked afterwards.
    assert not manager.is_user_locked(username), "User should not be locked"
    print("✓ Lock released successfully")
def test_concurrent_lock_contention():
    """Test that only one lock per user can be held."""
    print("\n=== Test: Concurrent Lock Contention ===")
    manager = PerUserQueueManager()
    username = "contentionuser"

    # The first acquisition should go through immediately.
    ok_first, first_id = manager.acquire_lock(username, "task_1", timeout=1)
    assert ok_first, "First lock should succeed"
    print(f"✓ First lock acquired: {first_id}")

    # A second attempt while the first is held must time out.
    ok_second, second_id = manager.acquire_lock(username, "task_2", timeout=1)
    assert not ok_second, "Second lock should fail due to contention"
    assert second_id is None
    print("✓ Second lock correctly rejected (contention)")

    # Releasing the first lock frees the user...
    manager.release_lock(username, first_id)
    print("✓ First lock released")

    # ...so a retry for the same task now succeeds.
    ok_third, third_id = manager.acquire_lock(username, "task_2", timeout=1)
    assert ok_third, "Third lock should succeed after release"
    print(f"✓ Third lock acquired after release: {third_id}")
    manager.release_lock(username, third_id)
def test_stale_lock_cleanup():
    """Test stale lock detection and cleanup."""
    print("\n=== Test: Stale Lock Cleanup ===")
    manager = PerUserQueueManager()
    username = "staleuser"

    ok, handle = manager.acquire_lock(username, "task_stale")
    assert ok
    print(f"✓ Lock acquired: {handle}")

    # Backdate the on-disk expiry timestamp so the lock looks expired.
    meta_path = manager._get_lock_meta_path(username)
    metadata = json.loads(meta_path.read_text())
    metadata["expires_at"] = (datetime.now() - timedelta(hours=1)).isoformat()
    meta_path.write_text(json.dumps(metadata))
    print("✓ Lock manually set as stale")

    # The manager should now classify the lock as stale...
    assert manager._is_lock_stale(username), "Lock should be detected as stale"
    print("✓ Stale lock detected")

    # ...and the cleanup pass should remove it entirely.
    manager._cleanup_stale_locks(username)
    assert not manager.is_user_locked(username), "Stale lock should be cleaned up"
    print("✓ Stale lock cleaned up")
def test_multiple_users():
    """Test that different users have independent locks."""
    print("\n=== Test: Multiple Users Independence ===")
    manager = PerUserQueueManager()

    # Locks for distinct users must not interfere with one another.
    ok_a, handle_a = manager.acquire_lock("user_a", "task_a")
    ok_b, handle_b = manager.acquire_lock("user_b", "task_b")
    assert ok_a and ok_b, "Both locks should succeed"
    print("✓ Acquired locks for user_a and user_b")

    assert manager.is_user_locked("user_a"), "user_a should be locked"
    assert manager.is_user_locked("user_b"), "user_b should be locked"
    print("✓ Both users are locked")

    # Dropping one user's lock leaves the other untouched.
    manager.release_lock("user_a", handle_a)
    assert not manager.is_user_locked("user_a"), "user_a should be unlocked"
    assert manager.is_user_locked("user_b"), "user_b should still be locked"
    print("✓ user_a released, user_b still locked")
    manager.release_lock("user_b", handle_b)
def test_queue_controller_v2():
    """Test QueueControllerV2 with per-user serialization.

    Enqueues tasks for two projects, then verifies that holding a
    per-user lock blocks further execution for that user only, while
    other users remain unaffected.
    """
    print("\n=== Test: QueueControllerV2 Integration ===")
    qc = QueueControllerV2()
    # Ensure per-user serialization is in config and enabled for testing
    if "per_user_serialization" not in qc.config:
        qc.config["per_user_serialization"] = {"enabled": True, "lock_timeout_seconds": 3600}
    qc.config["per_user_serialization"]["enabled"] = True
    # Enqueue tasks for different projects (users)
    task_id_1, pos_1 = qc.enqueue("project_a", "Task 1 for project A")
    task_id_2, pos_2 = qc.enqueue("project_b", "Task 1 for project B")
    task_id_3, pos_3 = qc.enqueue("project_a", "Task 2 for project A")
    print("✓ Enqueued 3 tasks")
    print(f"  - project_a: {task_id_1} (pos {pos_1}), {task_id_3} (pos {pos_3})")
    print(f"  - project_b: {task_id_2} (pos {pos_2})")
    # Get queue status; the queue may already contain tasks from earlier
    # runs, so only check that our 3 new tasks are counted.
    status = qc.get_queue_status()
    initial_pending = status["pending"]["total"]
    assert initial_pending >= 3, f"Should have at least 3 pending tasks, have {initial_pending}"
    print(f"✓ Queue status: {initial_pending} total pending tasks (at least 3 new ones)")
    # Check that per-user locks are respected
    user_a = qc.extract_user_from_project("project_a")
    user_b = qc.extract_user_from_project("project_b")
    can_exec_a = qc.can_user_execute_task(user_a)
    can_exec_b = qc.can_user_execute_task(user_b)
    assert can_exec_a and can_exec_b, "Both users should be able to execute"
    print("✓ Both users can execute tasks")
    # Acquire lock for user_a.
    acq_a, lock_a = qc.acquire_user_lock(user_a, task_id_1)
    assert acq_a and lock_a, "Should acquire lock for user_a"
    print(f"✓ Acquired lock for user_a: {lock_a}")
    # Release in `finally` so a failed assertion cannot leak a persistent
    # per-user lock that would poison subsequent tests/runs.
    try:
        # Now user_a cannot execute another task
        can_exec_a2 = qc.can_user_execute_task(user_a)
        assert not can_exec_a2, "user_a should not be able to execute while locked"
        print("✓ user_a locked, cannot execute new tasks")
        # But user_b can
        can_exec_b2 = qc.can_user_execute_task(user_b)
        assert can_exec_b2, "user_b should still be able to execute"
        print("✓ user_b can still execute")
    finally:
        qc.release_user_lock(user_a, lock_a)
    # After release, user_a can execute again.
    can_exec_a3 = qc.can_user_execute_task(user_a)
    assert can_exec_a3, "user_a should be able to execute again"
    print("✓ Released user_a lock, can execute again")
def test_fair_scheduling_with_locks():
    """Test that fair scheduling respects per-user locks.

    Selects a task, locks its user, then verifies the scheduler either
    picks a task belonging to a different user or returns nothing.
    """
    print("\n=== Test: Fair Scheduling with Per-User Locks ===")
    qc = QueueControllerV2()
    # Ensure per-user serialization is in config and enabled for testing
    if "per_user_serialization" not in qc.config:
        qc.config["per_user_serialization"] = {"enabled": True, "lock_timeout_seconds": 3600}
    qc.config["per_user_serialization"]["enabled"] = True
    # Enqueue multiple tasks (equal priority so selection order is fair)
    task_id_1, _ = qc.enqueue("proj_a", "Task A1", priority=5)
    task_id_2, _ = qc.enqueue("proj_b", "Task B1", priority=5)
    task_id_3, _ = qc.enqueue("proj_a", "Task A2", priority=5)
    # Get pending tasks
    capacity = qc._read_capacity()
    task = qc._select_next_task(capacity)
    assert task, "Should select a task"
    print(f"✓ Selected task: {task['id']} for {task['project']}")
    # Acquire lock for this task's user; fall back to deriving the user
    # from the project when the task record carries no explicit user.
    user = task.get("user") or qc.extract_user_from_project(task["project"])
    acq, lock_id = qc.acquire_user_lock(user, task["id"])
    assert acq, "Should acquire user lock"
    # Release in `finally` so a failed assertion cannot leak a persistent
    # per-user lock that would poison subsequent tests/runs.
    try:
        # Now selecting next task should skip tasks for this user
        # and select from another user
        task2 = qc._select_next_task(capacity)
        if task2:
            user2 = task2.get("user") or qc.extract_user_from_project(task2["project"])
            # Task should be from a different user or None
            assert user2 != user, f"Should select different user, got {user2}"
            print(f"✓ Fair scheduling respects user lock: skipped {user}, selected {user2}")
        else:
            print("✓ Fair scheduling: no available task (all from locked user)")
    finally:
        qc.release_user_lock(user, lock_id)
def run_all_tests():
    """Run all tests sequentially and report a summary.

    Each test failure is reported but does not stop the remaining
    tests.

    Returns:
        bool: True when every test passed, False otherwise.
    """
    import traceback  # local import: only needed for error reporting

    print("=" * 60)
    print("Per-User Queue System Tests")
    print("=" * 60)
    tests = [
        test_per_user_lock_basic,
        test_concurrent_lock_contention,
        test_stale_lock_cleanup,
        test_multiple_users,
        test_queue_controller_v2,
        test_fair_scheduling_with_locks,
    ]
    passed = 0
    failed = 0
    for test_func in tests:
        try:
            test_func()
            passed += 1
        except AssertionError as e:
            print(f"✗ FAILED: {e}")
            failed += 1
        except Exception as e:
            # Include the exception type and traceback: str(e) alone is
            # empty or ambiguous for many exceptions (e.g. KeyError),
            # making unexpected errors impossible to diagnose.
            print(f"✗ ERROR: {type(e).__name__}: {e}")
            traceback.print_exc()
            failed += 1
    print("\n" + "=" * 60)
    print(f"Results: {passed} passed, {failed} failed")
    print("=" * 60)
    return failed == 0
if __name__ == "__main__":
    # Propagate the test outcome as the process exit status for CI:
    # 0 on full success, 1 when any test failed.
    sys.exit(0 if run_all_tests() else 1)