Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
450 lines
15 KiB
Python
450 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Smart Flow Integration - Bridges SmartRouter with Luzia's dispatch flow.
|
|
|
|
Integrates Gemini 3 Flash decision making at key points:
|
|
1. Pre-dispatch: Analyze complexity, select optimal agent
|
|
2. Mid-execution: Validate progress, adjust strategy
|
|
3. Post-execution: Validate output, decide on follow-up
|
|
|
|
This module provides the glue between SmartRouter and existing components.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
|
|
# Import SmartRouter
|
|
try:
|
|
from smart_router import SmartRouter, RoutingDecision, ValidationResult, TaskComplexity, AgentTier
|
|
SMART_ROUTER_AVAILABLE = True
|
|
except ImportError:
|
|
SMART_ROUTER_AVAILABLE = False
|
|
|
|
# Import existing components
|
|
try:
|
|
from flow_intelligence import FlowIntelligence
|
|
FLOW_INTELLIGENCE_AVAILABLE = True
|
|
except ImportError:
|
|
FLOW_INTELLIGENCE_AVAILABLE = False
|
|
|
|
try:
|
|
from semantic_router import SemanticRouter
|
|
SEMANTIC_ROUTER_AVAILABLE = True
|
|
except ImportError:
|
|
SEMANTIC_ROUTER_AVAILABLE = False
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class EnhancedDispatchContext:
|
|
"""Enhanced context for smart dispatch."""
|
|
task: str
|
|
project: str
|
|
routing_decision: Optional[RoutingDecision]
|
|
domain_context: Optional[Dict[str, Any]]
|
|
flow_context: Optional[Dict[str, Any]]
|
|
model_override: Optional[str] = None
|
|
priority: int = 5
|
|
tags: list = None
|
|
|
|
|
|
class SmartFlowOrchestrator:
|
|
"""
|
|
Orchestrates intelligent task dispatch using SmartRouter.
|
|
|
|
Combines:
|
|
- SmartRouter for Gemini-powered decisions
|
|
- SemanticRouter for domain context
|
|
- FlowIntelligence for task flow management
|
|
"""
|
|
|
|
def __init__(self, config_path: Optional[Path] = None):
|
|
"""Initialize smart flow orchestrator.
|
|
|
|
Args:
|
|
config_path: Path to routing.yaml config
|
|
"""
|
|
self.config_path = config_path or Path("/opt/server-agents/claude-flow/config/routing.yaml")
|
|
self.config = self._load_config()
|
|
|
|
# Initialize components
|
|
self.smart_router = None
|
|
self.semantic_router = None
|
|
self.flow_intelligence = None
|
|
|
|
self._init_components()
|
|
|
|
logger.info("SmartFlowOrchestrator initialized")
|
|
logger.info(f" SmartRouter: {self.smart_router is not None}")
|
|
logger.info(f" SemanticRouter: {self.semantic_router is not None}")
|
|
logger.info(f" FlowIntelligence: {self.flow_intelligence is not None}")
|
|
|
|
def _load_config(self) -> Dict[str, Any]:
|
|
"""Load routing configuration."""
|
|
if not self.config_path.exists():
|
|
logger.warning(f"Config not found: {self.config_path}")
|
|
return {}
|
|
|
|
try:
|
|
import yaml
|
|
return yaml.safe_load(self.config_path.read_text())
|
|
except ImportError:
|
|
# Fallback: basic parsing
|
|
logger.warning("PyYAML not available, using defaults")
|
|
return {}
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load config: {e}")
|
|
return {}
|
|
|
|
def _init_components(self) -> None:
|
|
"""Initialize all routing components."""
|
|
if SMART_ROUTER_AVAILABLE:
|
|
try:
|
|
self.smart_router = SmartRouter()
|
|
except Exception as e:
|
|
logger.warning(f"Failed to init SmartRouter: {e}")
|
|
|
|
if SEMANTIC_ROUTER_AVAILABLE:
|
|
try:
|
|
self.semantic_router = SemanticRouter()
|
|
except Exception as e:
|
|
logger.warning(f"Failed to init SemanticRouter: {e}")
|
|
|
|
if FLOW_INTELLIGENCE_AVAILABLE:
|
|
try:
|
|
self.flow_intelligence = FlowIntelligence()
|
|
except Exception as e:
|
|
logger.warning(f"Failed to init FlowIntelligence: {e}")
|
|
|
|
def prepare_dispatch(self, task: str, project: str,
|
|
context: Dict[str, Any] = None) -> EnhancedDispatchContext:
|
|
"""
|
|
Prepare enhanced dispatch context with smart routing.
|
|
|
|
Args:
|
|
task: Task description
|
|
project: Target project
|
|
context: Additional context
|
|
|
|
Returns:
|
|
EnhancedDispatchContext with routing decisions
|
|
"""
|
|
start_time = time.time()
|
|
|
|
# Get routing decision from SmartRouter
|
|
routing_decision = None
|
|
if self.smart_router:
|
|
routing_decision = self.smart_router.analyze_and_route(task, project, context)
|
|
logger.info(f"SmartRouter: {routing_decision.complexity.value} -> {routing_decision.recommended_agent.value}")
|
|
|
|
# Get domain context from SemanticRouter
|
|
domain_context = None
|
|
if self.semantic_router:
|
|
domain_result = self.semantic_router.route(task)
|
|
domain_context = {
|
|
"domain": domain_result.get("primary_domain"),
|
|
"confidence": domain_result.get("confidence", 0),
|
|
"best_practices": domain_result.get("best_practices", []),
|
|
"system_instructions": domain_result.get("system_instructions", "")
|
|
}
|
|
logger.info(f"SemanticRouter: {domain_context['domain']} (conf: {domain_context['confidence']:.2f})")
|
|
|
|
# Get flow context from FlowIntelligence
|
|
flow_context = None
|
|
if self.flow_intelligence:
|
|
recent_flows = self.flow_intelligence.get_recent_flows(project, limit=3)
|
|
flow_context = {
|
|
"recent_tasks": [f.task_description[:100] for f in recent_flows],
|
|
"active_flows": len(self.flow_intelligence.active_flows),
|
|
"completion_rate": self.flow_intelligence.get_stats().get("completion_rate", 0)
|
|
}
|
|
|
|
# Determine model override based on routing
|
|
model_override = self._determine_model(routing_decision, context)
|
|
|
|
elapsed = time.time() - start_time
|
|
logger.info(f"Dispatch prepared in {elapsed*1000:.1f}ms")
|
|
|
|
return EnhancedDispatchContext(
|
|
task=task,
|
|
project=project,
|
|
routing_decision=routing_decision,
|
|
domain_context=domain_context,
|
|
flow_context=flow_context,
|
|
model_override=model_override,
|
|
priority=self._calculate_priority(routing_decision),
|
|
tags=self._extract_tags(task, domain_context)
|
|
)
|
|
|
|
def _determine_model(self, routing: Optional[RoutingDecision],
|
|
context: Dict[str, Any] = None) -> Optional[str]:
|
|
"""Determine the optimal model based on routing."""
|
|
if not routing:
|
|
return None
|
|
|
|
# Map agent tiers to actual model names
|
|
agent_to_model = {
|
|
AgentTier.FLASH: None, # Use Gemini Flash for internal decisions only
|
|
AgentTier.HAIKU: "haiku",
|
|
AgentTier.SONNET: "sonnet",
|
|
AgentTier.OPUS: "opus",
|
|
AgentTier.PRO: None, # Pro is for internal reasoning
|
|
}
|
|
|
|
return agent_to_model.get(routing.recommended_agent)
|
|
|
|
def _calculate_priority(self, routing: Optional[RoutingDecision]) -> int:
|
|
"""Calculate task priority based on routing."""
|
|
if not routing:
|
|
return 5
|
|
|
|
# Higher complexity = higher priority (lower number)
|
|
complexity_priority = {
|
|
TaskComplexity.TRIVIAL: 8,
|
|
TaskComplexity.SIMPLE: 6,
|
|
TaskComplexity.MODERATE: 5,
|
|
TaskComplexity.COMPLEX: 3,
|
|
TaskComplexity.RESEARCH: 4,
|
|
}
|
|
|
|
return complexity_priority.get(routing.complexity, 5)
|
|
|
|
def _extract_tags(self, task: str, domain_context: Optional[Dict]) -> list:
|
|
"""Extract tags for task categorization."""
|
|
tags = []
|
|
|
|
if domain_context:
|
|
tags.append(domain_context.get("domain", "general"))
|
|
|
|
# Add keyword-based tags
|
|
task_lower = task.lower()
|
|
if "bug" in task_lower or "fix" in task_lower:
|
|
tags.append("bugfix")
|
|
if "feature" in task_lower or "implement" in task_lower:
|
|
tags.append("feature")
|
|
if "test" in task_lower:
|
|
tags.append("testing")
|
|
if "refactor" in task_lower:
|
|
tags.append("refactor")
|
|
|
|
return tags
|
|
|
|
def validate_output(self, task: str, output: str,
|
|
context: Dict[str, Any] = None) -> ValidationResult:
|
|
"""
|
|
Validate agent output using SmartRouter.
|
|
|
|
Args:
|
|
task: Original task
|
|
output: Agent output
|
|
context: Additional context
|
|
|
|
Returns:
|
|
ValidationResult with quality assessment
|
|
"""
|
|
if not self.smart_router:
|
|
# Return default valid result
|
|
return ValidationResult(
|
|
is_valid=True,
|
|
quality_score=0.7,
|
|
issues=[],
|
|
suggestions=[],
|
|
needs_retry=False
|
|
)
|
|
|
|
return self.smart_router.validate_response(task, output, context)
|
|
|
|
def should_continue(self, task: str, output: str,
|
|
validation: ValidationResult) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Determine if task needs continuation.
|
|
|
|
Args:
|
|
task: Original task
|
|
output: Current output
|
|
validation: Validation result
|
|
|
|
Returns:
|
|
(should_continue, continuation_prompt)
|
|
"""
|
|
if not validation.is_valid:
|
|
return True, f"Previous attempt had issues: {', '.join(validation.issues)}. Please retry."
|
|
|
|
if validation.needs_retry:
|
|
return True, validation.continuation_prompt
|
|
|
|
if validation.quality_score < 0.6:
|
|
return True, "Output quality is below threshold. Please improve the response."
|
|
|
|
return False, None
|
|
|
|
def create_augmented_prompt(self, context: EnhancedDispatchContext) -> str:
|
|
"""
|
|
Create an augmented prompt with routing context.
|
|
|
|
Args:
|
|
context: Enhanced dispatch context
|
|
|
|
Returns:
|
|
Augmented task prompt
|
|
"""
|
|
parts = [context.task]
|
|
|
|
# Add domain guidance if available
|
|
if context.domain_context:
|
|
practices = context.domain_context.get("best_practices", [])
|
|
if practices:
|
|
parts.append("\n\nBest practices for this task:")
|
|
for practice in practices[:3]:
|
|
parts.append(f"- {practice}")
|
|
|
|
# Add suggested steps if available
|
|
if context.routing_decision and context.routing_decision.suggested_steps:
|
|
parts.append("\n\nSuggested approach:")
|
|
for i, step in enumerate(context.routing_decision.suggested_steps[:5], 1):
|
|
parts.append(f"{i}. {step}")
|
|
|
|
return "\n".join(parts)
|
|
|
|
def record_completion(self, context: EnhancedDispatchContext,
|
|
output: str, success: bool,
|
|
duration_seconds: float) -> None:
|
|
"""
|
|
Record task completion for learning.
|
|
|
|
Args:
|
|
context: Dispatch context
|
|
output: Task output
|
|
success: Whether task succeeded
|
|
duration_seconds: Execution duration
|
|
"""
|
|
if not self.flow_intelligence:
|
|
return
|
|
|
|
# Create or update flow
|
|
try:
|
|
# Simplified recording
|
|
logger.info(f"Task {context.project}: {'Success' if success else 'Failed'} in {duration_seconds:.1f}s")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to record completion: {e}")
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
"""Get combined statistics from all components."""
|
|
stats = {
|
|
"smart_router": {},
|
|
"semantic_router": {},
|
|
"flow_intelligence": {}
|
|
}
|
|
|
|
if self.smart_router:
|
|
stats["smart_router"] = self.smart_router.get_stats()
|
|
|
|
if self.flow_intelligence:
|
|
stats["flow_intelligence"] = self.flow_intelligence.get_stats()
|
|
|
|
return stats
|
|
|
|
|
|
# Singleton instance for easy access
|
|
_orchestrator_instance = None
|
|
|
|
|
|
def get_orchestrator() -> SmartFlowOrchestrator:
|
|
"""Get or create singleton orchestrator instance."""
|
|
global _orchestrator_instance
|
|
if _orchestrator_instance is None:
|
|
_orchestrator_instance = SmartFlowOrchestrator()
|
|
return _orchestrator_instance
|
|
|
|
|
|
def smart_dispatch_prepare(task: str, project: str,
|
|
context: Dict[str, Any] = None) -> EnhancedDispatchContext:
|
|
"""
|
|
Convenience function for smart dispatch preparation.
|
|
|
|
Args:
|
|
task: Task description
|
|
project: Target project
|
|
context: Additional context
|
|
|
|
Returns:
|
|
EnhancedDispatchContext
|
|
"""
|
|
return get_orchestrator().prepare_dispatch(task, project, context)
|
|
|
|
|
|
def smart_validate_output(task: str, output: str,
|
|
context: Dict[str, Any] = None) -> ValidationResult:
|
|
"""
|
|
Convenience function for output validation.
|
|
|
|
Args:
|
|
task: Original task
|
|
output: Agent output
|
|
context: Additional context
|
|
|
|
Returns:
|
|
ValidationResult
|
|
"""
|
|
return get_orchestrator().validate_output(task, output, context)
|
|
|
|
|
|
# CLI for testing
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
logger.info("=" * 60)
|
|
logger.info("Smart Flow Integration Test")
|
|
logger.info("=" * 60)
|
|
|
|
orchestrator = SmartFlowOrchestrator()
|
|
|
|
# Test cases
|
|
test_cases = [
|
|
("Fix the login button styling", "overbits"),
|
|
("Implement OAuth2 authentication with refresh tokens", "dss"),
|
|
("Research microservices communication patterns", "admin"),
|
|
("List all running Docker containers", "admin"),
|
|
]
|
|
|
|
for task, project in test_cases:
|
|
logger.info(f"\n{'='*40}")
|
|
logger.info(f"Task: {task}")
|
|
logger.info(f"Project: {project}")
|
|
|
|
context = orchestrator.prepare_dispatch(task, project)
|
|
|
|
logger.info(f"\nRouting Decision:")
|
|
if context.routing_decision:
|
|
logger.info(f" Complexity: {context.routing_decision.complexity.value}")
|
|
logger.info(f" Agent: {context.routing_decision.recommended_agent.value}")
|
|
logger.info(f" Confidence: {context.routing_decision.confidence:.2f}")
|
|
logger.info(f" Steps: {context.routing_decision.suggested_steps[:2]}")
|
|
|
|
logger.info(f"\nDomain Context:")
|
|
if context.domain_context:
|
|
logger.info(f" Domain: {context.domain_context.get('domain')}")
|
|
logger.info(f" Confidence: {context.domain_context.get('confidence', 0):.2f}")
|
|
|
|
logger.info(f"\nModel Override: {context.model_override}")
|
|
logger.info(f"Priority: {context.priority}")
|
|
logger.info(f"Tags: {context.tags}")
|
|
|
|
# Test augmented prompt
|
|
augmented = orchestrator.create_augmented_prompt(context)
|
|
logger.info(f"\nAugmented Prompt Preview:")
|
|
logger.info(augmented[:200] + "..." if len(augmented) > 200 else augmented)
|
|
|
|
# Show stats
|
|
logger.info(f"\n{'='*60}")
|
|
stats = orchestrator.get_stats()
|
|
logger.info(f"Stats: {json.dumps(stats, indent=2)}")
|
|
logger.info("=" * 60)
|