Based on claude-code-tools TmuxCLIController, this refactor:
- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
495 lines
16 KiB
Python
495 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Flow Intelligence - Intelligent task continuation and flow management
|
|
|
|
Features:
|
|
1. Track task execution flow and state
|
|
2. Detect task continuation opportunities
|
|
3. Suggest next steps intelligently
|
|
4. Learn from completed tasks
|
|
5. Optimize execution paths
|
|
"""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
from datetime import datetime
|
|
from dataclasses import dataclass, asdict, field
|
|
import hashlib
|
|
|
|
@dataclass
class TaskStep:
    """One step of a task flow, together with its execution record.

    Attributes:
        name: Identifier of the step inside its flow.
        description: Human-readable text describing the step.
        status: One of "pending", "in_progress", "completed" or "failed".
        output: Output recorded for the step, if any.
        error: Error message recorded for the step, if any.
        duration_seconds: Seconds elapsed between start and completion.
        started_at: ISO-8601 timestamp taken when the step was started.
        completed_at: ISO-8601 timestamp taken when the step finished.
    """

    # Required fields.
    name: str
    description: str
    status: str  # pending, in_progress, completed, failed

    # Filled in while the step runs.
    output: Optional[str] = None
    error: Optional[str] = None
    duration_seconds: Optional[float] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
|
|
|
|
@dataclass
class TaskFlow:
    """Record of a multi-step task and its progress.

    Attributes:
        task_id: Identifier of the flow; also used as the name of its JSON file.
        task_description: Free-text description of the task.
        project: Name of the project the task belongs to.
        created_at: ISO-8601 timestamp taken when the flow was created.
        completed_at: ISO-8601 timestamp taken when the flow was completed
            or failed.
        status: One of "active", "completed", "failed" or "paused".
        steps: The steps that make up the task.
        context: Arbitrary state attached to the flow.
        result: Final result summary, or the error message of a failed flow.
        continuation_suggestions: Suggested follow-up actions.
        tags: Labels used for categorization.
    """

    # Required fields.
    task_id: str
    task_description: str
    project: str
    created_at: str

    # Lifecycle.
    completed_at: Optional[str] = None
    status: str = "active"  # active, completed, failed, paused

    # Content; mutable defaults are created per instance via default_factory.
    steps: List[TaskStep] = field(default_factory=list)
    context: Dict[str, Any] = field(default_factory=dict)
    result: Optional[str] = None
    continuation_suggestions: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
|
|
|
|
class FlowIntelligence:
    """Track multi-step task flows, persist them and suggest what to do next.

    Flows whose status is "active" are held in ``active_flows`` (keyed by task
    id); every other flow is held in ``completed_flows``.  Each flow is
    persisted as ``<flows_dir>/<task_id>.json``.
    """

    # Marker printed before each step by get_flow_summary(), keyed by status.
    _STATUS_ICONS = {
        "completed": "✅",
        "in_progress": "⏳",
        "failed": "❌",
        "pending": "⭕",
    }

    def __init__(self, flows_dir: Optional[Path] = None):
        """Create the storage directory if needed and load the saved flows.

        Args:
            flows_dir: Directory holding the flow records; defaults to
                ``/tmp/.luzia-flows`` when omitted.
        """
        self.flows_dir = flows_dir or Path("/tmp/.luzia-flows")
        self.flows_dir.mkdir(parents=True, exist_ok=True)
        self.active_flows: Dict[str, TaskFlow] = {}
        self.completed_flows: List[TaskFlow] = []
        self.load_flows()

    def load_flows(self) -> None:
        """Load every ``*.json`` record found in ``flows_dir`` into memory.

        Records with status "active" are stored in ``active_flows``; all
        others are appended to ``completed_flows``.  A record that cannot be
        read or converted is reported with a warning and skipped, so one bad
        file does not stop the remaining records from loading.

        Note: ``completed_flows`` is appended to, not rebuilt, so calling this
        method again adds the same non-active flows a second time.
        """
        if not self.flows_dir.exists():
            return

        for flow_file in self.flows_dir.glob("*.json"):
            try:
                flow = self._dict_to_flow(json.loads(flow_file.read_text()))
                if flow.status == "active":
                    self.active_flows[flow.task_id] = flow
                else:
                    self.completed_flows.append(flow)
            except Exception as e:  # deliberate best-effort: warn and go on
                print(f"[Warning] Failed to load flow {flow_file}: {e}")

    def _dict_to_flow(self, data: Dict) -> TaskFlow:
        """Build a TaskFlow from its dictionary form.

        Missing keys fall back to defaults: empty strings for the required
        text fields, "pending"/"active" for the statuses, ``None`` for the
        optional values and empty containers for the collections.

        Args:
            data: Dictionary as produced by ``dataclasses.asdict`` on a flow.

        Returns:
            The reconstructed TaskFlow.
        """
        steps = [
            TaskStep(
                name=raw.get("name", ""),
                description=raw.get("description", ""),
                status=raw.get("status", "pending"),
                output=raw.get("output"),
                error=raw.get("error"),
                duration_seconds=raw.get("duration_seconds"),
                started_at=raw.get("started_at"),
                completed_at=raw.get("completed_at"),
            )
            for raw in data.get("steps", [])
        ]
        return TaskFlow(
            task_id=data.get("task_id", ""),
            task_description=data.get("task_description", ""),
            project=data.get("project", ""),
            created_at=data.get("created_at", ""),
            completed_at=data.get("completed_at"),
            status=data.get("status", "active"),
            steps=steps,
            context=data.get("context", {}),
            result=data.get("result"),
            continuation_suggestions=data.get("continuation_suggestions", []),
            tags=data.get("tags", []),
        )

    def create_flow(self, task_description: str, project: str,
                    steps: List[str],
                    tags: Optional[List[str]] = None) -> TaskFlow:
        """Create a new active flow, register it and save it to disk.

        Steps are named ``step_1``, ``step_2``, ... in the order given and
        start out as "pending".

        Args:
            task_description: Description of the task.
            project: Project name.
            steps: Step descriptions, in execution order.
            tags: Optional tags for categorization.

        Returns:
            The created TaskFlow.
        """
        flow = TaskFlow(
            task_id=self._generate_task_id(task_description),
            task_description=task_description,
            project=project,
            created_at=datetime.now().isoformat(),
            steps=[
                TaskStep(
                    name=f"step_{number}",
                    description=text,
                    status="pending",
                )
                for number, text in enumerate(steps, start=1)
            ],
            # Copy so later changes to the caller's list do not alter the flow.
            tags=list(tags) if tags else [],
        )
        self.active_flows[flow.task_id] = flow
        self.save_flow(flow)
        return flow

    def _generate_task_id(self, task_description: str) -> str:
        """Return a task id of the form ``task_<12 hex digits>``.

        The digits are the start of the MD5 digest of the description joined
        with the current timestamp, so the same description yields a new id
        on every call.
        """
        seed = f"{task_description}{datetime.now().isoformat()}"
        digest = hashlib.md5(seed.encode()).hexdigest()
        return f"task_{digest[:12]}"

    def start_step(self, task_id: str, step_name: str) -> None:
        """Mark a step of an active flow as in progress and save the flow.

        Does nothing when ``task_id`` is not an active flow.  Only the first
        step whose name matches is updated; the flow is saved even when no
        step matches.

        Args:
            task_id: Task ID.
            step_name: Step name.
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return

        for step in flow.steps:
            if step.name == step_name:
                step.status = "in_progress"
                step.started_at = datetime.now().isoformat()
                break

        self.save_flow(flow)

    def complete_step(self, task_id: str, step_name: str,
                      output: str, error: Optional[str] = None) -> None:
        """Record the outcome of a step of an active flow and save the flow.

        The step becomes "failed" when ``error`` is non-empty and "completed"
        otherwise.  The duration is recorded only when the step has a start
        timestamp.  Does nothing when ``task_id`` is not an active flow.

        Args:
            task_id: Task ID.
            step_name: Step name.
            output: Step output.
            error: Optional error message.
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return

        for step in flow.steps:
            if step.name != step_name:
                continue
            finished = datetime.now()
            step.status = "failed" if error else "completed"
            step.output = output
            step.error = error
            step.completed_at = finished.isoformat()
            if step.started_at:
                started = datetime.fromisoformat(step.started_at)
                step.duration_seconds = (finished - started).total_seconds()
            break

        self.save_flow(flow)

    def get_context_for_continuation(self, task_id: str) -> Dict[str, Any]:
        """Collect what is needed to resume an active flow.

        Completed steps are listed with their output cut to 500 characters
        (the full output is kept under ``previous_results``), failed steps are
        reported under ``issues`` and pending steps under ``next_steps``.
        Steps that are "in_progress" are not reported in any of these lists.

        Args:
            task_id: Task ID.

        Returns:
            Context dict with previous results and state, or an empty dict
            when ``task_id`` is not an active flow.
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return {}

        context = {
            "task_description": flow.task_description,
            "project": flow.project,
            "previous_results": {},
            "state": flow.context,
            "completed_steps": [],
            "next_steps": [],
            "issues": [],
        }

        for step in flow.steps:
            if step.status == "completed":
                context["completed_steps"].append({
                    "name": step.name,
                    "description": step.description,
                    # Truncated to keep the context small.
                    "output": step.output[:500] if step.output else "",
                })
                if step.output:
                    context["previous_results"][step.name] = step.output
            elif step.status == "failed":
                context["issues"].append(f"{step.name}: {step.error}")
            elif step.status == "pending":
                context["next_steps"].append(step.description)

        return context

    def suggest_next_steps(self, task_id: str) -> List[str]:
        """Suggest what to do next for an active flow.

        The suggestions are the first two pending steps followed by a retry
        of the first failed step.  When there are neither, related tasks are
        derived from the task description instead.

        Args:
            task_id: Task ID.

        Returns:
            List of suggested next steps; empty when ``task_id`` is not an
            active flow.
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return []

        pending = [s for s in flow.steps if s.status == "pending"]
        suggestions = [step.description for step in pending[:2]]

        failed = [s for s in flow.steps if s.status == "failed"]
        if failed:
            suggestions.append(f"Retry failed step: {failed[0].description}")

        # Nothing pending and nothing failed: fall back to related tasks.
        if not suggestions:
            suggestions = self._suggest_related_tasks(flow)

        return suggestions

    def _suggest_related_tasks(self, flow: TaskFlow) -> List[str]:
        """Suggest related tasks from keywords in the task description.

        The first of "test", "build" and "debug" found in the lower-cased
        description decides the suggestions; none matching gives an empty
        list.
        """
        description = flow.task_description.lower()

        if "test" in description:
            return [
                "Document test results",
                "Update test coverage metrics",
            ]
        if "build" in description:
            return [
                "Run integration tests",
                "Deploy to staging",
            ]
        if "debug" in description:
            return [
                "Write regression test for this bug",
                "Update error handling",
            ]
        return []

    def complete_flow(self, task_id: str, result: str) -> None:
        """Mark an active flow as completed, archive it and save it.

        Does nothing when ``task_id`` is not an active flow.

        Args:
            task_id: Task ID.
            result: Final result summary.
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return

        flow.status = "completed"
        flow.result = result
        flow.completed_at = datetime.now().isoformat()
        flow.continuation_suggestions = self._suggest_follow_ups(flow)

        # Move from the active set to the archive before persisting.
        self.completed_flows.append(flow)
        del self.active_flows[task_id]
        self.save_flow(flow)

    def fail_flow(self, task_id: str, error: str) -> None:
        """Mark an active flow as failed, archive it and save it.

        The error message is stored as the flow's result and a fixed list of
        recovery suggestions is attached.  Does nothing when ``task_id`` is
        not an active flow.

        Args:
            task_id: Task ID.
            error: Error message.
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return

        flow.status = "failed"
        flow.result = error
        flow.completed_at = datetime.now().isoformat()
        flow.continuation_suggestions = [
            "Review error details",
            "Check logs for root cause",
            "Attempt recovery with different approach",
        ]

        # Failed flows are archived alongside the completed ones.
        self.completed_flows.append(flow)
        del self.active_flows[task_id]
        self.save_flow(flow)

    def _suggest_follow_ups(self, flow: TaskFlow) -> List[str]:
        """Suggest follow-up tasks from keywords in the task description.

        Keywords are matched as substrings of the lower-cased description and
        the first matching group wins.

        Args:
            flow: Completed flow.

        Returns:
            List of suggested follow-ups; empty when no keyword matches.
        """
        task_lower = flow.task_description.lower()

        def mentions(*keywords: str) -> bool:
            return any(word in task_lower for word in keywords)

        if mentions("implement", "feature", "add"):
            return [
                "Write tests for the new feature",
                "Update documentation",
                "Create deployment checklist",
            ]
        if mentions("refactor", "optimize"):
            return [
                "Benchmark performance improvements",
                "Update code documentation",
                "Deploy and monitor in production",
            ]
        if mentions("debug", "fix", "issue"):
            return [
                "Add regression test",
                "Document the fix",
                "Review similar issues",
            ]
        return []

    def save_flow(self, flow: TaskFlow) -> None:
        """Write the flow to ``<flows_dir>/<task_id>.json``.

        The record is serialized first, written to a temporary sibling file
        and then renamed over the target, so an interrupted write cannot
        leave a truncated record behind.  The temporary name ends in
        ``.json.tmp`` and is therefore not matched by the ``*.json`` pattern
        used when loading.

        Args:
            flow: TaskFlow to save.
        """
        # Serialize before touching the disk: a serialization error then
        # leaves the existing record untouched.
        payload = json.dumps(asdict(flow), indent=2)

        # The directory may have been removed since __init__ created it.
        self.flows_dir.mkdir(parents=True, exist_ok=True)

        flow_file = self.flows_dir / f"{flow.task_id}.json"
        temp_file = self.flows_dir / f"{flow.task_id}.json.tmp"
        temp_file.write_text(payload)
        temp_file.replace(flow_file)

    def get_flow_summary(self, task_id: str) -> str:
        """Render a Markdown summary of a flow.

        Active flows are searched first, then the archived ones.

        Args:
            task_id: Task ID.

        Returns:
            Formatted summary, or "Flow not found" when the id is unknown.
        """
        flow = self.active_flows.get(task_id) or next(
            (f for f in self.completed_flows if f.task_id == task_id),
            None,
        )
        if not flow:
            return "Flow not found"

        lines = [
            f"# Task Flow: {flow.task_description}",
            f"**Status:** {flow.status}",
            f"**Project:** {flow.project}",
            f"**Created:** {flow.created_at}",
            "",
            "## Steps",
        ]

        for step in flow.steps:
            # Unknown statuses are shown with a question mark.
            status_icon = self._STATUS_ICONS.get(step.status, "?")
            lines.append(f"{status_icon} {step.name}: {step.description}")
            if step.error:
                lines.append(f" Error: {step.error}")

        if flow.result:
            lines.append(f"\n## Result\n{flow.result}")

        if flow.continuation_suggestions:
            lines.append("\n## Next Steps")
            lines.extend(
                f"- {suggestion}"
                for suggestion in flow.continuation_suggestions
            )

        return "\n".join(lines)

    def get_recent_flows(self, project: Optional[str] = None,
                         limit: int = 10) -> List[TaskFlow]:
        """Return the most recently created flows, newest first.

        Both active and archived flows are considered.  Ordering compares the
        ``created_at`` strings.

        Args:
            project: Optional project filter.
            limit: Max flows to return.

        Returns:
            List of recent flows.
        """
        flows = list(self.active_flows.values()) + self.completed_flows
        if project:
            flows = [f for f in flows if f.project == project]

        newest_first = sorted(flows, key=lambda f: f.created_at, reverse=True)
        return newest_first[:limit]

    def export_flow_history(self, output_path: Path) -> None:
        """Write all flows plus summary counts to a JSON file.

        The export holds the total, active and archived flow counts, the
        number of flows per project and the full record of every flow.  The
        parent directory of ``output_path`` is created when missing.

        Args:
            output_path: Path to write export.
        """
        all_flows = list(self.active_flows.values()) + self.completed_flows

        by_project: Dict[str, int] = {}
        for flow in all_flows:
            by_project[flow.project] = by_project.get(flow.project, 0) + 1

        export = {
            "total_tasks": len(all_flows),
            "active_tasks": len(self.active_flows),
            "completed_tasks": len(self.completed_flows),
            "by_project": by_project,
            "flows": [asdict(f) for f in all_flows],
        }

        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(json.dumps(export, indent=2))

    def get_stats(self) -> Dict[str, Any]:
        """Compute counts over all flows and their steps.

        ``completed_flows`` counts every archived flow, which includes the
        failed ones.  ``completion_rate`` is the share of steps with status
        "completed", or 0 when there are no steps.

        Returns:
            Statistics dict.
        """
        all_flows = list(self.active_flows.values()) + self.completed_flows
        all_steps = [step for flow in all_flows for step in flow.steps]

        total_steps = len(all_steps)
        completed_steps = sum(1 for s in all_steps if s.status == "completed")
        failed_steps = sum(1 for s in all_steps if s.status == "failed")

        return {
            "total_flows": len(all_flows),
            "active_flows": len(self.active_flows),
            "completed_flows": len(self.completed_flows),
            "total_steps": total_steps,
            "completed_steps": completed_steps,
            "failed_steps": failed_steps,
            "completion_rate": (
                completed_steps / total_steps if total_steps > 0 else 0
            ),
        }
|