Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
286 lines
10 KiB
Python
286 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test Suite for Responsive Dispatcher
|
|
|
|
Tests:
|
|
1. Immediate job dispatch with job_id return
|
|
2. Non-blocking task spawning
|
|
3. Background status monitoring
|
|
4. Concurrent task handling
|
|
5. Status polling and updates
|
|
6. CLI feedback rendering
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
import time
|
|
import tempfile
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
# Add lib to path
|
|
lib_path = Path(__file__).parent.parent / "lib"
|
|
sys.path.insert(0, str(lib_path))
|
|
|
|
from responsive_dispatcher import ResponseiveDispatcher
|
|
from cli_feedback import CLIFeedback, Colors, ProgressBar
|
|
from dispatcher_enhancements import EnhancedDispatcher, get_enhanced_dispatcher
|
|
|
|
|
|
class TestResponsiveDispatcher:
|
|
"""Test responsive dispatcher functionality"""
|
|
|
|
def __init__(self):
|
|
self.test_dir = Path(tempfile.mkdtemp(prefix="luzia_test_"))
|
|
self.dispatcher = ResponseiveDispatcher(self.test_dir)
|
|
self.feedback = CLIFeedback()
|
|
self.passed = 0
|
|
self.failed = 0
|
|
|
|
def run_all_tests(self):
|
|
"""Run all tests"""
|
|
print(f"\n{Colors.BOLD}=== Responsive Dispatcher Test Suite ==={Colors.RESET}\n")
|
|
|
|
tests = [
|
|
self.test_immediate_dispatch,
|
|
self.test_job_status_retrieval,
|
|
self.test_status_updates,
|
|
self.test_concurrent_jobs,
|
|
self.test_cache_behavior,
|
|
self.test_cli_feedback,
|
|
self.test_progress_bar,
|
|
self.test_background_monitoring,
|
|
]
|
|
|
|
for test in tests:
|
|
try:
|
|
print(f" Running {test.__name__}...", end=" ", flush=True)
|
|
test()
|
|
self.passed += 1
|
|
print(f"{Colors.GREEN}✓{Colors.RESET}")
|
|
except AssertionError as e:
|
|
self.failed += 1
|
|
print(f"{Colors.RED}✗{Colors.RESET}")
|
|
print(f" Error: {e}")
|
|
except Exception as e:
|
|
self.failed += 1
|
|
print(f"{Colors.RED}✗{Colors.RESET}")
|
|
print(f" Unexpected error: {e}")
|
|
|
|
# Summary
|
|
print(f"\n{Colors.BOLD}=== Test Summary ==={Colors.RESET}")
|
|
print(f" {Colors.GREEN}Passed:{Colors.RESET} {self.passed}")
|
|
print(f" {Colors.RED}Failed:{Colors.RESET} {self.failed}")
|
|
print(f" {Colors.BLUE}Total:{Colors.RESET} {self.passed + self.failed}\n")
|
|
|
|
return self.failed == 0
|
|
|
|
def test_immediate_dispatch(self):
|
|
"""Test that dispatch returns immediately with job_id"""
|
|
start_time = time.time()
|
|
job_id, status = self.dispatcher.dispatch_task("test_project", "echo hello")
|
|
elapsed = time.time() - start_time
|
|
|
|
assert job_id, "Job ID should be returned"
|
|
assert isinstance(status, dict), "Status should be dict"
|
|
assert status["status"] == "dispatched", "Initial status should be 'dispatched'"
|
|
assert status["project"] == "test_project", "Project should match"
|
|
assert elapsed < 0.5, f"Dispatch should be instant (took {elapsed}s)"
|
|
|
|
def test_job_status_retrieval(self):
|
|
"""Test retrieving job status"""
|
|
job_id, initial_status = self.dispatcher.dispatch_task("proj1", "task1")
|
|
|
|
# Retrieve status
|
|
retrieved = self.dispatcher.get_status(job_id)
|
|
assert retrieved is not None, "Status should be retrievable"
|
|
assert retrieved["id"] == job_id, "Job ID should match"
|
|
assert retrieved["status"] == "dispatched", "Status should be dispatched"
|
|
|
|
def test_status_updates(self):
|
|
"""Test updating job status"""
|
|
job_id, _ = self.dispatcher.dispatch_task("proj1", "task1")
|
|
|
|
# Update status
|
|
self.dispatcher.update_status(job_id, "running", progress=25, message="Processing...")
|
|
status = self.dispatcher.get_status(job_id, use_cache=False)
|
|
|
|
assert status["status"] == "running", "Status should be updated"
|
|
assert status["progress"] == 25, "Progress should be updated"
|
|
assert status["message"] == "Processing...", "Message should be updated"
|
|
|
|
def test_concurrent_jobs(self):
|
|
"""Test handling multiple concurrent jobs"""
|
|
jobs = []
|
|
for i in range(5):
|
|
job_id, status = self.dispatcher.dispatch_task(f"proj{i}", f"task{i}")
|
|
jobs.append(job_id)
|
|
|
|
# Verify all jobs exist
|
|
for job_id in jobs:
|
|
status = self.dispatcher.get_status(job_id)
|
|
assert status is not None, f"Job {job_id} should exist"
|
|
|
|
# Verify list shows all jobs
|
|
all_jobs = self.dispatcher.list_jobs()
|
|
assert len(all_jobs) >= 5, "Should have at least 5 jobs"
|
|
|
|
def test_cache_behavior(self):
|
|
"""Test cache behavior"""
|
|
job_id, _ = self.dispatcher.dispatch_task("proj1", "task1")
|
|
|
|
# First read should cache
|
|
status1 = self.dispatcher.get_status(job_id, use_cache=True)
|
|
|
|
# Update directly on disk
|
|
self.dispatcher.update_status(job_id, "running", progress=50)
|
|
|
|
# Cached read should be stale
|
|
status2 = self.dispatcher.get_status(job_id, use_cache=True)
|
|
assert status2["progress"] == 50, "Cache should be updated on write"
|
|
|
|
# Non-cached read should be fresh
|
|
time.sleep(1.1) # Wait for cache to expire
|
|
status3 = self.dispatcher.get_status(job_id, use_cache=False)
|
|
assert status3["progress"] == 50, "Fresh read should show updated status"
|
|
|
|
def test_cli_feedback(self):
|
|
"""Test CLI feedback rendering"""
|
|
status = {
|
|
"id": "test-job-id",
|
|
"project": "test_proj",
|
|
"status": "running",
|
|
"progress": 45,
|
|
"message": "Processing files...",
|
|
}
|
|
|
|
# Should not raise exception
|
|
self.feedback.show_status(status)
|
|
self.feedback.show_status_line(status)
|
|
self.feedback.job_dispatched("test-id", "proj", "task")
|
|
|
|
def test_progress_bar(self):
|
|
"""Test progress bar rendering"""
|
|
bar = ProgressBar.render(0)
|
|
assert "[" in bar and "]" in bar, "Progress bar should have brackets"
|
|
|
|
bar50 = ProgressBar.render(50)
|
|
bar100 = ProgressBar.render(100)
|
|
|
|
assert bar50.count("█") > bar.count("█"), "50% should have more filled blocks"
|
|
assert bar100.count("█") > bar50.count("█"), "100% should have all filled blocks"
|
|
|
|
def test_background_monitoring(self):
|
|
"""Test background monitoring queue"""
|
|
job_id, _ = self.dispatcher.dispatch_task("proj1", "test task")
|
|
|
|
# Monitoring queue should have the job
|
|
assert not self.dispatcher.monitoring_queue.empty(), "Queue should have job"
|
|
|
|
# Get item from queue (with retry in case timing issues)
|
|
try:
|
|
job_info = self.dispatcher.monitoring_queue.get(timeout=1)
|
|
assert job_info["job_id"] == job_id, "Queue should contain correct job_id"
|
|
except Exception:
|
|
# Queue might have been processed already - verify job exists instead
|
|
status = self.dispatcher.get_status(job_id)
|
|
assert status is not None, "Job should exist in dispatcher"
|
|
|
|
|
|
class TestEnhancedDispatcher:
|
|
"""Test enhanced dispatcher with integrated features"""
|
|
|
|
def __init__(self):
|
|
self.test_dir = Path(tempfile.mkdtemp(prefix="luzia_enh_test_"))
|
|
self.enhanced = EnhancedDispatcher(self.test_dir)
|
|
self.passed = 0
|
|
self.failed = 0
|
|
|
|
def run_all_tests(self):
|
|
"""Run all tests"""
|
|
print(f"\n{Colors.BOLD}=== Enhanced Dispatcher Test Suite ==={Colors.RESET}\n")
|
|
|
|
tests = [
|
|
self.test_dispatch_and_report,
|
|
self.test_status_display,
|
|
self.test_jobs_summary,
|
|
]
|
|
|
|
for test in tests:
|
|
try:
|
|
print(f" Running {test.__name__}...", end=" ", flush=True)
|
|
test()
|
|
self.passed += 1
|
|
print(f"{Colors.GREEN}✓{Colors.RESET}")
|
|
except AssertionError as e:
|
|
self.failed += 1
|
|
print(f"{Colors.RED}✗{Colors.RESET}")
|
|
print(f" Error: {e}")
|
|
except Exception as e:
|
|
self.failed += 1
|
|
print(f"{Colors.RED}✗{Colors.RESET}")
|
|
print(f" Unexpected error: {e}")
|
|
|
|
print(f"\n{Colors.BOLD}=== Test Summary ==={Colors.RESET}")
|
|
print(f" {Colors.GREEN}Passed:{Colors.RESET} {self.passed}")
|
|
print(f" {Colors.RED}Failed:{Colors.RESET} {self.failed}")
|
|
print(f" {Colors.BLUE}Total:{Colors.RESET} {self.passed + self.failed}\n")
|
|
|
|
return self.failed == 0
|
|
|
|
def test_dispatch_and_report(self):
|
|
"""Test dispatch with feedback"""
|
|
job_id, status = self.enhanced.dispatch_and_report(
|
|
"test_proj", "test task", show_feedback=False
|
|
)
|
|
assert job_id, "Should return job_id"
|
|
assert status["status"] == "dispatched", "Should be dispatched"
|
|
|
|
def test_status_display(self):
|
|
"""Test status display"""
|
|
job_id, _ = self.enhanced.dispatch_and_report(
|
|
"proj", "task", show_feedback=False
|
|
)
|
|
status = self.enhanced.get_status_and_display(job_id, show_full=False)
|
|
assert status is not None, "Should retrieve status"
|
|
|
|
def test_jobs_summary(self):
|
|
"""Test jobs summary display"""
|
|
for i in range(3):
|
|
self.enhanced.dispatch_and_report(f"proj{i}", f"task{i}", show_feedback=False)
|
|
|
|
# Should not raise exception
|
|
self.enhanced.show_jobs_summary()
|
|
self.enhanced.show_concurrent_summary()
|
|
|
|
|
|
def main():
|
|
"""Run all test suites"""
|
|
print(f"\n{Colors.BOLD}{Colors.CYAN}Luzia Responsive Dispatcher Tests{Colors.RESET}")
|
|
print(f"{Colors.GRAY}Testing non-blocking dispatch and status tracking{Colors.RESET}")
|
|
|
|
# Test responsive dispatcher
|
|
dispatcher_tests = TestResponsiveDispatcher()
|
|
dispatcher_ok = dispatcher_tests.run_all_tests()
|
|
|
|
# Test enhanced dispatcher
|
|
enhanced_tests = TestEnhancedDispatcher()
|
|
enhanced_ok = enhanced_tests.run_all_tests()
|
|
|
|
# Summary
|
|
all_passed = dispatcher_ok and enhanced_ok
|
|
if all_passed:
|
|
print(
|
|
f"{Colors.GREEN}{Colors.BOLD}✓ All tests passed!{Colors.RESET}\n"
|
|
)
|
|
return 0
|
|
else:
|
|
print(
|
|
f"{Colors.RED}{Colors.BOLD}✗ Some tests failed{Colors.RESET}\n"
|
|
)
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|