luzia/lib/flow_intelligence.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Adds a DockerTmuxController class for robust tmux session management
- Implements send_keys() with a configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00


#!/usr/bin/env python3
"""
Flow Intelligence - Intelligent task continuation and flow management
Features:
1. Track task execution flow and state
2. Detect task continuation opportunities
3. Suggest next steps intelligently
4. Learn from completed tasks
5. Optimize execution paths
"""
import json
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
from dataclasses import dataclass, asdict, field
import hashlib


@dataclass
class TaskStep:
    """A single step in task execution"""
    name: str
    description: str
    status: str  # pending, in_progress, completed, failed
    output: Optional[str] = None
    error: Optional[str] = None
    duration_seconds: Optional[float] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = None


@dataclass
class TaskFlow:
    """Tracking flow of a multi-step task"""
    task_id: str
    task_description: str
    project: str
    created_at: str
    completed_at: Optional[str] = None
    status: str = "active"  # active, completed, failed, paused
    steps: List[TaskStep] = field(default_factory=list)
    context: Dict[str, Any] = field(default_factory=dict)
    result: Optional[str] = None
    continuation_suggestions: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
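

# A flow is persisted by save_flow() as JSON via dataclasses.asdict(). An
# illustrative (hypothetical) on-disk record looks roughly like:
# {
#   "task_id": "task_ab12cd34ef56",
#   "task_description": "Implement health-check endpoint",
#   "project": "demo-project",
#   "created_at": "2026-01-14T10:42:16",
#   "completed_at": null,
#   "status": "active",
#   "steps": [{"name": "step_1", "description": "Design route",
#              "status": "pending", "output": null, "error": null,
#              "duration_seconds": null, "started_at": null,
#              "completed_at": null}],
#   "context": {},
#   "result": null,
#   "continuation_suggestions": [],
#   "tags": ["feature"]
# }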


class FlowIntelligence:
    """Manages intelligent task flow and continuation"""

    def __init__(self, flows_dir: Optional[Path] = None):
        """Initialize flow intelligence

        Args:
            flows_dir: Directory to store flow records
        """
        self.flows_dir = flows_dir or Path("/tmp/.luzia-flows")
        self.flows_dir.mkdir(parents=True, exist_ok=True)
        self.active_flows: Dict[str, TaskFlow] = {}
        self.completed_flows: List[TaskFlow] = []
        self.load_flows()

    def load_flows(self) -> None:
        """Load flow history from disk"""
        if self.flows_dir.exists():
            for flow_file in self.flows_dir.glob("*.json"):
                try:
                    data = json.loads(flow_file.read_text())
                    flow = self._dict_to_flow(data)
                    if flow.status == "active":
                        self.active_flows[flow.task_id] = flow
                    else:
                        self.completed_flows.append(flow)
                except Exception as e:
                    print(f"[Warning] Failed to load flow {flow_file}: {e}")

    def _dict_to_flow(self, data: Dict) -> TaskFlow:
        """Convert dict to TaskFlow"""
        steps = [
            TaskStep(
                name=s.get("name", ""),
                description=s.get("description", ""),
                status=s.get("status", "pending"),
                output=s.get("output"),
                error=s.get("error"),
                duration_seconds=s.get("duration_seconds"),
                started_at=s.get("started_at"),
                completed_at=s.get("completed_at")
            )
            for s in data.get("steps", [])
        ]
        return TaskFlow(
            task_id=data.get("task_id", ""),
            task_description=data.get("task_description", ""),
            project=data.get("project", ""),
            created_at=data.get("created_at", ""),
            completed_at=data.get("completed_at"),
            status=data.get("status", "active"),
            steps=steps,
            context=data.get("context", {}),
            result=data.get("result"),
            continuation_suggestions=data.get("continuation_suggestions", []),
            tags=data.get("tags", [])
        )

    def create_flow(self, task_description: str, project: str,
                    steps: List[str], tags: Optional[List[str]] = None) -> TaskFlow:
        """Create a new task flow

        Args:
            task_description: Description of task
            project: Project name
            steps: List of step descriptions
            tags: Optional tags for categorization

        Returns:
            Created TaskFlow
        """
        flow = TaskFlow(
            task_id=self._generate_task_id(task_description),
            task_description=task_description,
            project=project,
            created_at=datetime.now().isoformat(),
            steps=[
                TaskStep(
                    name=f"step_{i+1}",
                    description=step,
                    status="pending"
                )
                for i, step in enumerate(steps)
            ],
            tags=tags or []
        )
        self.active_flows[flow.task_id] = flow
        self.save_flow(flow)
        return flow
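
    # Illustrative usage (task names and values below are hypothetical):
    #     fi = FlowIntelligence()
    #     flow = fi.create_flow("Implement health-check endpoint", "demo-project",
    #                           ["Design route", "Implement handler", "Write tests"])
    #     # -> steps are auto-named "step_1".."step_3", all start as "pending",
    #     #    and the flow is written to <flows_dir>/<task_id>.json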

    def _generate_task_id(self, task_description: str) -> str:
        """Generate unique task ID"""
        hash_str = hashlib.md5(
            f"{task_description}{datetime.now().isoformat()}".encode()
        ).hexdigest()[:12]
        return f"task_{hash_str}"

    def start_step(self, task_id: str, step_name: str) -> None:
        """Mark a step as in progress

        Args:
            task_id: Task ID
            step_name: Step name
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return
        for step in flow.steps:
            if step.name == step_name:
                step.status = "in_progress"
                step.started_at = datetime.now().isoformat()
                break
        self.save_flow(flow)

    def complete_step(self, task_id: str, step_name: str,
                      output: str, error: Optional[str] = None) -> None:
        """Mark a step as completed, or as failed if an error is given

        Args:
            task_id: Task ID
            step_name: Step name
            output: Step output
            error: Optional error message
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return
        for step in flow.steps:
            if step.name == step_name:
                step.status = "completed" if not error else "failed"
                step.output = output
                step.error = error
                step.completed_at = datetime.now().isoformat()
                if step.started_at:
                    started = datetime.fromisoformat(step.started_at)
                    completed = datetime.fromisoformat(step.completed_at)
                    step.duration_seconds = (completed - started).total_seconds()
                break
        self.save_flow(flow)
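
    # Illustrative step lifecycle (values are hypothetical):
    #     fi.start_step(flow.task_id, "step_1")                    # records started_at
    #     fi.complete_step(flow.task_id, "step_1", output="done")  # records completed_at
    #     # duration_seconds is derived from the two ISO timestamps; passing
    #     # error=... marks the step "failed" instead of "completed"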

    def get_context_for_continuation(self, task_id: str) -> Dict[str, Any]:
        """Get context for continuing a task

        Args:
            task_id: Task ID

        Returns:
            Context dict with previous results and state
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return {}
        # Build context from completed steps
        context = {
            "task_description": flow.task_description,
            "project": flow.project,
            "previous_results": {},
            "state": flow.context,
            "completed_steps": [],
            "next_steps": [],
            "issues": []
        }
        for step in flow.steps:
            if step.status == "completed":
                context["completed_steps"].append({
                    "name": step.name,
                    "description": step.description,
                    "output": step.output[:500] if step.output else ""  # Truncate
                })
                if step.output:
                    context["previous_results"][step.name] = step.output
            elif step.status == "failed":
                context["issues"].append(f"{step.name}: {step.error}")
            elif step.status == "pending":
                context["next_steps"].append(step.description)
        return context

    def suggest_next_steps(self, task_id: str) -> List[str]:
        """Suggest intelligent next steps for task

        Args:
            task_id: Task ID

        Returns:
            List of suggested next steps
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return []
        suggestions = []
        # Pending steps
        pending = [s for s in flow.steps if s.status == "pending"]
        for step in pending[:2]:  # Suggest next 2 pending steps
            suggestions.append(step.description)
        # Failed steps should be retried
        failed = [s for s in flow.steps if s.status == "failed"]
        if failed:
            suggestions.append(f"Retry failed step: {failed[0].description}")
        # Pattern-based suggestions
        if not suggestions:
            # If all steps done, suggest related tasks
            suggestions = self._suggest_related_tasks(flow)
        return suggestions

    def _suggest_related_tasks(self, flow: TaskFlow) -> List[str]:
        """Suggest related tasks based on completed flow"""
        suggestions = []
        # Check for common follow-up patterns
        if "test" in flow.task_description.lower():
            suggestions.append("Document test results")
            suggestions.append("Update test coverage metrics")
        elif "build" in flow.task_description.lower():
            suggestions.append("Run integration tests")
            suggestions.append("Deploy to staging")
        elif "debug" in flow.task_description.lower():
            suggestions.append("Write regression test for this bug")
            suggestions.append("Update error handling")
        return suggestions

    def complete_flow(self, task_id: str, result: str) -> None:
        """Mark entire flow as completed

        Args:
            task_id: Task ID
            result: Final result summary
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return
        flow.status = "completed"
        flow.result = result
        flow.completed_at = datetime.now().isoformat()
        flow.continuation_suggestions = self._suggest_follow_ups(flow)
        # Move to completed
        self.completed_flows.append(flow)
        del self.active_flows[task_id]
        self.save_flow(flow)

    def fail_flow(self, task_id: str, error: str) -> None:
        """Mark flow as failed

        Args:
            task_id: Task ID
            error: Error message
        """
        flow = self.active_flows.get(task_id)
        if not flow:
            return
        flow.status = "failed"
        flow.result = error
        flow.completed_at = datetime.now().isoformat()
        # Suggest recovery steps
        flow.continuation_suggestions = [
            "Review error details",
            "Check logs for root cause",
            "Attempt recovery with different approach"
        ]
        self.completed_flows.append(flow)
        del self.active_flows[task_id]
        self.save_flow(flow)

    def _suggest_follow_ups(self, flow: TaskFlow) -> List[str]:
        """Suggest follow-up tasks after completion

        Args:
            flow: Completed flow

        Returns:
            List of suggested follow-ups
        """
        suggestions = []
        # Based on task type
        task_lower = flow.task_description.lower()
        if any(word in task_lower for word in ["implement", "feature", "add"]):
            suggestions.extend([
                "Write tests for the new feature",
                "Update documentation",
                "Create deployment checklist"
            ])
        elif any(word in task_lower for word in ["refactor", "optimize"]):
            suggestions.extend([
                "Benchmark performance improvements",
                "Update code documentation",
                "Deploy and monitor in production"
            ])
        elif any(word in task_lower for word in ["debug", "fix", "issue"]):
            suggestions.extend([
                "Add regression test",
                "Document the fix",
                "Review similar issues"
            ])
        return suggestions

    def save_flow(self, flow: TaskFlow) -> None:
        """Save flow to disk

        Args:
            flow: TaskFlow to save
        """
        flow_file = self.flows_dir / f"{flow.task_id}.json"
        flow_file.write_text(json.dumps(asdict(flow), indent=2))

    def get_flow_summary(self, task_id: str) -> str:
        """Get human-readable flow summary

        Args:
            task_id: Task ID

        Returns:
            Formatted summary
        """
        flow = self.active_flows.get(task_id) or next(
            (f for f in self.completed_flows if f.task_id == task_id),
            None
        )
        if not flow:
            return "Flow not found"
        lines = [
            f"# Task Flow: {flow.task_description}",
            f"**Status:** {flow.status}",
            f"**Project:** {flow.project}",
            f"**Created:** {flow.created_at}",
            ""
        ]
        # Steps
        lines.append("## Steps")
        for step in flow.steps:
            status_icon = {
                "completed": "✅",
                "in_progress": "🔄",
                "failed": "❌",
                "pending": "⏳"
            }.get(step.status, "?")
            lines.append(f"{status_icon} {step.name}: {step.description}")
            if step.error:
                lines.append(f" Error: {step.error}")
        # Result
        if flow.result:
            lines.append(f"\n## Result\n{flow.result}")
        # Suggestions
        if flow.continuation_suggestions:
            lines.append("\n## Next Steps")
            for suggestion in flow.continuation_suggestions:
                lines.append(f"- {suggestion}")
        return "\n".join(lines)

    def get_recent_flows(self, project: Optional[str] = None, limit: int = 10) -> List[TaskFlow]:
        """Get recent flows, optionally filtered by project

        Args:
            project: Optional project filter
            limit: Max flows to return

        Returns:
            List of recent flows
        """
        flows = list(self.active_flows.values()) + self.completed_flows
        if project:
            flows = [f for f in flows if f.project == project]
        # Sort by creation time
        flows.sort(
            key=lambda f: f.created_at,
            reverse=True
        )
        return flows[:limit]

    def export_flow_history(self, output_path: Path) -> None:
        """Export flow history for analysis

        Args:
            output_path: Path to write export
        """
        all_flows = list(self.active_flows.values()) + self.completed_flows
        export = {
            "total_tasks": len(all_flows),
            "active_tasks": len(self.active_flows),
            "completed_tasks": len(self.completed_flows),
            "by_project": {},
            "flows": [asdict(f) for f in all_flows]
        }
        # Group by project
        for flow in all_flows:
            if flow.project not in export["by_project"]:
                export["by_project"][flow.project] = 0
            export["by_project"][flow.project] += 1
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(json.dumps(export, indent=2))

    def get_stats(self) -> Dict[str, Any]:
        """Get statistics about task flows

        Returns:
            Statistics dict
        """
        all_flows = list(self.active_flows.values()) + self.completed_flows
        completed = self.completed_flows
        total_steps = sum(len(f.steps) for f in all_flows)
        completed_steps = sum(
            len([s for s in f.steps if s.status == "completed"])
            for f in all_flows
        )
        failed_steps = sum(
            len([s for s in f.steps if s.status == "failed"])
            for f in all_flows
        )
        return {
            "total_flows": len(all_flows),
            "active_flows": len(self.active_flows),
            "completed_flows": len(completed),
            "total_steps": total_steps,
            "completed_steps": completed_steps,
            "failed_steps": failed_steps,
            "completion_rate": completed_steps / total_steps if total_steps > 0 else 0
        }
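

# Minimal usage sketch (illustrative only; the flows directory, project, and
# task names below are made up). It exercises the FlowIntelligence API defined
# above end to end: create a flow, run one step, close the flow, then report.
if __name__ == "__main__":
    fi = FlowIntelligence(flows_dir=Path("/tmp/.luzia-flows-demo"))

    # Create a three-step flow and walk the first step through its lifecycle.
    flow = fi.create_flow(
        task_description="Implement health-check endpoint",
        project="demo-project",
        steps=["Design route", "Implement handler", "Write tests"],
        tags=["feature"],
    )
    fi.start_step(flow.task_id, "step_1")
    fi.complete_step(flow.task_id, "step_1", output="Route designed: GET /healthz")

    # Remaining pending steps come back as suggestions.
    print(fi.suggest_next_steps(flow.task_id))

    # Close out the flow and render the human-readable summary plus stats.
    fi.complete_flow(flow.task_id, result="Endpoint implemented and tested")
    print(fi.get_flow_summary(flow.task_id))
    print(json.dumps(fi.get_stats(), indent=2))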