#!/usr/bin/env python3 """ Orchestrator Enhancements - Integration of all flow intelligence components This module integrates: - PromptAugmentor: Context injection and documentation - ToolAutoLoader: Dynamic tool discovery - KnownIssuesDetector: Bug pattern detection - WebSearchIntegrator: Web context enhancement - FlowIntelligence: Task continuation and flow management """ import json from pathlib import Path from typing import Dict, List, Optional, Any, Tuple from datetime import datetime from prompt_augmentor import PromptAugmentor, PromptTemplateBuilder from tool_auto_loader import ToolAutoLoader from known_issues_detector import KnownIssuesDetector from web_search_integrator import WebSearchIntegrator from flow_intelligence import FlowIntelligence class OrchestratorEnhancements: """Orchestrates all enhancement components""" def __init__(self, config: Dict[str, Any]): """Initialize orchestrator enhancements Args: config: Orchestrator configuration dict """ self.config = config self.prompt_augmentor: Optional[PromptAugmentor] = None self.tool_loader = ToolAutoLoader() self.issue_detector = KnownIssuesDetector() self.web_search = WebSearchIntegrator() self.flow_intelligence = FlowIntelligence() def initialize_for_project(self, project_name: str, project_config: Dict[str, Any]) -> None: """Initialize enhancements for a specific project Args: project_name: Project name project_config: Project configuration """ # Create project-aware prompt augmentor self.prompt_augmentor = PromptAugmentor( project_config={"name": project_name, **project_config}, tools_available=project_config.get("tools", []) ) def enhance_prompt(self, prompt: str, project: str, task_context: Optional[Dict] = None) -> Tuple[str, Dict[str, Any]]: """Enhance a prompt with full context Args: prompt: Original prompt project: Project name task_context: Optional task continuation context Returns: Tuple of (enhanced_prompt, metadata) """ if not self.prompt_augmentor: project_config = self.config.get("projects", {}).get(project, {}) self.initialize_for_project(project, project_config) # Get available tools tools = self.tool_loader.discover_tools(self.config.get("projects", {}).get(project, {})) recommended_tools = self.tool_loader.recommend_tools(prompt, tools) # Augment the prompt augmented = self.prompt_augmentor.augment(prompt, task_context) # Get web references if needed should_search, search_query = self.web_search.should_search(prompt) references = [] if should_search: learned = self.web_search.search_learned_solutions(search_query) if learned: ref_section = self.web_search.generate_context_section([]) if ref_section: augmented += f"\n\n{ref_section}" # Add tool recommendations tool_ref = self.tool_loader.generate_tool_reference(recommended_tools) augmented += f"\n\n{tool_ref}" metadata = { "enhanced_at": datetime.now().isoformat(), "recommended_tools": recommended_tools, "web_search_enabled": should_search, "search_query": search_query if should_search else None, "task_context_provided": task_context is not None } return augmented, metadata def detect_issues_in_output(self, output: str, error: str = "", project: Optional[str] = None) -> Tuple[List[Any], str]: """Detect issues in task output Args: output: Task output error: Error message if any project: Optional project name Returns: Tuple of (detected_issues, formatted_report) """ detected = self.issue_detector.detect_issues(output, error, project) if not detected: return [], "No issues detected." report = self.issue_detector.format_issue_report(detected) return detected, report def continue_task(self, task_id: str, project: str) -> Dict[str, Any]: """Get context for continuing a task Args: task_id: Task ID project: Project name Returns: Context dict for continuation """ context = self.flow_intelligence.get_context_for_continuation(task_id) next_steps = self.flow_intelligence.suggest_next_steps(task_id) context["suggested_next_steps"] = next_steps context["project"] = project return context def start_task_flow(self, task_description: str, project: str, steps: List[str], tags: List[str] = None) -> str: """Start tracking a multi-step task Args: task_description: Task description project: Project name steps: List of step descriptions tags: Optional tags Returns: Task ID """ flow = self.flow_intelligence.create_flow(task_description, project, steps, tags) return flow.task_id def update_task_step(self, task_id: str, step_name: str, output: str, error: Optional[str] = None) -> None: """Update task step progress Args: task_id: Task ID step_name: Step name output: Step output error: Optional error """ self.flow_intelligence.start_step(task_id, step_name) self.flow_intelligence.complete_step(task_id, step_name, output, error) def complete_task(self, task_id: str, result: str) -> List[str]: """Complete a task and get follow-up suggestions Args: task_id: Task ID result: Final result Returns: List of follow-up suggestions """ self.flow_intelligence.complete_flow(task_id, result) flow = next( (f for f in self.flow_intelligence.completed_flows if f.task_id == task_id), None ) return flow.continuation_suggestions if flow else [] def record_tool_usage(self, tool: str) -> None: """Record that a tool was used Args: tool: Tool name """ self.tool_loader.record_tool_usage(tool) def record_learned_solution(self, problem: str, solution: str, references: List[str], tags: List[str], confidence: float = 0.8) -> None: """Record a learned solution for future reference Args: problem: Problem description solution: Solution description references: Reference URLs tags: Topic tags confidence: Confidence level """ self.web_search.learn_solution(problem, solution, references, tags, confidence) def generate_analysis_prompt(self, topic: str, context: str, focus_areas: List[str]) -> str: """Generate an analysis task prompt Args: topic: Analysis topic context: Context information focus_areas: Areas to focus on Returns: Analysis prompt """ return PromptTemplateBuilder.build_analysis_prompt(topic, context, focus_areas) def generate_debug_prompt(self, issue: str, symptoms: str, relevant_files: List[str]) -> str: """Generate a debugging task prompt Args: issue: Issue description symptoms: Symptoms relevant_files: Relevant file paths Returns: Debug prompt """ return PromptTemplateBuilder.build_debug_prompt(issue, symptoms, relevant_files) def generate_implementation_prompt(self, feature: str, requirements: List[str], constraints: List[str]) -> str: """Generate an implementation task prompt Args: feature: Feature to implement requirements: Requirements list constraints: Constraints list Returns: Implementation prompt """ return PromptTemplateBuilder.build_implementation_prompt(feature, requirements, constraints) def get_project_intelligence_summary(self, project: str) -> Dict[str, Any]: """Get summary of intelligence about a project Args: project: Project name Returns: Intelligence summary """ recent_flows = self.flow_intelligence.get_recent_flows(project, limit=5) recent_issues = self.issue_detector.get_recent_issues(limit=5) web_stats = self.web_search.get_stats() tool_stats = self.tool_loader.usage_stats return { "project": project, "recent_tasks": [ { "id": f.task_id, "description": f.task_description, "status": f.status } for f in recent_flows ], "recent_issues": recent_issues, "web_search_stats": web_stats, "top_tools": self.tool_loader.get_top_tools( self.tool_loader.discover_tools( self.config.get("projects", {}).get(project, {}) ), limit=5 ) } def export_all_analytics(self, output_dir: Path) -> None: """Export all analytics and learned data Args: output_dir: Directory to export to """ output_dir.mkdir(parents=True, exist_ok=True) # Flow history self.flow_intelligence.export_flow_history(output_dir / "flows.json") # Issue statistics issue_stats = self.issue_detector.get_issue_statistics() (output_dir / "issue_stats.json").write_text( json.dumps(issue_stats, indent=2) ) # Web search learning self.web_search.export_learning_data(output_dir / "learning.json") # Tool usage (output_dir / "tool_usage.json").write_text( json.dumps(self.tool_loader.usage_stats, indent=2) ) print(f"[Orchestrator] Analytics exported to {output_dir}") def get_orchestration_status(self) -> Dict[str, Any]: """Get overall orchestration status Returns: Status dict """ flow_stats = self.flow_intelligence.get_stats() issue_stats = self.issue_detector.get_issue_statistics() return { "timestamp": datetime.now().isoformat(), "active_tasks": flow_stats["active_flows"], "completed_tasks": flow_stats["completed_flows"], "total_steps": flow_stats["total_steps"], "step_completion_rate": f"{flow_stats['completion_rate']:.1%}", "issues_detected": issue_stats["total_detected"], "fixes_applied": issue_stats["fixes_attempted"], "fix_success_rate": f"{issue_stats['fix_success_rate']:.1%}", "tools_available": len(self.tool_loader.tools_cache), "learning_records": len(self.web_search.learning_db) }