#!/usr/bin/env python3 """ Flow Intelligence - Intelligent task continuation and flow management Features: 1. Track task execution flow and state 2. Detect task continuation opportunities 3. Suggest next steps intelligently 4. Learn from completed tasks 5. Optimize execution paths """ import json from pathlib import Path from typing import Dict, List, Optional, Any, Tuple from datetime import datetime from dataclasses import dataclass, asdict, field import hashlib @dataclass class TaskStep: """A single step in task execution""" name: str description: str status: str # pending, in_progress, completed, failed output: Optional[str] = None error: Optional[str] = None duration_seconds: Optional[float] = None started_at: Optional[str] = None completed_at: Optional[str] = None @dataclass class TaskFlow: """Tracking flow of a multi-step task""" task_id: str task_description: str project: str created_at: str completed_at: Optional[str] = None status: str = "active" # active, completed, failed, paused steps: List[TaskStep] = field(default_factory=list) context: Dict[str, Any] = field(default_factory=dict) result: Optional[str] = None continuation_suggestions: List[str] = field(default_factory=list) tags: List[str] = field(default_factory=list) class FlowIntelligence: """Manages intelligent task flow and continuation""" def __init__(self, flows_dir: Optional[Path] = None): """Initialize flow intelligence Args: flows_dir: Directory to store flow records """ self.flows_dir = flows_dir or Path("/tmp/.luzia-flows") self.flows_dir.mkdir(parents=True, exist_ok=True) self.active_flows: Dict[str, TaskFlow] = {} self.completed_flows: List[TaskFlow] = [] self.load_flows() def load_flows(self) -> None: """Load flow history from disk""" if self.flows_dir.exists(): for flow_file in self.flows_dir.glob("*.json"): try: data = json.loads(flow_file.read_text()) flow = self._dict_to_flow(data) if flow.status == "active": self.active_flows[flow.task_id] = flow else: self.completed_flows.append(flow) except Exception as e: print(f"[Warning] Failed to load flow {flow_file}: {e}") def _dict_to_flow(self, data: Dict) -> TaskFlow: """Convert dict to TaskFlow""" steps = [ TaskStep( name=s.get("name", ""), description=s.get("description", ""), status=s.get("status", "pending"), output=s.get("output"), error=s.get("error"), duration_seconds=s.get("duration_seconds"), started_at=s.get("started_at"), completed_at=s.get("completed_at") ) for s in data.get("steps", []) ] return TaskFlow( task_id=data.get("task_id", ""), task_description=data.get("task_description", ""), project=data.get("project", ""), created_at=data.get("created_at", ""), completed_at=data.get("completed_at"), status=data.get("status", "active"), steps=steps, context=data.get("context", {}), result=data.get("result"), continuation_suggestions=data.get("continuation_suggestions", []), tags=data.get("tags", []) ) def create_flow(self, task_description: str, project: str, steps: List[str], tags: List[str] = None) -> TaskFlow: """Create a new task flow Args: task_description: Description of task project: Project name steps: List of step descriptions tags: Optional tags for categorization Returns: Created TaskFlow """ flow = TaskFlow( task_id=self._generate_task_id(task_description), task_description=task_description, project=project, created_at=datetime.now().isoformat(), steps=[ TaskStep( name=f"step_{i+1}", description=step, status="pending" ) for i, step in enumerate(steps) ], tags=tags or [] ) self.active_flows[flow.task_id] = flow self.save_flow(flow) return flow def _generate_task_id(self, task_description: str) -> str: """Generate unique task ID""" hash_str = hashlib.md5( f"{task_description}{datetime.now().isoformat()}".encode() ).hexdigest()[:12] return f"task_{hash_str}" def start_step(self, task_id: str, step_name: str) -> None: """Mark a step as in progress Args: task_id: Task ID step_name: Step name """ flow = self.active_flows.get(task_id) if not flow: return for step in flow.steps: if step.name == step_name: step.status = "in_progress" step.started_at = datetime.now().isoformat() break self.save_flow(flow) def complete_step(self, task_id: str, step_name: str, output: str, error: Optional[str] = None) -> None: """Mark a step as completed Args: task_id: Task ID step_name: Step name output: Step output error: Optional error message """ flow = self.active_flows.get(task_id) if not flow: return for step in flow.steps: if step.name == step_name: step.status = "completed" if not error else "failed" step.output = output step.error = error step.completed_at = datetime.now().isoformat() if step.started_at: started = datetime.fromisoformat(step.started_at) completed = datetime.fromisoformat(step.completed_at) step.duration_seconds = (completed - started).total_seconds() break self.save_flow(flow) def get_context_for_continuation(self, task_id: str) -> Dict[str, Any]: """Get context for continuing a task Args: task_id: Task ID Returns: Context dict with previous results and state """ flow = self.active_flows.get(task_id) if not flow: return {} # Build context from completed steps context = { "task_description": flow.task_description, "project": flow.project, "previous_results": {}, "state": flow.context, "completed_steps": [], "next_steps": [], "issues": [] } for i, step in enumerate(flow.steps): if step.status == "completed": context["completed_steps"].append({ "name": step.name, "description": step.description, "output": step.output[:500] if step.output else "" # Truncate }) if step.output: context["previous_results"][step.name] = step.output elif step.status == "failed": context["issues"].append(f"{step.name}: {step.error}") elif step.status == "pending": context["next_steps"].append(step.description) return context def suggest_next_steps(self, task_id: str) -> List[str]: """Suggest intelligent next steps for task Args: task_id: Task ID Returns: List of suggested next steps """ flow = self.active_flows.get(task_id) if not flow: return [] suggestions = [] # Pending steps pending = [s for s in flow.steps if s.status == "pending"] for step in pending[:2]: # Suggest next 2 pending steps suggestions.append(step.description) # Failed steps should be retried failed = [s for s in flow.steps if s.status == "failed"] if failed: suggestions.append(f"Retry failed step: {failed[0].description}") # Pattern-based suggestions if not suggestions: # If all steps done, suggest related tasks suggestions = self._suggest_related_tasks(flow) return suggestions def _suggest_related_tasks(self, flow: TaskFlow) -> List[str]: """Suggest related tasks based on completed flow""" suggestions = [] # Check for common follow-up patterns if "test" in flow.task_description.lower(): suggestions.append("Document test results") suggestions.append("Update test coverage metrics") elif "build" in flow.task_description.lower(): suggestions.append("Run integration tests") suggestions.append("Deploy to staging") elif "debug" in flow.task_description.lower(): suggestions.append("Write regression test for this bug") suggestions.append("Update error handling") return suggestions def complete_flow(self, task_id: str, result: str) -> None: """Mark entire flow as completed Args: task_id: Task ID result: Final result summary """ flow = self.active_flows.get(task_id) if not flow: return flow.status = "completed" flow.result = result flow.completed_at = datetime.now().isoformat() flow.continuation_suggestions = self._suggest_follow_ups(flow) # Move to completed self.completed_flows.append(flow) del self.active_flows[task_id] self.save_flow(flow) def fail_flow(self, task_id: str, error: str) -> None: """Mark flow as failed Args: task_id: Task ID error: Error message """ flow = self.active_flows.get(task_id) if not flow: return flow.status = "failed" flow.result = error flow.completed_at = datetime.now().isoformat() # Suggest recovery steps flow.continuation_suggestions = [ "Review error details", "Check logs for root cause", "Attempt recovery with different approach" ] self.completed_flows.append(flow) del self.active_flows[task_id] self.save_flow(flow) def _suggest_follow_ups(self, flow: TaskFlow) -> List[str]: """Suggest follow-up tasks after completion Args: flow: Completed flow Returns: List of suggested follow-ups """ suggestions = [] # Based on task type task_lower = flow.task_description.lower() if any(word in task_lower for word in ["implement", "feature", "add"]): suggestions.extend([ "Write tests for the new feature", "Update documentation", "Create deployment checklist" ]) elif any(word in task_lower for word in ["refactor", "optimize"]): suggestions.extend([ "Benchmark performance improvements", "Update code documentation", "Deploy and monitor in production" ]) elif any(word in task_lower for word in ["debug", "fix", "issue"]): suggestions.extend([ "Add regression test", "Document the fix", "Review similar issues" ]) return suggestions def save_flow(self, flow: TaskFlow) -> None: """Save flow to disk Args: flow: TaskFlow to save """ flow_file = self.flows_dir / f"{flow.task_id}.json" flow_file.write_text(json.dumps(asdict(flow), indent=2)) def get_flow_summary(self, task_id: str) -> str: """Get human-readable flow summary Args: task_id: Task ID Returns: Formatted summary """ flow = self.active_flows.get(task_id) or next( (f for f in self.completed_flows if f.task_id == task_id), None ) if not flow: return "Flow not found" lines = [ f"# Task Flow: {flow.task_description}", f"**Status:** {flow.status}", f"**Project:** {flow.project}", f"**Created:** {flow.created_at}", "" ] # Steps lines.append("## Steps") for step in flow.steps: status_icon = { "completed": "✅", "in_progress": "⏳", "failed": "❌", "pending": "⭕" }.get(step.status, "?") lines.append(f"{status_icon} {step.name}: {step.description}") if step.error: lines.append(f" Error: {step.error}") # Result if flow.result: lines.append(f"\n## Result\n{flow.result}") # Suggestions if flow.continuation_suggestions: lines.append("\n## Next Steps") for suggestion in flow.continuation_suggestions: lines.append(f"- {suggestion}") return "\n".join(lines) def get_recent_flows(self, project: Optional[str] = None, limit: int = 10) -> List[TaskFlow]: """Get recent flows, optionally filtered by project Args: project: Optional project filter limit: Max flows to return Returns: List of recent flows """ flows = list(self.active_flows.values()) + self.completed_flows if project: flows = [f for f in flows if f.project == project] # Sort by creation time flows.sort( key=lambda f: f.created_at, reverse=True ) return flows[:limit] def export_flow_history(self, output_path: Path) -> None: """Export flow history for analysis Args: output_path: Path to write export """ all_flows = list(self.active_flows.values()) + self.completed_flows export = { "total_tasks": len(all_flows), "active_tasks": len(self.active_flows), "completed_tasks": len(self.completed_flows), "by_project": {}, "flows": [asdict(f) for f in all_flows] } # Group by project for flow in all_flows: if flow.project not in export["by_project"]: export["by_project"][flow.project] = 0 export["by_project"][flow.project] += 1 output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(export, indent=2)) def get_stats(self) -> Dict[str, Any]: """Get statistics about task flows Returns: Statistics dict """ all_flows = list(self.active_flows.values()) + self.completed_flows completed = self.completed_flows total_steps = sum(len(f.steps) for f in all_flows) completed_steps = sum( len([s for s in f.steps if s.status == "completed"]) for f in all_flows ) failed_steps = sum( len([s for s in f.steps if s.status == "failed"]) for f in all_flows ) return { "total_flows": len(all_flows), "active_flows": len(self.active_flows), "completed_flows": len(completed), "total_steps": total_steps, "completed_steps": completed_steps, "failed_steps": failed_steps, "completion_rate": completed_steps / total_steps if total_steps > 0 else 0 }