Based on claude-code-tools TmuxCLIController, this refactor:
- Adds DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
324 lines
12 KiB
Python
324 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Sub-Agent Flow Integration - Integrates sub-agent context with Luzia 9-phase flow
|
|
|
|
Features:
|
|
1. Inject parent context into sub-agent flow execution
|
|
2. Execute 9-phase flow with context awareness
|
|
3. Coordinate sub-agent execution
|
|
4. Aggregate results from sub-agents
|
|
5. Propagate learnings back to parent task
|
|
"""
|
|
|
|
from typing import Dict, List, Optional, Any, Callable
|
|
from datetime import datetime
|
|
from sub_agent_context import SubAgentContextManager, SubAgentContext
|
|
|
|
|
|
class SubAgentFlowIntegrator:
    """Integrates sub-agent context management with task execution flow.

    The integrator holds a context manager and a registry of per-phase
    handlers. Each sub-agent is driven through the phases listed in its
    context's ``phase_progression``; phases without a registered handler fall
    back to a built-in default that only reports descriptive information.
    """

    def __init__(self, context_manager: Optional["SubAgentContextManager"] = None):
        """Initialize flow integrator.

        Args:
            context_manager: SubAgentContextManager instance. When omitted
                (or falsy), a new ``SubAgentContextManager()`` is created.
        """
        self.context_manager = context_manager or SubAgentContextManager()
        # Maps phase name -> callable invoked by execute_phase for that phase.
        self.phase_handlers: Dict[str, Callable] = {}

    def register_phase_handler(
        self, phase_name: str, handler: Callable[["SubAgentContext"], Dict[str, Any]]
    ) -> None:
        """Register handler for a specific phase.

        Registering a second handler under the same name replaces the first.

        Args:
            phase_name: Name of phase (e.g., 'CONTEXT_PREP', 'ANALYZING')
            handler: Callable that receives the sub-agent context and
                executes the phase logic
        """
        self.phase_handlers[phase_name] = handler

    def execute_sub_agent_flow(
        self,
        parent_task_id: str,
        parent_project: str,
        parent_description: str,
        parent_context: Optional[Dict[str, Any]] = None,
        parent_tags: Optional[List[str]] = None,
        parent_metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Create a sub-agent context and execute every phase of its flow.

        Phases run in the order given by the new context's
        ``phase_progression``. Each phase result is recorded whether it
        completed or failed; a failed phase does not stop the later ones.

        Args:
            parent_task_id: ID of parent task
            parent_project: Parent project
            parent_description: Parent task description
            parent_context: Parent task context
            parent_tags: Tags from parent
            parent_metadata: Metadata from parent

        Returns:
            ``{"sub_agent_id": ..., "phases": {phase_name: phase_result}}``
            where each ``phase_result`` is what :meth:`execute_phase` returned.
        """
        context = self.context_manager.create_sub_agent_context(
            parent_task_id=parent_task_id,
            parent_project=parent_project,
            parent_description=parent_description,
            parent_context=parent_context,
            parent_tags=parent_tags,
            parent_metadata=parent_metadata,
        )

        results: Dict[str, Any] = {"sub_agent_id": context.sub_agent_id, "phases": {}}

        # Collect the names before running anything so the loop works on a
        # fixed list of phase names.
        phase_names = [phase.phase_name for phase in context.phase_progression]
        for phase_name in phase_names:
            results["phases"][phase_name] = self.execute_phase(
                context.sub_agent_id, phase_name
            )

        return results

    def execute_phase(self, sub_agent_id: str, phase_name: str) -> Dict[str, Any]:
        """Execute a single phase for a sub-agent.

        The phase is marked ``in_progress`` in the context manager, then the
        registered handler (or the default behavior) runs, and finally the
        phase is marked ``completed`` or ``failed``.

        Args:
            sub_agent_id: ID of sub-agent
            phase_name: Name of phase to execute

        Returns:
            ``{"error": ...}`` when no context exists for ``sub_agent_id``;
            ``{"status": "completed", "output": ...}`` on success;
            ``{"status": "failed", "error": ...}`` when the phase raised.
        """
        context = self.context_manager.get_sub_agent_context(sub_agent_id)
        if not context:
            return {"error": f"Context not found for {sub_agent_id}"}

        self.context_manager.update_phase(sub_agent_id, phase_name, "in_progress")

        try:
            handler = self.phase_handlers.get(phase_name)
            if phase_name in self.phase_handlers:
                output = handler(context)
            else:
                output = self._execute_default_phase(context, phase_name)

            # The manager stores the stringified output; the caller gets the
            # original object back.
            self.context_manager.update_phase(
                sub_agent_id, phase_name, "completed", output=str(output)
            )
            return {"status": "completed", "output": output}

        except Exception as e:
            # Deliberate catch-all: any handler failure is recorded on the
            # phase and reported to the caller instead of propagating.
            self.context_manager.update_phase(
                sub_agent_id, phase_name, "failed", error=str(e)
            )
            return {"status": "failed", "error": str(e)}

    def _execute_default_phase(
        self, context: "SubAgentContext", phase_name: str
    ) -> Dict[str, Any]:
        """Execute default behavior for a phase.

        The default does no real work: it builds a descriptive dictionary
        from the context. Unrecognized phase names yield only the common
        fields.

        Args:
            context: SubAgentContext
            phase_name: Name of phase

        Returns:
            Phase output: the common fields ``phase``, ``parent_task``,
            ``parent_project`` and ``sibling_agents`` (a count), plus
            phase-specific entries.
        """
        output: Dict[str, Any] = {
            "phase": phase_name,
            "parent_task": context.parent_task_id,
            "parent_project": context.parent_project,
            "sibling_agents": len(context.sibling_agents),
        }

        if phase_name == "CONTEXT_PREP":
            output["action"] = "Preparing context from parent task"
            output["parent_description"] = context.parent_description
            output["context_keys"] = list(context.parent_context.keys())

        elif phase_name == "RECEIVED":
            output["action"] = "Received and registered sub-agent"
            output["sub_agent_id"] = context.sub_agent_id
            output["created_at"] = context.created_at

        elif phase_name == "PREDICTING":
            output["action"] = "Predicting sub-agent requirements"
            output["parent_tags"] = context.parent_tags
            output["metadata_available"] = bool(context.parent_metadata)

        elif phase_name == "ANALYZING":
            output["action"] = "Analyzing parent task context"
            output["parent_tags_count"] = len(context.parent_tags)
            output["coordination_needed"] = len(context.sibling_agents) > 0

        elif phase_name == "CONSENSUS_CHECK":
            output["action"] = "Checking consensus with siblings"
            # Replaces the sibling count set above with the sibling list.
            output["sibling_agents"] = list(context.sibling_agents)
            output["messages_sent"] = len(context.coordination_messages)

        elif phase_name == "AWAITING_APPROVAL":
            output["action"] = "Awaiting approval to proceed"
            output["ready_to_execute"] = True

        elif phase_name == "STRATEGIZING":
            output["action"] = "Strategizing execution approach"
            output["strategy"] = "Execute sub-task within parent context"

        elif phase_name == "EXECUTING":
            # Local import: only `datetime` is imported at file level.
            from datetime import timezone

            output["action"] = "Executing sub-agent task"
            # datetime.utcnow() is deprecated since Python 3.12. Take an
            # aware UTC "now" and drop tzinfo so the ISO string keeps the
            # same naive format (no "+00:00" suffix) as before.
            output["execution_start"] = (
                datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
            )

        elif phase_name == "LEARNING":
            output["action"] = "Learning from execution"
            output["parent_project"] = context.parent_project
            output["completion_status"] = "ready"

        return output

    def get_sub_agent_progress(self, sub_agent_id: str) -> Dict[str, Any]:
        """Get progress report for a sub-agent.

        Args:
            sub_agent_id: ID of sub-agent

        Returns:
            ``{"error": ...}`` when no context exists for ``sub_agent_id``.
            Otherwise a dictionary with phase counts per status, the current
            phase as reported by the context manager, the completion
            percentage (0 when there are no phases), the summed duration of
            completed phases, and a per-phase name/status/duration list.
        """
        context = self.context_manager.get_sub_agent_context(sub_agent_id)
        if not context:
            return {"error": f"Context not found for {sub_agent_id}"}

        phases = context.phase_progression
        completed_phases = [p for p in phases if p.status == "completed"]
        in_progress_phases = [p for p in phases if p.status == "in_progress"]
        failed_phases = [p for p in phases if p.status == "failed"]

        current_phase = self.context_manager.get_current_phase(sub_agent_id)
        # A completed phase without a recorded duration contributes 0.
        total_duration = sum(p.duration_seconds or 0 for p in completed_phases)

        return {
            "sub_agent_id": sub_agent_id,
            "total_phases": len(phases),
            "completed_phases": len(completed_phases),
            "in_progress_phases": len(in_progress_phases),
            "failed_phases": len(failed_phases),
            "current_phase": current_phase,
            "progress_percentage": (len(completed_phases) / len(phases)) * 100 if phases else 0,
            "total_duration_seconds": total_duration,
            "phase_details": [
                {
                    "name": p.phase_name,
                    "status": p.status,
                    "duration": p.duration_seconds,
                }
                for p in phases
            ],
        }

    def coordinate_sub_agents(
        self,
        parent_task_id: str,
        coordination_strategy: str = "sequential",
    ) -> Dict[str, Any]:
        """Describe how the sub-agents of a parent task should be coordinated.

        This builds a coordination plan only; it does not execute any
        sub-agent.

        Args:
            parent_task_id: ID of parent task
            coordination_strategy: 'sequential', 'parallel', or
                'dependency-based'. Any other value produces a result with an
                empty ``coordination_details`` list.

        Returns:
            ``{"status": "no_sub_agents", "parent_task_id": ...}`` when the
            parent has no sub-agents; otherwise the parent ID, the strategy,
            the sub-agent IDs and one ``coordination_details`` entry per
            sub-agent.
        """
        sub_agent_ids = self.context_manager.get_sub_agents_for_parent(parent_task_id)

        if not sub_agent_ids:
            return {"status": "no_sub_agents", "parent_task_id": parent_task_id}

        results: Dict[str, Any] = {
            "parent_task_id": parent_task_id,
            "strategy": coordination_strategy,
            "sub_agents": sub_agent_ids,
            "coordination_details": [],
        }
        details = results["coordination_details"]

        if coordination_strategy == "sequential":
            # One entry per sub-agent, numbered from 1 in listing order.
            for order, sub_agent_id in enumerate(sub_agent_ids, start=1):
                context = self.context_manager.get_sub_agent_context(sub_agent_id)
                details.append(
                    {
                        "order": order,
                        "sub_agent_id": sub_agent_id,
                        "siblings_count": len(context.sibling_agents) if context else 0,
                        "strategy": "Execute after previous sub-agent completes",
                    }
                )

        elif coordination_strategy == "parallel":
            for sub_agent_id in sub_agent_ids:
                details.append(
                    {
                        "sub_agent_id": sub_agent_id,
                        "strategy": "Execute simultaneously with other sub-agents",
                    }
                )

        elif coordination_strategy == "dependency-based":
            # Every sibling reported by the context manager is listed as a
            # dependency of the sub-agent.
            for sub_agent_id in sub_agent_ids:
                siblings = self.context_manager.get_sibling_agents(sub_agent_id)
                details.append(
                    {
                        "sub_agent_id": sub_agent_id,
                        "depends_on": list(siblings),
                        "strategy": "Execute considering dependencies on siblings",
                    }
                )

        return results

    def collect_sub_agent_results(
        self, parent_task_id: str
    ) -> Dict[str, Any]:
        """Collect and aggregate results from all sub-agents.

        Args:
            parent_task_id: ID of parent task

        Returns:
            Aggregated results: the parent ID, the number of sub-agents, one
            entry per sub-agent with its progress report and context summary,
            and ``all_sub_agents_complete``. That flag is True only when every
            sub-agent has a progress report without an error and all of its
            phases are completed. With no sub-agents it is True.
        """
        sub_agent_ids = self.context_manager.get_sub_agents_for_parent(parent_task_id)
        results: Dict[str, Any] = {
            "parent_task_id": parent_task_id,
            "sub_agents_total": len(sub_agent_ids),
            "sub_agents": [],
        }

        for sub_agent_id in sub_agent_ids:
            results["sub_agents"].append(
                {
                    "sub_agent_id": sub_agent_id,
                    "progress": self.get_sub_agent_progress(sub_agent_id),
                    "summary": self.context_manager.get_context_summary(sub_agent_id),
                }
            )

        # An error report has neither "completed_phases" nor "total_phases",
        # so comparing the two .get() results alone would be None == None and
        # count a sub-agent with a missing context as complete.
        results["all_sub_agents_complete"] = all(
            "error" not in entry["progress"]
            and entry["progress"].get("completed_phases")
            == entry["progress"].get("total_phases")
            for entry in results["sub_agents"]
        )

        return results
|