#!/usr/bin/env python3
"""
Smart Router - Intelligent task routing using a Gemini Flash model for decision making.

Key decision points:
1. Task Complexity Analysis - Before dispatch, assess complexity
2. Agent Selection - Route to optimal agent/model based on task
3. Response Validation - Check output quality before returning
4. Continuation Decisions - Determine if follow-up is needed

Uses a Gemini Flash model (default: ``gemini-2.0-flash``) for fast, cost-effective
decisions at critical flow points. When ``google-generativeai`` is not installed,
no API key can be found, or a model call / response parse fails, each decision
falls back to keyword-based heuristics.
"""

import os
import json
import logging
import math
import time
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Tuple

# google-generativeai is optional: without it the router runs on heuristics only.
try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
    genai = None

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)


class TaskComplexity(Enum):
    """Task complexity levels for routing decisions."""
    TRIVIAL = "trivial"      # Simple command, quick lookup
    SIMPLE = "simple"        # Single-step task, clear path
    MODERATE = "moderate"    # Multi-step, some reasoning needed
    COMPLEX = "complex"      # Deep analysis, multi-agent coordination
    RESEARCH = "research"    # Open-ended exploration


class AgentTier(Enum):
    """Agent tiers for model selection."""
    FLASH = "flash"      # Gemini Flash - fast decisions
    HAIKU = "haiku"      # Claude Haiku - quick tasks
    SONNET = "sonnet"    # Claude Sonnet - balanced
    OPUS = "opus"        # Claude Opus - complex tasks
    PRO = "pro"          # Gemini Pro - deep reasoning


@dataclass
class RoutingDecision:
    """Result of routing analysis (built by ``SmartRouter.analyze_and_route``)."""
    complexity: TaskComplexity
    recommended_agent: AgentTier
    reasoning: str                 # complexity reasoning + " | " + routing reasoning
    confidence: float              # 0-1, taken from the complexity analysis
    suggested_steps: List[str]
    estimated_tokens: int          # rough budget looked up from the complexity level
    requires_human: bool = False   # True when the analysis reports risk_level "high"
    validation_needed: bool = True


@dataclass
class ValidationResult:
    """Result of output validation."""
    is_valid: bool
    quality_score: float  # 0-1
    issues: List[str]
    suggestions: List[str]
    needs_retry: bool = False
    continuation_prompt: Optional[str] = None


# ---------------------------------------------------------------------------
# Helpers for sanitising model output. Gemini replies are untrusted JSON:
# fields may be missing, have the wrong type, wrong case, or be out of range.
# ---------------------------------------------------------------------------

def _coerce_unit_float(value: Any, default: float) -> float:
    """Return *value* as a float clamped to [0.0, 1.0], or *default* if not numeric/NaN."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    if math.isnan(number):
        return default
    return max(0.0, min(1.0, number))


def _coerce_bool(value: Any, default: bool) -> bool:
    """Interpret *value* as a boolean, accepting real bools, numbers, and "true"/"false" strings.

    Anything unrecognised (including None) yields *default*.
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        return bool(value)
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in ("true", "yes", "1"):
            return True
        if lowered in ("false", "no", "0"):
            return False
    return default


def _coerce_str_list(value: Any) -> List[str]:
    """Return *value* as a list of strings; a lone non-empty string becomes a one-item list."""
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value]
    if isinstance(value, str) and value.strip():
        return [value]
    return []


def _coerce_complexity(value: Any) -> TaskComplexity:
    """Map a (possibly mis-cased or invalid) complexity label to TaskComplexity.

    Unknown labels fall back to MODERATE instead of raising ValueError.
    """
    try:
        return TaskComplexity(str(value).strip().lower())
    except ValueError:
        return TaskComplexity.MODERATE


def _dump_context(context: Optional[Dict[str, Any]]) -> str:
    """Serialise *context* for inclusion in a prompt.

    ``default=str`` is deliberate: the text is only read by the model and never
    parsed back, so stringifying non-JSON values beats failing the whole call.
    """
    return json.dumps(context or {}, indent=2, default=str)


