#!/usr/bin/env python3 """ Dispatcher Enhancements - Integration module for responsive dispatcher in Luzia This module patches existing luzia functions to use the responsive dispatcher. It maintains backward compatibility while adding non-blocking features. Integration Points: 1. route_project_task() - Enhanced to use responsive feedback 2. spawn_claude_agent() - Now integrated with background monitor 3. Jobs listing and status tracking """ import sys import json from pathlib import Path from typing import Dict, Optional, Tuple from datetime import datetime # Add lib to path lib_path = Path(__file__).parent sys.path.insert(0, str(lib_path)) from responsive_dispatcher import ResponseiveDispatcher from cli_feedback import CLIFeedback, Colors class EnhancedDispatcher: """Enhanced dispatcher that wraps responsive features""" def __init__(self, jobs_dir: Path = None): self.dispatcher = ResponseiveDispatcher(jobs_dir) self.feedback = CLIFeedback() def dispatch_and_report( self, project: str, task: str, show_details: bool = True, show_feedback: bool = True, ) -> Tuple[str, Dict]: """ Dispatch task and show responsive feedback. Returns: (job_id, status_dict) """ # Dispatch task job_id, status = self.dispatcher.dispatch_task(project, task) # Show immediate feedback if show_feedback: self.feedback.job_dispatched(job_id, project, task, show_details) return job_id, status def get_status_and_display(self, job_id: str, show_full: bool = False) -> Optional[Dict]: """Get status and display it""" status = self.dispatcher.get_status(job_id) if status: self.feedback.show_status(status, show_full) return status def show_jobs_summary(self, project: str = None): """Show summary of jobs with responsive formatting""" jobs = self.dispatcher.list_jobs(project=project) self.feedback.show_jobs_list(jobs) def show_concurrent_summary(self): """Show summary of all concurrent tasks""" jobs = self.dispatcher.list_jobs() self.feedback.show_concurrent_jobs(jobs) # Global dispatcher instance _dispatcher = None def get_enhanced_dispatcher(jobs_dir: Path = None) -> EnhancedDispatcher: """Get or create enhanced dispatcher instance""" global _dispatcher if _dispatcher is None: _dispatcher = EnhancedDispatcher(jobs_dir) return _dispatcher # Integration functions that can replace or enhance existing luzia functions def enhanced_spawn_claude_agent( project: str, task: str, context: str, config: dict, show_feedback: bool = True ) -> str: """ Enhanced spawn_claude_agent that returns job_id immediately. This is a wrapper around the existing spawn_claude_agent that adds responsive dispatcher tracking. Returns: job_id (for compatibility with existing code) """ dispatcher = get_enhanced_dispatcher() # Dispatch using responsive system job_id, status = dispatcher.dispatch_and_report( project, task, show_details=False, show_feedback=show_feedback ) # For backward compatibility, also return the job_id from here # The actual Claude agent spawning happens in the background return job_id def track_existing_job(job_id: str, project: str, task: str) -> None: """ Track an existing job that was spawned outside the responsive system. Useful for retroactive tracking. """ dispatcher = get_enhanced_dispatcher() _, status = dispatcher.dispatcher.dispatch_task(project, task) def show_job_status_interactive(job_id: str) -> None: """Show job status in interactive mode (polls for updates)""" dispatcher = get_enhanced_dispatcher() print(f"\n{Colors.BOLD}Monitoring job: {job_id}{Colors.RESET}\n") while True: status = dispatcher.dispatcher.get_status(job_id, use_cache=False) if not status: print(f"Job {job_id} not found") return # Clear line and show status print(f"\r", end="", flush=True) print(f" {Colors.status_color(status['status'])}{status['status']:10}{Colors.RESET} " f"{status.get('progress', 0):3d}% {status.get('message', ''):<60}") # Check if done if status.get("status") in ["completed", "failed", "killed"]: print(f"\n\n{Colors.BOLD}Final Status:{Colors.RESET}") dispatcher.feedback.show_status(status, show_full=True) return import time time.sleep(0.5) def export_job_status_json(job_id: str) -> Dict: """Export job status as JSON (for programmatic use)""" dispatcher = get_enhanced_dispatcher() status = dispatcher.dispatcher.get_status(job_id) return status or {"error": f"Job {job_id} not found"} # Async background monitoring helpers def start_background_monitoring() -> None: """Start background monitoring thread""" dispatcher = get_enhanced_dispatcher() monitor = dispatcher.dispatcher.start_background_monitor() print(f"[Background monitor started (PID: {id(monitor)})]") def get_job_queue_status() -> Dict: """Get status of job queue""" dispatcher = get_enhanced_dispatcher() jobs = dispatcher.dispatcher.list_jobs() running = [j for j in jobs if j.get("status") == "running"] pending = [j for j in jobs if j.get("status") in ["dispatched", "starting"]] completed = [j for j in jobs if j.get("status") == "completed"] failed = [j for j in jobs if j.get("status") in ["failed", "killed"]] return { "running": len(running), "pending": len(pending), "completed": len(completed), "failed": len(failed), "total": len(jobs), "jobs": jobs[:20], }