Based on claude-code-tools TmuxCLIController, this refactor:
- Adds a DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
446 lines
15 KiB
Python
446 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Sub-Agent Context Management - Intelligent task context propagation
|
|
|
|
Features:
|
|
1. Parent task context injection into sub-agents
|
|
2. Sub-agent discovery and sibling awareness
|
|
3. 9-phase flow execution for context understanding
|
|
4. Context preservation across execution boundaries
|
|
5. Sub-agent coordination and messaging
|
|
"""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Any, Set, Tuple
|
|
from datetime import datetime
|
|
from dataclasses import dataclass, asdict, field
|
|
import hashlib
|
|
import uuid
|
|
|
|
|
|
@dataclass
class FlowPhase:
    """One step of the 9-phase unified flow tracked for a sub-agent.

    Attributes:
        phase_name: Identifier of the phase, e.g. CONTEXT_PREP, RECEIVED,
            PREDICTING, ANALYZING, CONSENSUS_CHECK.
        status: One of pending, in_progress, completed, failed.
        description: Free-form description of the phase (empty by default).
        output: Result text produced by the phase, if any.
        error: Error text recorded for the phase, if any.
        started_at: ISO-8601 timestamp string of when the phase started.
        completed_at: ISO-8601 timestamp string of when the phase finished.
        duration_seconds: Elapsed time between start and completion.
    """

    # Required fields (positional order is part of the constructor contract).
    phase_name: str
    status: str

    # Optional details; all unset until the phase actually runs.
    description: str = ""
    output: Optional[str] = None
    error: Optional[str] = None

    # Timing information.
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    duration_seconds: Optional[float] = None
|
|
|
|
|
|
@dataclass
class SubAgentContext:
    """Everything a sub-agent is handed by the parent task that spawned it.

    Attributes:
        parent_task_id: ID of the parent task.
        parent_project: Name of the parent's project.
        parent_description: Description of the parent task.
        sub_agent_id: ID of the sub-agent this context belongs to.
        created_at: Creation timestamp, stored as a string.
        parent_context: Context data copied from the parent task.
        parent_tags: Tags attached to the parent task.
        parent_metadata: Additional metadata from the parent task.
        phase_progression: Per-phase state of the flow for this sub-agent.
        sibling_agents: IDs of the other sub-agents of the same parent.
        coordination_messages: Messages exchanged with sibling agents.
    """

    # Identity of the parent task and of this sub-agent (all required).
    parent_task_id: str
    parent_project: str
    parent_description: str
    sub_agent_id: str
    created_at: str

    # Data inherited from the parent; each instance gets its own container.
    parent_context: Dict[str, Any] = field(default_factory=dict)
    parent_tags: List[str] = field(default_factory=list)
    parent_metadata: Dict[str, Any] = field(default_factory=dict)

    # Execution and coordination state.
    phase_progression: List[FlowPhase] = field(default_factory=list)
    sibling_agents: Set[str] = field(default_factory=set)
    coordination_messages: List[Dict[str, Any]] = field(default_factory=list)
|
|
|
|
|
|
class SubAgentContextManager:
    """Manages sub-agent context propagation and coordination.

    Each sub-agent context is held in memory and mirrored to
    ``<context_dir>/<sub_agent_id>.json``. Three in-memory indexes are kept:

    * ``active_contexts``: sub_agent_id -> SubAgentContext
    * ``parent_tasks``: parent_task_id -> list of sub_agent_ids
    * ``sibling_graph``: sub_agent_id -> set of sibling ids. For a registered
      context this is the very same set object as ``context.sibling_agents``,
      so both views always agree.

    Persistence is best-effort: I/O and parsing failures are printed and do
    not raise.
    """

    # Phase names of the 9-phase flow, in the order they are created.
    PHASE_NAMES = (
        "CONTEXT_PREP",
        "RECEIVED",
        "PREDICTING",
        "ANALYZING",
        "CONSENSUS_CHECK",
        "AWAITING_APPROVAL",
        "STRATEGIZING",
        "EXECUTING",
        "LEARNING",
    )

    # Failures expected when reading/parsing a context file.
    _LOAD_ERRORS = (OSError, ValueError, TypeError, AttributeError)
    # Failures expected when serializing/writing a context file.
    _SAVE_ERRORS = (OSError, TypeError, ValueError)

    def __init__(self, context_dir: Optional[Path] = None):
        """Initialize sub-agent context manager

        Creates the storage directory if needed and loads every context
        already stored in it.

        Args:
            context_dir: Directory to store sub-agent context. A plain string
                path is accepted as well. Defaults to
                ``/tmp/.luzia-sub-agents``.
        """
        # Path(...) makes a str argument work; Path objects pass through.
        self.context_dir = (
            Path(context_dir) if context_dir else Path("/tmp/.luzia-sub-agents")
        )
        self.context_dir.mkdir(parents=True, exist_ok=True)
        self.active_contexts: Dict[str, SubAgentContext] = {}
        self.parent_tasks: Dict[str, List[str]] = {}  # parent_id -> [sub_agent_ids]
        self.sibling_graph: Dict[str, Set[str]] = {}  # agent_id -> sibling_ids
        self.load_contexts()

    def _register(self, context: SubAgentContext) -> None:
        """Index ``context`` in the three in-memory registries."""
        agent_id = context.sub_agent_id
        self.active_contexts[agent_id] = context
        agent_ids = self.parent_tasks.setdefault(context.parent_task_id, [])
        if agent_id not in agent_ids:
            agent_ids.append(agent_id)
        # Deliberately share the set object so graph and context stay in sync.
        self.sibling_graph[agent_id] = context.sibling_agents

    def _read_context_file(self, context_file: Path) -> SubAgentContext:
        """Parse one context JSON file; raises on unreadable or invalid data."""
        return self._dict_to_context(json.loads(context_file.read_text()))

    def create_sub_agent_context(
        self,
        parent_task_id: str,
        parent_project: str,
        parent_description: str,
        parent_context: Optional[Dict[str, Any]] = None,
        parent_tags: Optional[List[str]] = None,
        parent_metadata: Optional[Dict[str, Any]] = None,
    ) -> SubAgentContext:
        """Create context for a new sub-agent

        The new agent gets a fresh UUID, all phases in ``pending`` state, and
        the already-registered sub-agents of the same parent as siblings.
        Each of those siblings is updated (and re-saved) to list the new
        agent too, so the relationship is symmetric in memory and on disk.

        Args:
            parent_task_id: ID of parent task
            parent_project: Parent project name
            parent_description: Parent task description
            parent_context: Parent task context data
            parent_tags: Tags from parent task
            parent_metadata: Additional metadata from parent

        Returns:
            SubAgentContext for the new sub-agent
        """
        sub_agent_id = str(uuid.uuid4())

        # Agents registered for this parent before the new one are its siblings.
        siblings = set(self.parent_tasks.get(parent_task_id, []))

        context = SubAgentContext(
            parent_task_id=parent_task_id,
            parent_project=parent_project,
            parent_description=parent_description,
            sub_agent_id=sub_agent_id,
            created_at=datetime.utcnow().isoformat(),
            parent_context=parent_context or {},
            parent_tags=parent_tags or [],
            parent_metadata=parent_metadata or {},
            phase_progression=[
                FlowPhase(phase_name=name, status="pending")
                for name in self.PHASE_NAMES
            ],
            sibling_agents=siblings,
        )
        self._register(context)

        # Register the reverse relationship and persist it; otherwise the
        # sibling's file keeps a stale list and the link is lost on reload.
        for sibling_id in siblings:
            sibling_context = self.get_sub_agent_context(sibling_id)
            if sibling_context is None:
                # Sibling context unavailable: track the link in the graph only.
                self.sibling_graph.setdefault(sibling_id, set()).add(sub_agent_id)
                continue
            sibling_context.sibling_agents.add(sub_agent_id)
            self.sibling_graph[sibling_id] = sibling_context.sibling_agents
            self.save_context(sibling_context)

        self.save_context(context)
        return context

    def get_sub_agent_context(self, sub_agent_id: str) -> Optional[SubAgentContext]:
        """Retrieve context for a sub-agent

        Looks in memory first, then falls back to the agent's JSON file. A
        context loaded from disk is registered so later lookups and the
        parent/sibling indexes see it.

        Args:
            sub_agent_id: ID of sub-agent

        Returns:
            SubAgentContext if found, None otherwise
        """
        cached = self.active_contexts.get(sub_agent_id)
        if cached is not None:
            return cached

        # Try loading from disk
        context_file = self.context_dir / f"{sub_agent_id}.json"
        if not context_file.exists():
            return None
        try:
            context = self._read_context_file(context_file)
        except self._LOAD_ERRORS as e:
            print(f"[Error] Failed to load context for {sub_agent_id}: {e}")
            return None

        self._register(context)
        # Keep the lookup key valid even if the file content carries another id.
        self.active_contexts[sub_agent_id] = context
        return context

    def update_phase(
        self,
        sub_agent_id: str,
        phase_name: str,
        status: str,
        output: Optional[str] = None,
        error: Optional[str] = None,
    ) -> bool:
        """Update phase status for a sub-agent

        ``output`` and ``error`` always replace the stored values, including
        when they are None. Entering ``in_progress`` stamps ``started_at``;
        entering ``completed`` or ``failed`` stamps ``completed_at`` and, when
        a start time exists, ``duration_seconds``.

        Args:
            sub_agent_id: ID of sub-agent
            phase_name: Name of phase to update
            status: New status (pending, in_progress, completed, failed)
            output: Phase output/results
            error: Error message if failed

        Returns:
            True if the phase was updated, False if the sub-agent or the
            phase name is unknown
        """
        context = self.get_sub_agent_context(sub_agent_id)
        if not context:
            return False

        phase = next(
            (p for p in context.phase_progression if p.phase_name == phase_name),
            None,
        )
        if phase is None:
            # Nothing was changed, so do not claim success.
            return False

        phase.status = status
        phase.output = output
        phase.error = error
        if status == "in_progress":
            phase.started_at = datetime.utcnow().isoformat()
        elif status in ("completed", "failed"):
            finished = datetime.utcnow()
            phase.completed_at = finished.isoformat()
            if phase.started_at:
                started = datetime.fromisoformat(phase.started_at)
                phase.duration_seconds = (finished - started).total_seconds()

        self.save_context(context)
        return True

    def get_current_phase(self, sub_agent_id: str) -> Optional[str]:
        """Get current active phase for a sub-agent

        Args:
            sub_agent_id: ID of sub-agent

        Returns:
            Name of the first in_progress phase, else of the first pending
            phase, else None (also None for an unknown sub-agent)
        """
        context = self.get_sub_agent_context(sub_agent_id)
        if not context:
            return None

        # An in_progress phase anywhere wins over an earlier pending one.
        for wanted in ("in_progress", "pending"):
            for phase in context.phase_progression:
                if phase.status == wanted:
                    return phase.phase_name
        return None

    def get_phase_progression(self, sub_agent_id: str) -> List[FlowPhase]:
        """Get full phase progression for a sub-agent

        Args:
            sub_agent_id: ID of sub-agent

        Returns:
            List of FlowPhase objects (empty for an unknown sub-agent)
        """
        context = self.get_sub_agent_context(sub_agent_id)
        return context.phase_progression if context else []

    def send_message_to_sibling(
        self,
        from_agent_id: str,
        to_agent_id: str,
        message_type: str,
        content: Dict[str, Any],
    ) -> bool:
        """Send coordination message to sibling agent

        The message is appended to the sender's log and, when the recipient's
        context can be found, to the recipient's log as well. Both contexts
        are saved.

        Args:
            from_agent_id: Sending sub-agent
            to_agent_id: Receiving sub-agent
            message_type: Type of message (request, update, result, etc.)
            content: Message content

        Returns:
            True if sent successfully, False if the sender is unknown or the
            recipient is not one of its siblings
        """
        sender = self.get_sub_agent_context(from_agent_id)
        if not sender or to_agent_id not in sender.sibling_agents:
            return False

        message = {
            "from": from_agent_id,
            "to": to_agent_id,
            "type": message_type,
            "content": content,
            "timestamp": datetime.utcnow().isoformat(),
        }

        sender.coordination_messages.append(message)
        self.save_context(sender)

        # Also add to recipient's message log for visibility
        recipient = self.get_sub_agent_context(to_agent_id)
        if recipient:
            recipient.coordination_messages.append(message)
            self.save_context(recipient)

        return True

    def get_sibling_agents(self, sub_agent_id: str) -> Set[str]:
        """Get all sibling agents for a sub-agent

        Args:
            sub_agent_id: ID of sub-agent

        Returns:
            Set of sibling agent IDs. This is a copy, so changing it does not
            affect the stored context.
        """
        context = self.get_sub_agent_context(sub_agent_id)
        return set(context.sibling_agents) if context else set()

    def get_parent_task_info(
        self, sub_agent_id: str
    ) -> Optional[Tuple[str, str, str]]:
        """Get parent task information for a sub-agent

        Args:
            sub_agent_id: ID of sub-agent

        Returns:
            Tuple of (parent_task_id, parent_project, parent_description) or None
        """
        context = self.get_sub_agent_context(sub_agent_id)
        if not context:
            return None
        return (
            context.parent_task_id,
            context.parent_project,
            context.parent_description,
        )

    def get_sub_agents_for_parent(self, parent_task_id: str) -> List[str]:
        """Get all sub-agents for a parent task

        Args:
            parent_task_id: ID of parent task

        Returns:
            List of sub-agent IDs. This is a copy, so changing it does not
            affect the manager's registry.
        """
        return list(self.parent_tasks.get(parent_task_id, []))

    def mark_sub_agent_complete(
        self, sub_agent_id: str, final_result: Optional[str] = None
    ) -> bool:
        """Mark sub-agent as complete after all phases

        Every phase still pending or in_progress becomes completed; failed
        phases are left as they are.

        Args:
            sub_agent_id: ID of sub-agent
            final_result: Final result/output. When given, it is stored as the
                output of the last phase in the progression.

        Returns:
            True if marked successfully, False for an unknown sub-agent
        """
        context = self.get_sub_agent_context(sub_agent_id)
        if not context:
            return False

        # Mark all phases as completed
        for phase in context.phase_progression:
            if phase.status in ("pending", "in_progress"):
                phase.status = "completed"

        # Keep the final result instead of silently discarding it.
        if final_result is not None and context.phase_progression:
            context.phase_progression[-1].output = final_result

        self.save_context(context)
        return True

    def save_context(self, context: SubAgentContext) -> None:
        """Save context to disk

        The JSON is written to a temporary file next to the target and then
        renamed over it, so an interrupted write cannot leave a truncated
        context file. Failures are printed, not raised.

        Args:
            context: SubAgentContext to save
        """
        context_file = self.context_dir / f"{context.sub_agent_id}.json"
        try:
            data = asdict(context)
            # Sets are not JSON-serializable; sorting keeps the file stable.
            data["sibling_agents"] = sorted(context.sibling_agents)
            payload = json.dumps(data, indent=2)

            # The ".tmp" suffix keeps the file out of load_contexts' "*.json" glob.
            tmp_file = context_file.with_name(context_file.name + ".tmp")
            tmp_file.write_text(payload)
            tmp_file.replace(context_file)
        except self._SAVE_ERRORS as e:
            print(f"[Error] Failed to save context for {context.sub_agent_id}: {e}")

    def load_contexts(self) -> None:
        """Load all contexts from disk

        Reads every ``*.json`` file in the context directory and rebuilds the
        parent-task registry and sibling graph. Files that cannot be read or
        parsed are reported and skipped.
        """
        if not self.context_dir.exists():
            return

        for context_file in self.context_dir.glob("*.json"):
            try:
                context = self._read_context_file(context_file)
            except self._LOAD_ERRORS as e:
                print(f"[Error] Failed to load context {context_file}: {e}")
                continue
            self._register(context)

    def _dict_to_context(self, data: Dict) -> SubAgentContext:
        """Convert dict to SubAgentContext

        Missing keys fall back to empty values; ``sibling_agents`` is turned
        back into a set and each phase dict into a FlowPhase.
        """
        phases = [
            FlowPhase(
                phase_name=p.get("phase_name", ""),
                status=p.get("status", "pending"),
                description=p.get("description", ""),
                output=p.get("output"),
                error=p.get("error"),
                started_at=p.get("started_at"),
                completed_at=p.get("completed_at"),
                duration_seconds=p.get("duration_seconds"),
            )
            for p in data.get("phase_progression", [])
        ]

        return SubAgentContext(
            parent_task_id=data.get("parent_task_id", ""),
            parent_project=data.get("parent_project", ""),
            parent_description=data.get("parent_description", ""),
            sub_agent_id=data.get("sub_agent_id", ""),
            created_at=data.get("created_at", ""),
            parent_context=data.get("parent_context", {}),
            parent_tags=data.get("parent_tags", []),
            parent_metadata=data.get("parent_metadata", {}),
            phase_progression=phases,
            sibling_agents=set(data.get("sibling_agents", [])),
            coordination_messages=data.get("coordination_messages", []),
        )

    def get_context_summary(self, sub_agent_id: str) -> Optional[Dict[str, Any]]:
        """Get human-readable summary of sub-agent context

        Args:
            sub_agent_id: ID of sub-agent

        Returns:
            Summary dict or None
        """
        context = self.get_sub_agent_context(sub_agent_id)
        if not context:
            return None

        phase_statuses = [
            {"phase": p.phase_name, "status": p.status, "duration": p.duration_seconds}
            for p in context.phase_progression
        ]

        return {
            "sub_agent_id": sub_agent_id,
            "parent_task_id": context.parent_task_id,
            "parent_project": context.parent_project,
            "parent_description": context.parent_description,
            "created_at": context.created_at,
            "sibling_count": len(context.sibling_agents),
            "siblings": list(context.sibling_agents),
            "message_count": len(context.coordination_messages),
            "phase_progression": phase_statuses,
            "parent_context_keys": list(context.parent_context.keys()),
            "parent_tags": context.parent_tags,
        }
|