#!/usr/bin/env python3 """ Test Suite for Responsive Dispatcher Tests: 1. Immediate job dispatch with job_id return 2. Non-blocking task spawning 3. Background status monitoring 4. Concurrent task handling 5. Status polling and updates 6. CLI feedback rendering """ import json import sys import time import tempfile import threading from pathlib import Path # Add lib to path lib_path = Path(__file__).parent.parent / "lib" sys.path.insert(0, str(lib_path)) from responsive_dispatcher import ResponseiveDispatcher from cli_feedback import CLIFeedback, Colors, ProgressBar from dispatcher_enhancements import EnhancedDispatcher, get_enhanced_dispatcher class TestResponsiveDispatcher: """Test responsive dispatcher functionality""" def __init__(self): self.test_dir = Path(tempfile.mkdtemp(prefix="luzia_test_")) self.dispatcher = ResponseiveDispatcher(self.test_dir) self.feedback = CLIFeedback() self.passed = 0 self.failed = 0 def run_all_tests(self): """Run all tests""" print(f"\n{Colors.BOLD}=== Responsive Dispatcher Test Suite ==={Colors.RESET}\n") tests = [ self.test_immediate_dispatch, self.test_job_status_retrieval, self.test_status_updates, self.test_concurrent_jobs, self.test_cache_behavior, self.test_cli_feedback, self.test_progress_bar, self.test_background_monitoring, ] for test in tests: try: print(f" Running {test.__name__}...", end=" ", flush=True) test() self.passed += 1 print(f"{Colors.GREEN}✓{Colors.RESET}") except AssertionError as e: self.failed += 1 print(f"{Colors.RED}✗{Colors.RESET}") print(f" Error: {e}") except Exception as e: self.failed += 1 print(f"{Colors.RED}✗{Colors.RESET}") print(f" Unexpected error: {e}") # Summary print(f"\n{Colors.BOLD}=== Test Summary ==={Colors.RESET}") print(f" {Colors.GREEN}Passed:{Colors.RESET} {self.passed}") print(f" {Colors.RED}Failed:{Colors.RESET} {self.failed}") print(f" {Colors.BLUE}Total:{Colors.RESET} {self.passed + self.failed}\n") return self.failed == 0 def test_immediate_dispatch(self): """Test that dispatch returns immediately with job_id""" start_time = time.time() job_id, status = self.dispatcher.dispatch_task("test_project", "echo hello") elapsed = time.time() - start_time assert job_id, "Job ID should be returned" assert isinstance(status, dict), "Status should be dict" assert status["status"] == "dispatched", "Initial status should be 'dispatched'" assert status["project"] == "test_project", "Project should match" assert elapsed < 0.5, f"Dispatch should be instant (took {elapsed}s)" def test_job_status_retrieval(self): """Test retrieving job status""" job_id, initial_status = self.dispatcher.dispatch_task("proj1", "task1") # Retrieve status retrieved = self.dispatcher.get_status(job_id) assert retrieved is not None, "Status should be retrievable" assert retrieved["id"] == job_id, "Job ID should match" assert retrieved["status"] == "dispatched", "Status should be dispatched" def test_status_updates(self): """Test updating job status""" job_id, _ = self.dispatcher.dispatch_task("proj1", "task1") # Update status self.dispatcher.update_status(job_id, "running", progress=25, message="Processing...") status = self.dispatcher.get_status(job_id, use_cache=False) assert status["status"] == "running", "Status should be updated" assert status["progress"] == 25, "Progress should be updated" assert status["message"] == "Processing...", "Message should be updated" def test_concurrent_jobs(self): """Test handling multiple concurrent jobs""" jobs = [] for i in range(5): job_id, status = self.dispatcher.dispatch_task(f"proj{i}", f"task{i}") jobs.append(job_id) # Verify all jobs exist for job_id in jobs: status = self.dispatcher.get_status(job_id) assert status is not None, f"Job {job_id} should exist" # Verify list shows all jobs all_jobs = self.dispatcher.list_jobs() assert len(all_jobs) >= 5, "Should have at least 5 jobs" def test_cache_behavior(self): """Test cache behavior""" job_id, _ = self.dispatcher.dispatch_task("proj1", "task1") # First read should cache status1 = self.dispatcher.get_status(job_id, use_cache=True) # Update directly on disk self.dispatcher.update_status(job_id, "running", progress=50) # Cached read should be stale status2 = self.dispatcher.get_status(job_id, use_cache=True) assert status2["progress"] == 50, "Cache should be updated on write" # Non-cached read should be fresh time.sleep(1.1) # Wait for cache to expire status3 = self.dispatcher.get_status(job_id, use_cache=False) assert status3["progress"] == 50, "Fresh read should show updated status" def test_cli_feedback(self): """Test CLI feedback rendering""" status = { "id": "test-job-id", "project": "test_proj", "status": "running", "progress": 45, "message": "Processing files...", } # Should not raise exception self.feedback.show_status(status) self.feedback.show_status_line(status) self.feedback.job_dispatched("test-id", "proj", "task") def test_progress_bar(self): """Test progress bar rendering""" bar = ProgressBar.render(0) assert "[" in bar and "]" in bar, "Progress bar should have brackets" bar50 = ProgressBar.render(50) bar100 = ProgressBar.render(100) assert bar50.count("█") > bar.count("█"), "50% should have more filled blocks" assert bar100.count("█") > bar50.count("█"), "100% should have all filled blocks" def test_background_monitoring(self): """Test background monitoring queue""" job_id, _ = self.dispatcher.dispatch_task("proj1", "test task") # Monitoring queue should have the job assert not self.dispatcher.monitoring_queue.empty(), "Queue should have job" # Get item from queue (with retry in case timing issues) try: job_info = self.dispatcher.monitoring_queue.get(timeout=1) assert job_info["job_id"] == job_id, "Queue should contain correct job_id" except Exception: # Queue might have been processed already - verify job exists instead status = self.dispatcher.get_status(job_id) assert status is not None, "Job should exist in dispatcher" class TestEnhancedDispatcher: """Test enhanced dispatcher with integrated features""" def __init__(self): self.test_dir = Path(tempfile.mkdtemp(prefix="luzia_enh_test_")) self.enhanced = EnhancedDispatcher(self.test_dir) self.passed = 0 self.failed = 0 def run_all_tests(self): """Run all tests""" print(f"\n{Colors.BOLD}=== Enhanced Dispatcher Test Suite ==={Colors.RESET}\n") tests = [ self.test_dispatch_and_report, self.test_status_display, self.test_jobs_summary, ] for test in tests: try: print(f" Running {test.__name__}...", end=" ", flush=True) test() self.passed += 1 print(f"{Colors.GREEN}✓{Colors.RESET}") except AssertionError as e: self.failed += 1 print(f"{Colors.RED}✗{Colors.RESET}") print(f" Error: {e}") except Exception as e: self.failed += 1 print(f"{Colors.RED}✗{Colors.RESET}") print(f" Unexpected error: {e}") print(f"\n{Colors.BOLD}=== Test Summary ==={Colors.RESET}") print(f" {Colors.GREEN}Passed:{Colors.RESET} {self.passed}") print(f" {Colors.RED}Failed:{Colors.RESET} {self.failed}") print(f" {Colors.BLUE}Total:{Colors.RESET} {self.passed + self.failed}\n") return self.failed == 0 def test_dispatch_and_report(self): """Test dispatch with feedback""" job_id, status = self.enhanced.dispatch_and_report( "test_proj", "test task", show_feedback=False ) assert job_id, "Should return job_id" assert status["status"] == "dispatched", "Should be dispatched" def test_status_display(self): """Test status display""" job_id, _ = self.enhanced.dispatch_and_report( "proj", "task", show_feedback=False ) status = self.enhanced.get_status_and_display(job_id, show_full=False) assert status is not None, "Should retrieve status" def test_jobs_summary(self): """Test jobs summary display""" for i in range(3): self.enhanced.dispatch_and_report(f"proj{i}", f"task{i}", show_feedback=False) # Should not raise exception self.enhanced.show_jobs_summary() self.enhanced.show_concurrent_summary() def main(): """Run all test suites""" print(f"\n{Colors.BOLD}{Colors.CYAN}Luzia Responsive Dispatcher Tests{Colors.RESET}") print(f"{Colors.GRAY}Testing non-blocking dispatch and status tracking{Colors.RESET}") # Test responsive dispatcher dispatcher_tests = TestResponsiveDispatcher() dispatcher_ok = dispatcher_tests.run_all_tests() # Test enhanced dispatcher enhanced_tests = TestEnhancedDispatcher() enhanced_ok = enhanced_tests.run_all_tests() # Summary all_passed = dispatcher_ok and enhanced_ok if all_passed: print( f"{Colors.GREEN}{Colors.BOLD}✓ All tests passed!{Colors.RESET}\n" ) return 0 else: print( f"{Colors.RED}{Colors.BOLD}✗ Some tests failed{Colors.RESET}\n" ) return 1 if __name__ == "__main__": sys.exit(main())