#!/usr/bin/env python3
"""
Dispatcher-Plugin Integration - Seamless plugin skill integration into task dispatch

Bridges the responsive dispatcher with plugin skill matching to enable:
1. Automatic plugin skill detection for incoming tasks
2. Plugin metadata injection into dispatcher context
3. Skill-aware task routing
4. Plugin capability-based task optimization
"""

import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime

from plugin_marketplace import PluginMarketplaceRegistry
from plugin_skill_loader import PluginSkillLoader

logger = logging.getLogger(__name__)


class DispatcherPluginBridge:
    """
    Integrates plugin skills into the responsive dispatcher workflow

    Enhances task dispatch with:
    - Automatic plugin skill detection
    - Skill metadata injection into job context
    - Plugin-aware task routing suggestions
    """

    def __init__(self,
                 registry: Optional[PluginMarketplaceRegistry] = None,
                 skill_loader: Optional[PluginSkillLoader] = None,
                 context_dir: Optional[Path] = None):
        """Initialize dispatcher-plugin bridge

        Creates the context directory if it does not exist and, when the
        skill loader has no skills yet, asks it to generate them from the
        registered plugins.

        Args:
            registry: Plugin marketplace registry (a new one is created when None)
            skill_loader: Plugin skill loader (a new one bound to ``registry``
                is created when None)
            context_dir: Directory for storing enhanced task context
                (defaults to ``/tmp/.luzia-plugin-context``)
        """
        # Explicit None checks: a caller-supplied object that happens to be
        # falsy (e.g. an empty, container-like registry) must not be replaced.
        self.registry = registry if registry is not None else PluginMarketplaceRegistry()
        self.skill_loader = (
            skill_loader if skill_loader is not None
            else PluginSkillLoader(self.registry)
        )

        # Normalise through Path so a plain string is accepted as well.
        self.context_dir = (
            Path(context_dir) if context_dir
            else Path("/tmp/.luzia-plugin-context")
        )
        self.context_dir.mkdir(parents=True, exist_ok=True)

        # Load all plugin skills on initialization
        if not self.skill_loader.skills:
            self.skill_loader.generate_skills_from_plugins()

    def _context_file(self, job_id: str) -> Path:
        """Return the path of the JSON context file for ``job_id``."""
        # NOTE(review): job_id is interpolated into the file name as-is; if it
        # can come from untrusted input it should be validated by the caller.
        return self.context_dir / f"{job_id}_context.json"

    def enhance_task_context(self,
                             task_description: str,
                             project: str,
                             job_id: str) -> Dict[str, Any]:
        """
        Enhance task context with relevant plugin skills

        The resulting context is also written as JSON to
        ``<context_dir>/<job_id>_context.json``.

        Args:
            task_description: Description of the task
            project: Project name
            job_id: Job ID for tracking

        Returns:
            Enhanced context dict with plugin skill recommendations
        """
        # Find relevant plugins and skills
        matched_skills = self.skill_loader.find_skills_for_task(
            task_description, min_relevance=0.3
        )
        keywords = self.skill_loader.matcher.extract_task_keywords(task_description)
        matched_plugins = self.registry.find_plugins_for_task(
            task_description, keywords
        )

        # NOTE(review): assumes registry.get_plugin() returns an object with a
        # ``name`` attribute for every id yielded by find_plugins_for_task().
        top_plugins = [
            {
                'id': pid,
                'name': self.registry.get_plugin(pid).name,
                'relevance_score': score
            }
            for pid, score in matched_plugins[:3]  # Top 3
        ]

        # Extract context
        context = {
            'timestamp': datetime.now().isoformat(),
            'job_id': job_id,
            'project': project,
            'task_description': task_description,
            'plugin_analysis': {
                'matched_plugins': top_plugins,
                'matched_skills': matched_skills[:5],  # Top 5 skills
                'total_skills_available': len(self.skill_loader.skills),
                'analysis_timestamp': datetime.now().isoformat()
            },
            'recommended_plugins': self._generate_recommendations(
                matched_plugins, matched_skills
            ),
            'skill_metadata': self._compile_skill_metadata(matched_skills)
        }

        # Save context
        self._context_file(job_id).write_text(
            json.dumps(context, indent=2), encoding="utf-8"
        )

        return context

    def _generate_recommendations(self,
                                  matched_plugins: List[Tuple[str, float]],
                                  matched_skills: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate actionable recommendations for task handling

        Args:
            matched_plugins: List of (plugin_id, score) tuples (accepted for
                interface compatibility; not consulted by this method)
            matched_skills: List of matched skills; each entry is read for the
                keys ``skill_id``, ``name``, ``plugin_name``,
                ``relevance_score`` and ``category``

        Returns:
            Recommendations dict with the keys ``primary_skill``,
            ``alternative_skills``, ``required_capabilities`` and
            ``suggested_sequence``
        """
        recommendations: Dict[str, Any] = {
            'primary_skill': None,
            'alternative_skills': [],
            'required_capabilities': [],
            'suggested_sequence': []
        }

        if not matched_skills:
            return recommendations

        # Primary skill is the first entry of matched_skills
        top_skill = matched_skills[0]
        recommendations['primary_skill'] = {
            'skill_id': top_skill['skill_id'],
            'name': top_skill['name'],
            'plugin': top_skill['plugin_name'],
            'confidence': top_skill['relevance_score']
        }

        # Alternative skills for fallback/additional analysis (the slice is
        # simply empty when there is only one match)
        recommendations['alternative_skills'] = [
            {
                'skill_id': skill['skill_id'],
                'name': skill['name'],
                'confidence': skill['relevance_score']
            }
            for skill in matched_skills[1:3]
        ]

        # Unique capability categories in first-seen order; going through a
        # set here would make the output order differ between runs.
        recommendations['required_capabilities'] = list(
            dict.fromkeys(skill['category'] for skill in matched_skills)
        )

        # Suggested execution sequence: one step per category
        recommendations['suggested_sequence'] = self._build_execution_sequence(
            matched_skills
        )

        return recommendations

    def _build_execution_sequence(self,
                                  matched_skills: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Build suggested task execution sequence

        One step is emitted per distinct category found among the first five
        matched skills, in first-seen order. Each step lists the ids of *all*
        matched skills of that category, not only those among the first five.

        Args:
            matched_skills: List of matched skills; each entry is read for the
                keys ``category`` and ``skill_id``

        Returns:
            List of execution steps, each with ``step`` (1-based index),
            ``category``, ``description`` and ``skills`` (list of skill ids)
        """
        # Group once up front instead of rescanning every skill per category.
        skills_by_category: Dict[Any, List[Dict[str, Any]]] = {}
        for skill in matched_skills:
            skills_by_category.setdefault(skill['category'], []).append(skill)

        sequence: List[Dict[str, Any]] = []
        categories_seen = set()

        for skill in matched_skills[:5]:  # Limit to top 5
            category = skill['category']
            if category in categories_seen:
                continue
            categories_seen.add(category)
            sequence.append({
                'step': len(sequence) + 1,
                'category': category,
                'description': f"Execute {category} plugins",
                'skills': [s['skill_id'] for s in skills_by_category[category]]
            })

        return sequence

    def _compile_skill_metadata(self, matched_skills: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Compile comprehensive skill metadata

        Args:
            matched_skills: List of matched skills; each entry is read for the
                keys ``category``, ``trust_level`` and ``name``

        Returns:
            Compiled metadata: total match count, counts per category and per
            trust level, and the distinct skill names in first-seen order
            (stored under ``capabilities_available``)
        """
        by_category: Dict[Any, int] = {}
        by_trust_level: Dict[Any, int] = {}
        capabilities: List[Any] = []

        for skill in matched_skills:
            # Count by category
            category = skill['category']
            by_category[category] = by_category.get(category, 0) + 1

            # Count by trust level
            trust = skill['trust_level']
            by_trust_level[trust] = by_trust_level.get(trust, 0) + 1

            # Collect unique capabilities
            name = skill['name']
            if name not in capabilities:
                capabilities.append(name)

        return {
            'total_matched': len(matched_skills),
            'by_category': by_category,
            'by_trust_level': by_trust_level,
            'capabilities_available': capabilities
        }

    def get_task_context(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve enhanced task context

        Args:
            job_id: Job ID

        Returns:
            Context dict, or None if no context file exists for the job or
            the file does not contain valid JSON
        """
        # EAFP: read directly rather than checking exists() first, which would
        # race with a concurrent delete between the check and the read.
        try:
            raw = self._context_file(job_id).read_text(encoding="utf-8")
        except (FileNotFoundError, NotADirectoryError):
            return None

        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            logger.warning("Ignoring unreadable plugin context for job %s", job_id)
            return None

    def export_dispatch_metadata(self) -> Dict[str, Any]:
        """Export metadata for dispatcher initialization

        Returns:
            Dict with all plugin dispatch metadata: skill/plugin counts, the
            keys of the skill loader's category and skill indexes, and a
            fixed set of enhancement flags
        """
        return {
            'source': 'dispatcher-plugin-integration',
            'timestamp': datetime.now().isoformat(),
            'total_available_skills': len(self.skill_loader.skills),
            'total_available_plugins': len(self.registry.plugins),
            'skill_categories': list(self.skill_loader.category_index.keys()),
            'skill_keywords': list(self.skill_loader.skill_index.keys()),
            'dispatcher_enhancements': {
                'enhanced_task_context': True,
                'skill_detection': True,
                'plugin_recommendations': True,
                'execution_sequence_planning': True
            }
        }


class PluginAwareTaskDispatcher:
    """
    Enhanced task dispatcher that leverages plugin skills

    Wraps the responsive dispatcher with plugin-aware features for
    intelligent task routing and context enrichment.
    """

    def __init__(self, bridge: Optional[DispatcherPluginBridge] = None):
        """Initialize plugin-aware dispatcher

        Args:
            bridge: Dispatcher-plugin bridge instance (a default
                DispatcherPluginBridge is created when None)
        """
        self.bridge = bridge if bridge is not None else DispatcherPluginBridge()

    def dispatch_with_plugin_context(self,
                                     task_description: str,
                                     project: str,
                                     job_id: str,
                                     priority: int = 5) -> Dict[str, Any]:
        """
        Dispatch a task with automatic plugin skill detection and context enrichment

        Args:
            task_description: Description of the task
            project: Project name
            job_id: Job ID
            priority: Task priority

        Returns:
            Enhanced dispatch result with plugin context; the ``task`` field
            holds at most the first 200 characters of the description
        """
        # Enhance task context with plugin skills
        enhanced_context = self.bridge.enhance_task_context(
            task_description, project, job_id
        )

        # Build dispatch payload
        dispatch_result = {
            'job_id': job_id,
            'project': project,
            'task': task_description[:200],
            'priority': priority,
            'dispatched_at': datetime.now().isoformat(),
            'plugin_enhanced': True,
            'plugin_context': enhanced_context
        }

        # Lazy %-style arguments: the message is only formatted when the
        # record is actually emitted.
        logger.info(
            "Dispatched job %s with plugin context: %s skills matched",
            job_id,
            len(enhanced_context['plugin_analysis']['matched_skills']),
        )

        return dispatch_result

    def get_dispatch_recommendations(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Get plugin-based recommendations for a dispatched task

        Args:
            job_id: Job ID

        Returns:
            The ``recommended_plugins`` entry of the stored context, or None
            when no (non-empty) context is available for the job
        """
        context = self.bridge.get_task_context(job_id)
        if context:
            return context.get('recommended_plugins')
        return None


# Convenience functions for integration with existing dispatcher

def get_dispatcher_bridge(registry: Optional[PluginMarketplaceRegistry] = None) -> DispatcherPluginBridge:
    """Create a new dispatcher-plugin bridge

    A fresh DispatcherPluginBridge is constructed on every call; no instance
    is cached.

    Args:
        registry: Plugin marketplace registry to hand to the bridge
    """
    return DispatcherPluginBridge(registry)


def get_plugin_aware_dispatcher() -> PluginAwareTaskDispatcher:
    """Create a new plugin-aware task dispatcher with a default bridge"""
    return PluginAwareTaskDispatcher()