class GeminiDecisionEngine:
    """Gemini Flash-powered decision engine for fast routing decisions.

    Every public method falls back to a ``_heuristic_*`` counterpart when the
    engine is unavailable or the model call / JSON parse fails.
    """

    # Files searched, in order, for an API key when none was passed and
    # GEMINI_API_KEY is unset. ``.env`` files are scanned for a
    # ``GEMINI_API_KEY=`` line; any other file is read whole as the key.
    API_KEY_SOURCES: Tuple[str, ...] = (
        "/opt/pal-mcp-server/.env",                   # PAL MCP server env (primary)
        "/etc/shared-ai-credentials/gemini/api-key",  # Shared credentials
    )
    DEFAULT_MODEL = "gemini-2.0-flash"
    # Agent output longer than this (in characters) is truncated before validation
    # to avoid huge prompts.
    MAX_VALIDATION_CHARS = 3000

    # Ordered heuristic rules: (keywords, complexity, confidence). The first rule
    # with any keyword occurring as a *substring* of the lower-cased task wins
    # (so e.g. "add" also matches "address").
    _COMPLEXITY_RULES = (
        (("list", "show", "check", "status", "what is"), "trivial", 0.7),
        (("fix", "update", "change", "add"), "simple", 0.6),
        (("implement", "create", "build", "develop"), "moderate", 0.5),
        (("refactor", "optimize", "debug", "investigate"), "complex", 0.5),
        (("research", "analyze", "design", "architect"), "research", 0.5),
    )

    def __init__(self, api_key: Optional[str] = None, model_name: str = DEFAULT_MODEL):
        """Initialize Gemini decision engine.

        Args:
            api_key: Gemini API key (defaults to the GEMINI_API_KEY env var, then
                the files in ``API_KEY_SOURCES``)
            model_name: Gemini model to use for decisions
        """
        self.api_key = api_key or os.environ.get("GEMINI_API_KEY")
        self.model_name = model_name
        self.model = None
        self.available = False
        self._initialize()

    def _initialize(self) -> None:
        """Configure the Gemini client; leaves ``available`` False on any failure."""
        if not GEMINI_AVAILABLE:
            logger.warning("google-generativeai not installed - falling back to heuristics")
            return

        if not self.api_key:
            self.api_key = self._load_api_key_from_files()

        if not self.api_key:
            logger.warning("Gemini API key not found - falling back to heuristics")
            return

        try:
            genai.configure(api_key=self.api_key)
            self.model = genai.GenerativeModel(self.model_name)
            self.available = True
            logger.info(f"Gemini decision engine initialized ({self.model_name})")
        except Exception as e:
            # Best-effort: any client/config error just disables the model path.
            logger.warning(f"Failed to initialize Gemini: {e}")

    def _load_api_key_from_files(self) -> Optional[str]:
        """Return the first non-empty API key found in ``API_KEY_SOURCES``, else None."""
        for source in self.API_KEY_SOURCES:
            try:
                with open(source, "r", encoding="utf-8") as f:
                    if source.endswith(".env"):
                        key = self._parse_env_key(f)
                    else:
                        key = f.read().strip()
            except (OSError, UnicodeDecodeError):
                # Missing, unreadable, or non-text sources are expected; try the next.
                continue
            if key:
                logger.info(f"Gemini API key loaded from {source}")
                return key
        return None

    @staticmethod
    def _parse_env_key(lines: Iterable[str]) -> Optional[str]:
        """Return the value of the first ``GEMINI_API_KEY=`` line, unquoted, or None.

        An optional leading ``export `` (shell-style .env files) is tolerated.
        """
        for raw in lines:
            line = raw.strip()
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            if line.startswith("GEMINI_API_KEY="):
                return line.split("=", 1)[1].strip().strip('"\'')
        return None

    @staticmethod
    def _parse_json_response(text: str) -> Dict[str, Any]:
        """Extract a JSON object from a model reply.

        Tries, in order: the whole reply, the content of the first ``` fence
        (with an optional ``json`` language tag), and the span from the first
        ``{`` to the last ``}`` — so prose around the JSON is tolerated.

        Raises:
            ValueError: if no candidate parses to a JSON object (dict).
        """
        text = text.strip()
        candidates = [text]
        if "```" in text:
            fenced = text.split("```")[1]
            if fenced.lower().startswith("json"):
                fenced = fenced[4:]
            candidates.append(fenced)
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            candidates.append(text[start:end + 1])

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed
        raise ValueError("No JSON object found in model response")

    def _generate_json(self, prompt: str, max_output_tokens: int) -> Dict[str, Any]:
        """Send *prompt* to the model (temperature 0.1) and parse the reply as a JSON object.

        Propagates client errors and ValueError from parsing; callers catch
        these and fall back to heuristics.
        """
        response = self.model.generate_content(
            prompt,
            generation_config=genai.GenerationConfig(
                temperature=0.1,
                max_output_tokens=max_output_tokens,
            ),
        )
        return self._parse_json_response(response.text)

    def analyze_complexity(self, task: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Analyze task complexity using Gemini Flash.

        Args:
            task: Task description
            context: Optional context about project, history

        Returns:
            Complexity analysis dict (model JSON, or the heuristic fallback)
        """
        if not self.available:
            return self._heuristic_complexity(task)

        try:
            # Built inside the try so a context that cannot be serialised still
            # degrades to the heuristic path instead of raising.
            prompt = f"""Analyze this task's complexity for routing to an AI agent.

TASK: {task}

CONTEXT: {_dump_context(context)}

Respond in JSON:
{{
    "complexity": "trivial|simple|moderate|complex|research",
    "reasoning": "brief explanation",
    "confidence": 0.0-1.0,
    "estimated_steps": ["step1", "step2"],
    "requires_code_changes": true/false,
    "requires_file_reads": true/false,
    "requires_external_calls": true/false,
    "risk_level": "low|medium|high"
}}"""
            return self._generate_json(prompt, max_output_tokens=500)
        except Exception as e:
            logger.warning(f"Gemini complexity analysis failed: {e}")
            return self._heuristic_complexity(task)

    def _heuristic_complexity(self, task: str) -> Dict[str, Any]:
        """Fallback heuristic-based complexity analysis (keyword matching)."""
        task_lower = task.lower()

        complexity, confidence = "moderate", 0.4
        for keywords, level, level_confidence in self._COMPLEXITY_RULES:
            if any(word in task_lower for word in keywords):
                complexity, confidence = level, level_confidence
                break

        return {
            "complexity": complexity,
            "reasoning": "Heuristic analysis (Gemini unavailable)",
            "confidence": confidence,
            "estimated_steps": [],
            "requires_code_changes": "implement" in task_lower or "fix" in task_lower,
            "requires_file_reads": True,
            "requires_external_calls": False,
            "risk_level": "medium"
        }

    def validate_output(self, task: str, output: str,
                        context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Validate agent output quality.

        Args:
            task: Original task
            output: Agent's output (truncated to ``MAX_VALIDATION_CHARS`` in the prompt)
            context: Additional context, included in the prompt when given

        Returns:
            Validation result dict (model JSON, or the heuristic fallback)
        """
        if not self.available:
            return self._heuristic_validation(task, output)

        try:
            output_truncated = output[:self.MAX_VALIDATION_CHARS]
            context_section = f"\nCONTEXT: {_dump_context(context)}\n" if context else ""

            prompt = f"""Validate this AI agent's response to a task.

TASK: {task}
{context_section}
RESPONSE (may be truncated):
{output_truncated}

Respond in JSON:
{{
    "is_valid": true/false,
    "quality_score": 0.0-1.0,
    "issues": ["issue1", "issue2"],
    "suggestions": ["suggestion1"],
    "task_completed": true/false,
    "needs_follow_up": true/false,
    "follow_up_prompt": "optional continuation prompt"
}}"""
            return self._generate_json(prompt, max_output_tokens=500)
        except Exception as e:
            logger.warning(f"Gemini validation failed: {e}")
            return self._heuristic_validation(task, output)

    def _heuristic_validation(self, task: str, output: str) -> Dict[str, Any]:
        """Fallback heuristic output validation.

        Scores from a 0.5 base: +0.2 for substantive output (>50 non-whitespace-
        trimmed chars), +0.2 for code in a coding task, -0.3 if the words
        "error"/"failed" appear; clamped to [0, 1].
        """
        has_content = len(output.strip()) > 50
        has_code = "```" in output or "def " in output or "function " in output
        has_error = "error" in output.lower() or "failed" in output.lower()

        quality = 0.5
        if has_content:
            quality += 0.2
        if has_code and ("implement" in task.lower() or "code" in task.lower()):
            quality += 0.2
        if has_error:
            quality -= 0.3

        return {
            "is_valid": has_content and not has_error,
            "quality_score": max(0.0, min(1.0, quality)),
            "issues": ["Error detected in output"] if has_error else [],
            "suggestions": [],
            "task_completed": has_content,
            "needs_follow_up": has_error,
            "follow_up_prompt": "Please fix the errors and try again" if has_error else None
        }

    def route_task(self, task: str, project: str, complexity: str) -> Dict[str, Any]:
        """Determine optimal agent/model for task.

        Args:
            task: Task description
            project: Target project
            complexity: Pre-analyzed complexity label

        Returns:
            Routing recommendation dict (model JSON, or the heuristic fallback)
        """
        if not self.available:
            return self._heuristic_routing(task, project, complexity)

        try:
            prompt = f"""Recommend the best AI agent configuration for this task.

TASK: {task}
PROJECT: {project}
COMPLEXITY: {complexity}

Available agents:
- flash: Gemini Flash - Fast, cheap, good for simple tasks
- haiku: Claude Haiku - Quick, efficient, good for straightforward coding
- sonnet: Claude Sonnet - Balanced, good for most development tasks
- opus: Claude Opus - Most capable, for complex analysis
- pro: Gemini Pro - Deep reasoning, research tasks

Respond in JSON:
{{
    "recommended_agent": "flash|haiku|sonnet|opus|pro",
    "reasoning": "why this agent",
    "backup_agent": "alternative if first fails",
    "special_instructions": "any task-specific guidance",
    "estimated_time": "quick|moderate|long",
    "suggested_tools": ["Read", "Edit", "Bash"]
}}"""
            return self._generate_json(prompt, max_output_tokens=400)
        except Exception as e:
            logger.warning(f"Gemini routing failed: {e}")
            return self._heuristic_routing(task, project, complexity)

    def _heuristic_routing(self, task: str, project: str, complexity: str) -> Dict[str, Any]:
        """Fallback heuristic task routing: a fixed complexity -> agent table."""
        complexity_to_agent = {
            "trivial": "haiku",
            "simple": "haiku",
            "moderate": "sonnet",
            "complex": "sonnet",
            "research": "pro"
        }

        return {
            "recommended_agent": complexity_to_agent.get(complexity, "sonnet"),
            "reasoning": f"Heuristic routing for {complexity} task",
            "backup_agent": "sonnet",
            "special_instructions": None,
            "estimated_time": "moderate",
            "suggested_tools": ["Read", "Edit", "Bash", "Glob", "Grep"]
        }


class SmartRouter:
    """Main smart routing orchestrator integrating Gemini decisions."""

    # Rough token budget per complexity level.
    _TOKEN_ESTIMATES: Dict[TaskComplexity, int] = {
        TaskComplexity.TRIVIAL: 500,
        TaskComplexity.SIMPLE: 2000,
        TaskComplexity.MODERATE: 8000,
        TaskComplexity.COMPLEX: 20000,
        TaskComplexity.RESEARCH: 50000,
    }
    # Agent label (as returned by the engine) -> tier.
    _AGENT_MAP: Dict[str, AgentTier] = {tier.value: tier for tier in AgentTier}
    # Substrings (checked in the task and the error text) that force escalation.
    _ESCALATE_PATTERNS: Tuple[str, ...] = (
        "permission denied", "authentication", "security",
        "production", "database migration", "delete", "remove",
    )

    def __init__(self, api_key: Optional[str] = None):
        """Initialize smart router.

        Args:
            api_key: Optional Gemini API key
        """
        self.decision_engine = GeminiDecisionEngine(api_key)
        self.routing_history: List[Dict[str, Any]] = []
        self.max_history = 100
        logger.info(f"SmartRouter initialized (Gemini: {self.decision_engine.available})")

    def analyze_and_route(self, task: str, project: str,
                          context: Optional[Dict[str, Any]] = None) -> RoutingDecision:
        """Full analysis and routing for a task.

        Model output is sanitised: an unknown complexity label becomes MODERATE,
        an unknown agent becomes SONNET, confidence is clamped to [0, 1], and
        risk level is compared case-insensitively.

        Args:
            task: Task description
            project: Target project
            context: Additional context

        Returns:
            Complete routing decision
        """
        start_time = time.perf_counter()

        # Step 1: Analyze complexity
        complexity_result = self.decision_engine.analyze_complexity(task, context)
        complexity = _coerce_complexity(complexity_result.get("complexity", "moderate"))

        # Step 2: Get routing recommendation (pass the normalised label)
        routing_result = self.decision_engine.route_task(task, project, complexity.value)

        # Step 3: Build decision
        agent_label = str(routing_result.get("recommended_agent", "sonnet")).strip().lower()
        recommended_agent = self._AGENT_MAP.get(agent_label, AgentTier.SONNET)
        risk_level = str(complexity_result.get("risk_level", "low")).strip().lower()

        decision = RoutingDecision(
            complexity=complexity,
            recommended_agent=recommended_agent,
            reasoning=f"{complexity_result.get('reasoning', '')} | {routing_result.get('reasoning', '')}",
            confidence=_coerce_unit_float(complexity_result.get("confidence", 0.5), 0.5),
            suggested_steps=_coerce_str_list(complexity_result.get("estimated_steps", [])),
            estimated_tokens=self._TOKEN_ESTIMATES.get(complexity, 8000),
            requires_human=risk_level == "high",
            validation_needed=complexity is not TaskComplexity.TRIVIAL
        )

        # Record history
        elapsed = time.perf_counter() - start_time
        self._record_routing(task, project, decision, elapsed)

        return decision

    def validate_response(self, task: str, output: str,
                          context: Optional[Dict[str, Any]] = None) -> ValidationResult:
        """Validate agent response quality.

        Args:
            task: Original task
            output: Agent output
            context: Additional context

        Returns:
            Validation result; ``needs_retry`` is True when the task was not
            reported as completed.
        """
        result = self.decision_engine.validate_output(task, output, context)
        follow_up = result.get("follow_up_prompt")

        return ValidationResult(
            is_valid=_coerce_bool(result.get("is_valid", True), True),
            quality_score=_coerce_unit_float(result.get("quality_score", 0.5), 0.5),
            issues=_coerce_str_list(result.get("issues", [])),
            suggestions=_coerce_str_list(result.get("suggestions", [])),
            needs_retry=not _coerce_bool(result.get("task_completed", True), True),
            continuation_prompt=str(follow_up) if follow_up else None
        )

    def should_escalate(self, task: str, error: str) -> Tuple[bool, str]:
        """Determine if a failed task should be escalated.

        Args:
            task: Original task
            error: Error encountered

        Returns:
            (should_escalate, reason)
        """
        error_lower = error.lower()
        task_lower = task.lower()

        # Sensitive operations mentioned in either the task or the error.
        for pattern in self._ESCALATE_PATTERNS:
            if pattern in error_lower or pattern in task_lower:
                return True, f"Task involves sensitive operation: {pattern}"

        # Errors that explicitly ask for human intervention.
        if "requires approval" in error_lower or "blocked" in error_lower:
            return True, "Task requires human approval"

        return False, ""

    def _record_routing(self, task: str, project: str, decision: RoutingDecision,
                        elapsed: float) -> None:
        """Append a routing record, keeping only the newest ``max_history`` entries."""
        record = {
            "timestamp": time.time(),
            "task": task[:200],  # Truncate
            "project": project,
            "complexity": decision.complexity.value,
            "agent": decision.recommended_agent.value,
            "confidence": decision.confidence,
            "elapsed_ms": round(elapsed * 1000, 2)
        }
        self.routing_history.append(record)

        if len(self.routing_history) > self.max_history:
            self.routing_history = self.routing_history[-self.max_history:]

    def get_stats(self) -> Dict[str, Any]:
        """Get routing statistics over the retained history."""
        history = self.routing_history
        if not history:
            return {"total_routings": 0}

        avg_elapsed = sum(r["elapsed_ms"] for r in history) / len(history)

        return {
            "total_routings": len(history),
            "complexity_distribution": dict(Counter(r["complexity"] for r in history)),
            "agent_distribution": dict(Counter(r["agent"] for r in history)),
            "avg_routing_time_ms": round(avg_elapsed, 2),
            "gemini_available": self.decision_engine.available
        }


# CLI for testing
if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("Smart Router - Gemini 3 Flash Decision Engine")
    logger.info("=" * 60)

    router = SmartRouter()

    # Test tasks
    test_cases = [
        ("List all running containers", "admin"),
        ("Fix the bug in track component", "musica"),
        ("Implement new authentication system with OAuth2", "overbits"),
        ("Research microservices architecture patterns", "dss"),
        ("Refactor the entire API layer for better performance", "musica"),
    ]

    for task, project in test_cases:
        logger.info(f"\nTask: '{task}'")
        logger.info(f"Project: {project}")

        decision = router.analyze_and_route(task, project)

        logger.info(f"  Complexity: {decision.complexity.value}")
        logger.info(f"  Agent: {decision.recommended_agent.value}")
        logger.info(f"  Confidence: {decision.confidence:.2f}")
        logger.info(f"  Tokens Est: {decision.estimated_tokens}")
        logger.info(f"  Human Required: {decision.requires_human}")
        if decision.suggested_steps:
            logger.info(f"  Steps: {decision.suggested_steps[:3]}")

    # Show stats
    logger.info("\n" + "=" * 60)
    stats = router.get_stats()
    logger.info(f"Stats: {json.dumps(stats, indent=2)}")
    logger.info("=" * 60)