- New CockpitQueueDispatcher: per-project serialized task queues - LoadMonitor: checks system load/memory before dispatching - Parallel execution across projects with round-robin fairness - CLI commands: cockpit queue, cockpit dispatch Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1280 lines
42 KiB
Python
1280 lines
42 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Luzia Cockpit - Human-in-the-Loop Claude Sessions
|
|
|
|
Provides Docker container management for pausable Claude agent sessions.
|
|
Key features:
|
|
- Docker stop/start freezes/resumes entire session state
|
|
- Claude sessions persist via --session-id and --resume
|
|
- tmux for human attachment when needed
|
|
- Multi-turn conversation support
|
|
|
|
Architecture:
|
|
Imports TmuxCLIController from claude-code-tools-local package and extends
|
|
it with DockerTmuxAdapter for Docker-containerized tmux sessions.
|
|
This enables importing upstream improvements while adding Docker support.
|
|
|
|
Usage:
|
|
luzia cockpit start <project> Start cockpit container
|
|
luzia cockpit stop <project> Stop (freeze) cockpit
|
|
luzia cockpit send <project> <msg> Send message to cockpit
|
|
luzia cockpit respond <project> <answer> Respond to pending question
|
|
luzia cockpit status [project] Show cockpit status
|
|
luzia cockpit output <project> Get recent output
|
|
luzia cockpit attach <project> Attach to tmux session
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import uuid
|
|
import time
|
|
import hashlib
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, Tuple, List
|
|
|
|
# Import TmuxCLIController from installed claude-code-tools-local package
|
|
from claude_code_tools.tmux_cli_controller import TmuxCLIController
|
|
|
|
|
|
class DockerTmuxAdapter(TmuxCLIController):
    """
    Docker adapter for TmuxCLIController.

    Extends TmuxCLIController to execute tmux commands inside a Docker container
    rather than on the host system. This enables:
    - Inheriting all TmuxCLIController methods automatically
    - Pulling upstream improvements from claude-code-tools
    - Adding Docker-specific functionality

    The key override is _run_tmux_command() which prepends 'docker exec' to all
    tmux commands. That method also verifies the container is running, so the
    other methods rely on its non-zero exit code instead of re-checking.
    """

    # Bare shell prompt, e.g. "root@host:/path#" or "user@host:~$".
    # Compiled once and shared by prompt detection and response extraction.
    _SHELL_PROMPT_RE = re.compile(r"^(root|[\w-]+)@[\w-]+:.*[#$]\s*$")

    def __init__(self, container_name: str, tmux_session: str = "agent", tmux_window: str = "main"):
        """
        Initialize adapter for a Docker container's tmux session.

        Args:
            container_name: Docker container name
            tmux_session: tmux session name inside container (default: agent)
            tmux_window: tmux window name (default: main)
        """
        super().__init__(session_name=tmux_session, window_name=tmux_window)
        self.container_name = container_name
        self.tmux_session = tmux_session
        self.tmux_window = tmux_window
        # tmux target specifier "<session>:<window>" used by every command below.
        self.target = f"{tmux_session}:{tmux_window}"

    def _run_tmux_command(self, command: List[str]) -> Tuple[str, int]:
        """
        Override: Run tmux command inside the Docker container.

        Instead of running 'tmux <cmd>', this runs
        'docker exec <container> tmux <cmd>'.

        Args:
            command: List of command components (without 'tmux' prefix)

        Returns:
            Tuple of (stripped stdout, exit_code). When the container is not
            running, nothing is executed and ("", 1) is returned.
        """
        if not self.is_container_running():
            return "", 1

        cmd = ["docker", "exec", self.container_name, "tmux"] + command
        result = subprocess.run(cmd, capture_output=True, text=True)
        return result.stdout.strip(), result.returncode

    def is_container_running(self) -> bool:
        """Check if the Docker container is running (exact name match)."""
        result = subprocess.run(
            ["docker", "ps", "--filter", f"name={self.container_name}", "--format", "{{.Names}}"],
            capture_output=True, text=True
        )
        # The docker name filter matches substrings, so "cockpit-foo" would also
        # list "cockpit-foobar". Compare whole names, not substrings of stdout.
        return self.container_name in result.stdout.split()

    def send_keys(self, text: str, enter: bool = True, delay_enter: float = 0.5) -> bool:
        """
        Send keystrokes to the tmux pane.

        Args:
            text: Text to send (typed literally)
            enter: Whether to press Enter after text
            delay_enter: Delay in seconds before pressing Enter

        Returns:
            True only if the text and, when requested, the Enter keypress
            were both delivered successfully.
        """
        # -l makes tmux type the text literally; without it, text that happens
        # to equal a key name (e.g. "Enter", "Up", "C-c") is sent as that key.
        _, code = self._run_tmux_command(["send-keys", "-l", "-t", self.target, text])
        if code != 0:
            return False

        if enter:
            if delay_enter > 0:
                time.sleep(delay_enter)
            # "Enter" is deliberately sent without -l so it is treated as a key.
            _, code = self._run_tmux_command(["send-keys", "-t", self.target, "Enter"])
            if code != 0:
                # The command was typed but never submitted.
                return False

        return True

    def capture_pane(self, lines: int = 200) -> str:
        """
        Capture output from the tmux pane.

        Args:
            lines: Number of lines to capture from scrollback

        Returns:
            Captured text content, or "" if the capture failed or the
            container is not running.
        """
        output, code = self._run_tmux_command([
            "capture-pane", "-t", self.target, "-p", "-S", f"-{lines}"
        ])
        return output if code == 0 else ""

    def wait_for_prompt(self, prompt_pattern: str, timeout: int = 60,
                        check_interval: float = 1.0) -> bool:
        """
        Wait for a specific prompt pattern to appear.

        Args:
            prompt_pattern: Regex pattern to search for in the last 50 lines
            timeout: Maximum seconds to wait
            check_interval: Seconds between checks

        Returns:
            True if pattern found, False on timeout
        """
        pattern = re.compile(prompt_pattern)
        start_time = time.time()

        while time.time() - start_time < timeout:
            if pattern.search(self.capture_pane(lines=50)):
                return True
            time.sleep(check_interval)

        return False

    def wait_for_idle(self, idle_time: float = 3.0, check_interval: float = 1.0,
                      timeout: int = 600) -> Tuple[bool, str]:
        """
        Wait for pane output to stabilize (no changes for idle_time seconds).

        Args:
            idle_time: Seconds of no change to consider idle
            check_interval: Seconds between checks
            timeout: Maximum seconds to wait

        Returns:
            Tuple of (is_idle, final_content)
        """
        start_time = time.time()
        last_change_time = start_time
        # None guarantees the first capture always counts as a change.
        last_content: Optional[str] = None

        while time.time() - start_time < timeout:
            content = self.capture_pane()

            if content != last_content:
                last_content = content
                last_change_time = time.time()
            elif time.time() - last_change_time >= idle_time:
                return True, content

            time.sleep(check_interval)

        return False, last_content or ""

    def wait_for_shell_prompt(self, timeout: int = 600) -> Tuple[bool, str]:
        """
        Wait for shell prompt to appear, indicating command completion.

        Detects common shell prompts like:
        - root@hostname:/path#
        - user@hostname:/path$

        Args:
            timeout: Maximum seconds to wait

        Returns:
            Tuple of (found_prompt, final_content)
        """
        start_time = time.time()
        last_content = ""

        while time.time() - start_time < timeout:
            content = self.capture_pane(lines=50)
            tail = content.strip().split("\n")[-5:]

            # A bare prompt anywhere in the last few lines means the command returned.
            if any(self._SHELL_PROMPT_RE.match(line.strip()) for line in tail):
                return True, content

            last_content = content
            time.sleep(1)

        return False, last_content

    def extract_response(self, full_output: str, command_marker: str,
                         session_id: Optional[str] = None) -> str:
        """
        Extract Claude's response to the most recent command from tmux output.

        Filters out:
        - The command line itself
        - Shell prompts
        - Session ID continuation lines

        The scrollback usually holds several earlier commands and responses.
        Capture restarts at every line containing the command marker and stops
        at a bare shell prompt, so the result is the output of the LAST
        command only, not the first or a concatenation of all of them.

        Args:
            full_output: Raw captured output
            command_marker: Marker to identify command line (e.g., "claude --print")
            session_id: Claude session ID to filter

        Returns:
            Clean response text ("" if no command marker was found)
        """
        response_lines: List[str] = []
        in_response = False

        for line in full_output.strip().split("\n"):
            # A (new) command line: discard anything captured for older commands.
            # NOTE: a response line that itself contains the marker text also
            # restarts the capture; the marker cannot be told apart from output.
            if command_marker in line:
                in_response = True
                response_lines = []
                continue

            if not in_response:
                continue

            # Skip session ID lines (continuation of wrapped command)
            if session_id and session_id in line:
                continue

            # A bare prompt ends this response; keep scanning for a later command.
            if self._SHELL_PROMPT_RE.match(line.strip()):
                in_response = False
                continue

            response_lines.append(line.rstrip())

        return "\n".join(response_lines).strip()

    def send_interrupt(self) -> bool:
        """Send Ctrl+C to interrupt running command."""
        _, code = self._run_tmux_command(["send-keys", "-t", self.target, "C-c"])
        return code == 0

    def clear_pane(self) -> bool:
        """Clear the pane screen (sends Ctrl+L)."""
        _, code = self._run_tmux_command(["send-keys", "-t", self.target, "C-l"])
        return code == 0
|
|
|
|
|
|
# Older call sites refer to the adapter by its previous name.
DockerTmuxController = DockerTmuxAdapter

# Docker image every cockpit container is created from.
COCKPIT_IMAGE = "luzia-cockpit:latest"

# Container names are this prefix followed by the project name.
COCKPIT_PREFIX = "luzia-cockpit-"

# One "<project>.json" state file per cockpit lives in this directory.
COCKPIT_STATE_DIR = Path("/var/lib/luz-orchestrator/cockpits")

# Create the state directory (and any missing parents) at import time.
COCKPIT_STATE_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def get_container_name(project: str) -> str:
    """Return the Docker container name for *project*: COCKPIT_PREFIX + project."""
    return "{}{}".format(COCKPIT_PREFIX, project)
|
|
|
|
|
|
def get_state_file(project: str) -> Path:
    """Return the path of the JSON state file for *project* inside COCKPIT_STATE_DIR."""
    filename = f"{project}.json"
    return COCKPIT_STATE_DIR.joinpath(filename)
|
|
|
|
|
|
def load_state(project: str) -> Dict:
    """
    Load cockpit state for a project.

    Returns the parsed contents of the project's state file. If the file is
    missing, or is unreadable as a JSON object (e.g. truncated by an
    interrupted write), a fresh default state is returned instead of raising,
    so a damaged state file cannot take down every cockpit command.

    Args:
        project: Project name

    Returns:
        State dict for the project.
    """
    default_state = {
        "project": project,
        "session_id": None,
        "status": "not_started",
        "last_output": None,
        "awaiting_response": False,
        "last_question": None,
    }

    # Read directly instead of exists()-then-read: the file can disappear
    # between the two calls (e.g. a concurrent cockpit_remove).
    try:
        loaded = json.loads(get_state_file(project).read_text())
    except FileNotFoundError:
        return default_state
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        return default_state

    # Callers use dict methods on the result; anything else counts as corrupt.
    return loaded if isinstance(loaded, dict) else default_state
|
|
|
|
|
|
def save_state(project: str, state: Dict) -> None:
    """
    Save cockpit state for a project.

    The state is serialized to a temporary file in the same directory and then
    renamed over the real state file, so a concurrent reader sees either the
    old or the new contents, never a partially written file.

    Args:
        project: Project name
        state: JSON-serializable state dict
    """
    state_file = get_state_file(project)
    # Serialize first so a non-serializable state fails before touching disk.
    payload = json.dumps(state, indent=2)

    # Per-process temp name in the same directory (same filesystem, so the
    # rename is atomic); the ".tmp" suffix keeps it out of "*.json" listings.
    tmp_file = state_file.with_name(f"{state_file.name}.{os.getpid()}.tmp")
    try:
        tmp_file.write_text(payload)
        tmp_file.replace(state_file)
    except BaseException:
        # Do not leave a stray temp file behind on failure.
        try:
            tmp_file.unlink()
        except OSError:
            pass
        raise
|
|
|
|
|
|
def container_exists(project: str) -> bool:
    """
    Check if cockpit container exists (running or stopped).

    Args:
        project: Project name

    Returns:
        True if a container with exactly the project's cockpit name exists.
    """
    name = get_container_name(project)
    result = subprocess.run(
        ["docker", "ps", "-a", "--filter", f"name={name}", "--format", "{{.Names}}"],
        capture_output=True, text=True
    )
    # The docker name filter matches substrings, so project "foo" would also
    # list the container of project "foobar". Compare whole names instead.
    return name in result.stdout.split()
|
|
|
|
|
|
def container_running(project: str) -> bool:
    """
    Check if cockpit container is running.

    Args:
        project: Project name

    Returns:
        True if a running container has exactly the project's cockpit name.
    """
    name = get_container_name(project)
    result = subprocess.run(
        ["docker", "ps", "--filter", f"name={name}", "--format", "{{.Names}}"],
        capture_output=True, text=True
    )
    # The docker name filter matches substrings, so project "foo" would also
    # list the container of project "foobar". Compare whole names instead.
    return name in result.stdout.split()
|
|
|
|
|
|
def cockpit_start(project: str, config: dict) -> Dict:
    """
    Start a cockpit container for a project, resuming a stopped one if present.

    Three outcomes are possible for a known project:
    - a stopped container exists: it is started again and its state is kept
    - the container is already running: its info is returned unchanged
    - no container exists: one is created and a fresh state (with a new
      session id) is written

    Args:
        project: Project name; must be a key of config["projects"] or "admin"
        config: Luzia config dict

    Returns: {"success": bool, "message": str, "container": str, "session_id": str}
             (failure results carry only "success" and "message")
    """
    name = get_container_name(project)
    state = load_state(project)

    # Validate the project before touching Docker.
    known_projects = config.get("projects", {})
    is_admin = project == "admin"
    if not is_admin and project not in known_projects:
        return {"success": False, "message": f"Unknown project: {project}"}

    # Host directory that gets mounted as /workspace.
    # NOTE(review): this reads the "home" config key while cockpit_dispatch_task
    # reads "path" -- confirm which key the config actually uses.
    if is_admin:
        home_dir = "/home/admin"
    else:
        home_dir = known_projects[project].get("home", f"/home/{project}")

    # Stopped container: bring it back instead of creating a new one.
    if container_exists(project) and not container_running(project):
        resumed = subprocess.run(["docker", "start", name], capture_output=True, text=True)
        if resumed.returncode != 0:
            return {"success": False, "message": f"Failed to resume: {resumed.stderr}"}
        state["status"] = "running"
        save_state(project, state)
        return {
            "success": True,
            "message": f"Resumed cockpit for {project}",
            "container": name,
            "session_id": state.get("session_id")
        }

    # Already up: nothing to do.
    if container_running(project):
        return {
            "success": True,
            "message": f"Cockpit already running for {project}",
            "container": name,
            "session_id": state.get("session_id")
        }

    # No container yet: create one with the workspace, the Claude
    # credentials and the state directory mounted.
    run_cmd = ["docker", "run", "-d", "--name", name]
    run_cmd += ["-v", f"{home_dir}:/workspace"]
    run_cmd += ["-v", "/home/admin/.claude:/root/.claude"]  # Claude credentials
    run_cmd += ["-v", f"{COCKPIT_STATE_DIR}:/var/cockpit"]  # State persistence
    run_cmd.append(COCKPIT_IMAGE)

    created = subprocess.run(run_cmd, capture_output=True, text=True)
    if created.returncode != 0:
        return {"success": False, "message": f"Failed to start: {created.stderr}"}

    # Fresh container, fresh session.
    new_session_id = str(uuid.uuid4())
    state = {
        "project": project,
        "session_id": new_session_id,
        "status": "running",
        "session_started": False,  # True after first message sent
        "last_output": None,
        "awaiting_response": False,
        "last_question": None,
    }
    save_state(project, state)

    return {
        "success": True,
        "message": f"Started cockpit for {project}",
        "container": name,
        "session_id": new_session_id
    }
|
|
|
|
|
|
def cockpit_stop(project: str) -> Dict:
    """
    Stop (freeze) a project's cockpit container with `docker stop`.

    Stopping an already-stopped cockpit counts as success; a project with no
    container at all is reported as a failure. On a successful stop the saved
    state's status becomes "stopped".

    Returns: {"success": bool, "message": str}
    """
    name = get_container_name(project)

    if not container_exists(project):
        return {"success": False, "message": f"No cockpit found for {project}"}
    if not container_running(project):
        return {"success": True, "message": f"Cockpit already stopped for {project}"}

    stopped = subprocess.run(["docker", "stop", name], capture_output=True, text=True)
    if stopped.returncode != 0:
        return {"success": False, "message": f"Failed to stop: {stopped.stderr}"}

    # Record the new status alongside the rest of the saved state.
    state = load_state(project)
    state["status"] = "stopped"
    save_state(project, state)
    return {"success": True, "message": f"Stopped cockpit for {project}"}
|
|
|
|
|
|
def cockpit_remove(project: str) -> Dict:
    """
    Force-remove a project's cockpit container and delete its state file.

    The state file is only deleted when `docker rm -f` succeeded.

    Returns: {"success": bool, "message": str}
    """
    name = get_container_name(project)

    if not container_exists(project):
        return {"success": False, "message": f"No cockpit found for {project}"}

    # -f removes the container even if it is still running.
    removed = subprocess.run(["docker", "rm", "-f", name], capture_output=True, text=True)
    if removed.returncode != 0:
        return {"success": False, "message": f"Failed to remove: {removed.stderr}"}

    # The container is gone, so its saved state is meaningless now.
    leftover = get_state_file(project)
    if leftover.exists():
        leftover.unlink()
    return {"success": True, "message": f"Removed cockpit for {project}"}
|
|
|
|
|
|
def cockpit_send(project: str, message: str, is_response: bool = False) -> Dict:
    """
    Send a message to the cockpit Claude session and wait for the reply.

    Uses DockerTmuxController for tmux interaction. The first message of a
    session is sent with --session-id, every later one with --resume. The
    session is only marked as started once the command was actually delivered
    to tmux, so a failed send does not make the next attempt try to resume a
    session that never existed.

    Args:
        project: Project name
        message: Message to send
        is_response: If True, this answers a previously asked question; the
            queue system (if available) is told to unblock the project.

    Returns: {"success": bool, "message": str, "output": str,
              "awaiting_response": bool, "question": str | None,
              "timed_out": bool}
             (failure results carry only "success" and "message")
    """
    container_name = get_container_name(project)
    state = load_state(project)

    if not container_running(project):
        return {"success": False, "message": f"Cockpit not running for {project}. Run 'luzia cockpit start {project}' first."}

    controller = DockerTmuxController(container_name)

    session_id = state.get("session_id")
    if not session_id:
        session_id = str(uuid.uuid4())
        state["session_id"] = session_id

    # Escape single quotes so the message survives inside '...' in the shell.
    escaped_message = message.replace("'", "'\\''")

    # --session-id for the first message, --resume for all subsequent ones.
    already_started = state.get("session_started", False)
    session_flag = "--resume" if already_started else "--session-id"
    claude_cmd = f"echo '{escaped_message}' | claude --print -p {session_flag} {session_id}"

    # If responding to a waiting state, notify queue system to unblock
    if is_response and state.get("awaiting_response"):
        try:
            from task_completion import resume_from_human
            task_id = state.get("task_id", f"cockpit-{project}")
            resume_from_human(task_id, message, project)
        except ImportError:
            pass  # task_completion not available; queue integration is optional
        # Clear awaiting state locally
        state["awaiting_response"] = False
        state["last_question"] = None
        save_state(project, state)

    if not controller.send_keys(claude_cmd, enter=True, delay_enter=0.5):
        return {"success": False, "message": "Failed to send command to tmux"}

    # The command reached the pane, so from now on the session must be resumed.
    if not already_started:
        state["session_started"] = True
        save_state(project, state)

    # Wait for shell prompt to return (indicates completion)
    found_prompt, raw_output = controller.wait_for_shell_prompt(timeout=60)

    response = controller.extract_response(
        raw_output,
        command_marker="claude --print",
        session_id=session_id
    )

    # A reply whose last non-empty line ends with "?" is treated as a question.
    awaiting = False
    question = None
    if response:
        response_lines = [ln.strip() for ln in response.split("\n") if ln.strip()]
        if response_lines and response_lines[-1].endswith("?"):
            awaiting = True
            question = response_lines[-1]

    state["last_output"] = response
    state["awaiting_response"] = awaiting
    state["last_question"] = question
    save_state(project, state)

    # If awaiting human response, notify queue system to block project
    if awaiting:
        try:
            from task_completion import set_awaiting_human
            task_id = state.get("task_id", f"cockpit-{project}")
            set_awaiting_human(task_id, question, project)
        except ImportError:
            pass  # queue integration is optional

    return {
        "success": True,
        "message": "Message sent",
        "output": response,
        "awaiting_response": awaiting,
        "question": question,
        # True when the prompt did not come back in time; "output" may be partial.
        "timed_out": not found_prompt
    }
|
|
|
|
|
|
def cockpit_output(project: str) -> Dict:
    """
    Get recent output (up to 200 scrollback lines) from the cockpit tmux session.

    Uses DockerTmuxController for the capture. An empty pane is a valid
    result and is reported as success with an empty "output".

    Returns: {"success": bool, "output": str}
             (plus "message" when the cockpit is not running)
    """
    if not container_running(project):
        return {"success": False, "output": "", "message": "Cockpit not running"}

    controller = DockerTmuxController(get_container_name(project))
    # capture_pane returns "" for an empty pane or a failed capture. Both were
    # always reported the same way, so no extra running-check is needed.
    return {"success": True, "output": controller.capture_pane(lines=200)}
|
|
|
|
|
|
def cockpit_status(project: Optional[str] = None) -> Dict:
    """
    Get cockpit status for one or all projects.

    With a project name, reports that project's cockpit based on live Docker
    state ("running", "stopped" or "not_started"). Without one, lists every
    container whose name starts with COCKPIT_PREFIX.

    Args:
        project: Project name, or None to list all cockpits

    Returns: {"success": bool, "cockpits": [{"project": str, "status": str, ...}]}
    """
    # The knowledge loader is optional; without it has_knowledge is always False.
    knowledge_loader = None
    try:
        from project_knowledge_loader import ProjectKnowledgeLoader
        knowledge_loader = ProjectKnowledgeLoader()
    except ImportError:
        pass

    def get_has_knowledge(proj: str) -> bool:
        """Ask the knowledge loader whether the project has knowledge; False on any error."""
        if knowledge_loader:
            try:
                return knowledge_loader.has_knowledge(proj)
            except Exception:
                # Best-effort flag: a loader failure must not break status output.
                pass
        return False

    if project:
        state = load_state(project)
        running = container_running(project)
        exists = container_exists(project)

        status = "running" if running else ("stopped" if exists else "not_started")

        return {
            "success": True,
            "cockpits": [{
                "project": project,
                "status": status,
                "container": get_container_name(project),
                "session_id": state.get("session_id"),
                "awaiting_response": state.get("awaiting_response", False),
                "last_question": state.get("last_question"),
                "has_knowledge": get_has_knowledge(project),
            }]
        }

    # List all cockpits
    cockpits = []
    result = subprocess.run(
        ["docker", "ps", "-a", "--filter", f"name={COCKPIT_PREFIX}", "--format", "{{.Names}}\t{{.Status}}"],
        capture_output=True, text=True
    )

    for line in result.stdout.strip().split("\n"):
        if not line:
            continue
        container_name, tab, rest = line.partition("\t")
        container_status = rest if tab else "unknown"

        # The docker name filter matches substrings; only names that really
        # start with the prefix are cockpits.
        if not container_name.startswith(COCKPIT_PREFIX):
            continue

        # Strip only the leading prefix (str.replace would also remove later
        # occurrences inside the project name).
        proj = container_name[len(COCKPIT_PREFIX):]
        state = load_state(proj)

        running = "Up" in container_status
        cockpits.append({
            "project": proj,
            "status": "running" if running else "stopped",
            "container": container_name,
            "docker_status": container_status,
            "session_id": state.get("session_id"),
            "awaiting_response": state.get("awaiting_response", False),
            "last_question": state.get("last_question"),
            "has_knowledge": get_has_knowledge(proj),
        })

    return {"success": True, "cockpits": cockpits}
|
|
|
|
|
|
def cockpit_attach_cmd(project: str) -> str:
    """
    Build the shell command that attaches to a cockpit's tmux session.

    Nothing is executed here; the `docker exec -it ... tmux attach-session`
    command line is returned as a string.
    """
    target_container = get_container_name(project)
    return "docker exec -it " + target_container + " tmux attach-session -t agent"
|
|
|
|
|
|
# Location of the Luzia configuration consumed by the queue dispatcher.
_LUZIA_CONFIG_PATH = Path("/opt/server-agents/orchestrator/config/luzia.yaml")


def _load_luzia_config() -> dict:
    """Load the Luzia YAML config; fall back to an empty project list if it is missing or empty."""
    # Imported lazily so that a missing PyYAML surfaces as ImportError inside
    # the callers' try blocks, exactly like a missing dispatcher module.
    import yaml

    if not _LUZIA_CONFIG_PATH.exists():
        return {"projects": {}}

    # safe_load returns None for an empty file; never hand that to the dispatcher.
    loaded = yaml.safe_load(_LUZIA_CONFIG_PATH.read_text())
    return loaded if isinstance(loaded, dict) else {"projects": {}}


def cockpit_queue_task(project: str, task: str, context: str = "",
                       priority: str = "normal") -> Dict:
    """
    Queue a task for background dispatch.

    Tasks are queued per-project and dispatched serially within each project,
    but in parallel across projects (with load awareness).

    Args:
        project: Target project name
        task: Task description
        context: Project context
        priority: "high" or "normal"

    Returns: {"success": bool, "task_id": str, "message": str, "queue_position": int}
             On failure only "success" and "message" are present; a missing
             dispatcher module (or PyYAML) yields "Queue dispatcher not available".
    """
    try:
        from cockpit_queue_dispatcher import CockpitQueueDispatcher

        dispatcher = CockpitQueueDispatcher(_load_luzia_config())
        task_id = dispatcher.enqueue_task(project, task, context, priority)

        return {
            "success": True,
            "task_id": task_id,
            "message": f"Task queued for {project}",
            "queue_position": len(dispatcher.get_pending_tasks(project))
        }
    except ImportError:
        return {"success": False, "message": "Queue dispatcher not available"}
    except Exception as e:
        # CLI boundary: report any dispatcher failure as a result, do not crash.
        return {"success": False, "message": str(e)}


def cockpit_queue_status() -> Dict:
    """
    Get status of the task queue and dispatcher.

    Returns: {"success": bool, "status": dict}
             On failure "status" is replaced by a "message"; a missing
             dispatcher module (or PyYAML) yields "Queue dispatcher not available".
    """
    try:
        from cockpit_queue_dispatcher import CockpitQueueDispatcher

        dispatcher = CockpitQueueDispatcher(_load_luzia_config())
        return {"success": True, "status": dispatcher.get_status()}
    except ImportError:
        return {"success": False, "message": "Queue dispatcher not available"}
    except Exception as e:
        # CLI boundary: report any dispatcher failure as a result, do not crash.
        return {"success": False, "message": str(e)}
|
|
|
|
|
|
def cockpit_dispatch_task(project: str, task: str, context: str, config: dict,
                          show_output: bool = True, timeout: int = 600) -> Dict:
    """
    Dispatch a task to cockpit and stream output in real-time.

    This is the main entry point for all task dispatch in luzia.
    Uses DockerTmuxController for tmux interaction.

    Steps:
    1. Start (or resume) the project's cockpit container.
    2. Build the full prompt and write it to /tmp/task_prompt.txt inside the
       container. The prompt is piped over stdin, so its content is never
       interpreted by a shell.
    3. Type `cat /tmp/task_prompt.txt | claude ...` into the tmux pane and poll
       the pane once per second, printing new response text, until a shell
       prompt returns, the output has been stable for 5 polls, or the timeout
       expires.
    4. Persist the outcome and, when Claude asked a question, notify the queue
       system (if available).

    Args:
        project: Project name
        task: Task description
        context: Project context string
        config: Luzia config dict
        show_output: If True, print output in real-time
        timeout: Max seconds to wait for task completion

    Returns: {"success": bool, "task_id": str, "session_id": str, "output": str,
              "awaiting_response": bool, "timed_out": bool, "question": str | None}
             Failures return {"success": False, "error": str, "task_id": str}.
    """
    from datetime import datetime

    # Generate task ID for tracking (time of day + 16 bits of the task hash).
    task_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(task) & 0xffff)[2:]

    # 1. Start cockpit if not running
    start_result = cockpit_start(project, config)
    if not start_result["success"]:
        return {"success": False, "error": start_result["message"], "task_id": task_id}

    session_id = start_result["session_id"]
    container_name = start_result["container"]

    controller = DockerTmuxController(container_name)

    # Update state with task info
    state = load_state(project)
    if not session_id:
        # The container exists but its saved state has no session id (e.g. the
        # state file was lost). Start a new session, not "--resume None".
        session_id = str(uuid.uuid4())
        state["session_id"] = session_id
        state["session_started"] = False
    state["task_id"] = task_id
    state["current_task"] = task
    state["task_started"] = datetime.now().isoformat()
    save_state(project, state)

    # 2. Build full prompt
    project_config = config.get("projects", {}).get(project, {})
    # NOTE(review): cockpit_start mounts the project directory at /workspace
    # and reads the "home" config key, while this uses "path" and reports the
    # host path to the agent -- confirm which is intended.
    project_path = project_config.get("path", f"/home/{project}")

    prompt = f"""You are a project agent working on the **{project}** project.

{context}

## Your Task
{task}

## Execution Environment
- Working directory: {project_path}
- You have FULL permission to read, write, and execute files
- Use standard Claude tools (Read, Write, Edit, Bash) directly
- All file operations are pre-authorized

## Guidelines
- Complete the task step by step
- If you need clarification, ask a clear question (ending with ?)
- If you encounter errors, debug and fix them
- Provide a summary when complete

## IMPORTANT: Human Collaboration
If you need human input at any point, ASK. The human is available.
End questions with ? to signal you're waiting for a response."""

    # Build Claude command: --session-id for the first message, --resume after.
    already_started = state.get("session_started", False)
    session_flag = "--resume" if already_started else "--session-id"
    claude_cmd = f"claude --print -p {session_flag} {session_id}"

    # Write the prompt to a file inside the container via stdin. Unlike a
    # heredoc embedded in `bash -c`, this cannot be truncated or turned into
    # shell commands by a prompt line equal to the heredoc terminator.
    write_result = subprocess.run(
        ["docker", "exec", "-i", container_name, "bash", "-c", "cat > /tmp/task_prompt.txt"],
        input=prompt + "\n", capture_output=True, text=True
    )
    if write_result.returncode != 0:
        # Without this check a stale prompt file from an earlier task would be sent.
        return {
            "success": False,
            "error": f"Failed to write task prompt: {write_result.stderr.strip()}",
            "task_id": task_id
        }

    # Build and send command
    exec_cmd = f"cat /tmp/task_prompt.txt | {claude_cmd}"

    if show_output:
        print(f"[cockpit:{project}:{task_id}] Task sent")
        print("-" * 60)

    if not controller.send_keys(exec_cmd, enter=True, delay_enter=0.5):
        return {"success": False, "error": "Failed to send task to tmux", "task_id": task_id}

    # Only now has the session really been started; later calls must --resume.
    if not already_started:
        state["session_started"] = True
        save_state(project, state)

    # 3. Stream output with proper completion detection
    shell_pattern = re.compile(r"^(root|[\w-]+)@[\w-]+:.*[#$]\s*$")
    start_time = time.time()
    final_output = ""
    awaiting_response = False
    timed_out = False
    question = None
    last_printed_len = 0
    stable_count = 0

    # Initial wait for Claude to start
    time.sleep(2)

    while time.time() - start_time < timeout:
        time.sleep(1)

        # Capture current output
        raw_output = controller.capture_pane()

        # Extract Claude response
        response = controller.extract_response(
            raw_output,
            command_marker="claude --print",
            session_id=session_id
        )

        # Stream new content
        if response and len(response) > last_printed_len:
            new_text = response[last_printed_len:]
            if show_output and new_text.strip():
                print(new_text, end='', flush=True)
            last_printed_len = len(response)
            stable_count = 0  # Reset stability counter
        else:
            stable_count += 1

        final_output = response

        # A bare shell prompt in the last few lines means the command returned.
        tail = raw_output.strip().split("\n")[-5:]
        found_prompt = any(shell_pattern.match(line.strip()) for line in tail)

        if found_prompt and response:
            # Check if Claude is asking a question
            response_lines = [ln.strip() for ln in response.split("\n") if ln.strip()]
            if response_lines and response_lines[-1].endswith("?"):
                awaiting_response = True
                question = response_lines[-1]
            break

        # Also break if output stable for 5 polls and we have a response
        if stable_count >= 5 and response:
            break
    else:
        # The loop ran out of time without hitting either break.
        timed_out = True

    if show_output:
        print("\n" + "-" * 60)

    # 4. Update state (reload: it may have been modified while we were polling)
    state = load_state(project)
    state["last_output"] = final_output
    state["awaiting_response"] = awaiting_response
    state["timed_out"] = timed_out
    state["last_question"] = question
    state["task_completed"] = datetime.now().isoformat() if not awaiting_response and not timed_out else None
    save_state(project, state)

    # Notify queue system if awaiting human
    if awaiting_response:
        try:
            from task_completion import set_awaiting_human
            set_awaiting_human(task_id, question, project)
        except ImportError:
            pass  # queue integration is optional

        if show_output:
            print(f"\n[AWAITING RESPONSE] Claude is waiting for input:")
            print(f"  {question}")
            print(f"\nTo respond: luzia cockpit respond {project} <your answer>")
            print(f"Or continue: luzia {project} <follow-up task>")
    elif timed_out:
        if show_output:
            print(f"[STILL RUNNING] Task continues in background (timeout {timeout}s)")
            print(f"  Monitor: luzia cockpit output {project}")
            print(f"  Attach:  luzia cockpit attach {project}")
    else:
        if show_output:
            print(f"[COMPLETED] Task finished")

    return {
        "success": True,
        "task_id": task_id,
        "session_id": session_id,
        "output": final_output,
        "awaiting_response": awaiting_response,
        "timed_out": timed_out,
        "question": question
    }
|
|
|
|
|
|
def cockpit_continue(project: str, message: str, config: dict,
                     show_output: bool = True, timeout: int = 600) -> Dict:
    """
    Continue an existing cockpit session with a follow-up message.

    Uses DockerTmuxController for robust tmux interaction. The message is
    piped into ``claude --print -p --resume <session_id>`` inside the
    project's cockpit container, and the tmux pane is polled once per second
    until one of the following happens:

    * a shell prompt reappears in the last five pane lines (command finished),
    * the extracted response has not grown for 3 consecutive polls, or
    * ``timeout`` seconds have elapsed (reported via ``timed_out``).

    Args:
        project: Project name
        message: Follow-up message
        config: Luzia config dict (passed to cockpit_start if the container
            is not running)
        show_output: If True, print output in real-time
        timeout: Max seconds to wait

    Returns:
        On success: {"success": True, "task_id": str, "output": <last
        extracted response>, "awaiting_response": bool, "timed_out": bool,
        "question": str or None}.
        On failure: {"success": False, "error": str}.
    """
    from datetime import datetime

    state = load_state(project)
    container_name = get_container_name(project)

    # Check if cockpit is running; start it on demand
    if not container_running(project):
        start_result = cockpit_start(project, config)
        if not start_result["success"]:
            return {"success": False, "error": start_result["message"]}
        # Reload state now that cockpit_start has run
        state = load_state(project)

    # Initialize DockerTmuxController
    controller = DockerTmuxController(container_name)

    session_id = state.get("session_id")
    if not session_id:
        return {"success": False, "error": "No session ID found"}

    # Short display id: wall-clock time plus 16 bits of the message hash.
    # NOTE: str hashes are normally randomized per interpreter process, so
    # the suffix is a disambiguator only, not a reproducible digest.
    task_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(message) & 0xffff)[2:]

    # Escape single quotes in message so it survives the single-quoted echo
    escaped_message = message.replace("'", "'\\''")

    # Build Claude command - always resume for continuation
    claude_cmd = f"echo '{escaped_message}' | claude --print -p --resume {session_id}"

    if show_output:
        print(f"[cockpit:{project}:{task_id}] Continuing session")
        print("-" * 60)

    # Send via controller
    if not controller.send_keys(claude_cmd, enter=True, delay_enter=0.5):
        return {"success": False, "error": "Failed to send command to tmux"}

    # Stream output with proper completion detection
    start_time = time.time()
    final_output = ""
    awaiting_response = False
    timed_out = False
    question = None
    last_printed_len = 0
    stable_count = 0

    # Loop-invariant: matches a "user@host:path$" / "#" style shell prompt
    shell_pattern = re.compile(r"^(root|[\w-]+)@[\w-]+:.*[#$]\s*$")

    time.sleep(2)  # Give Claude time to start

    while time.time() - start_time < timeout:
        time.sleep(1)

        # Capture current output
        raw_output = controller.capture_pane()

        # Extract Claude response
        response = controller.extract_response(
            raw_output,
            command_marker="claude --print",
            session_id=session_id
        )

        # Stream new content; any growth resets the stability counter
        if response and len(response) > last_printed_len:
            new_text = response[last_printed_len:]
            if show_output and new_text.strip():
                print(new_text, end='', flush=True)
            last_printed_len = len(response)
            stable_count = 0
        else:
            stable_count += 1

        final_output = response

        # Check for shell prompt (completion) in the last few pane lines
        lines = raw_output.strip().split("\n")
        found_prompt = any(
            shell_pattern.match(line.strip()) for line in lines[-5:]
        )

        if found_prompt and response:
            # A trailing "?" on the last non-empty line is treated as
            # Claude asking the human a question.
            response_lines = [l.strip() for l in response.split("\n") if l.strip()]
            if response_lines and response_lines[-1].endswith("?"):
                awaiting_response = True
                question = response_lines[-1]
            break

        # Break if output stable for 3 seconds with response
        if stable_count >= 3 and response:
            break
    else:
        # Loop ended because the deadline passed, not because completion
        # was detected: the command may still be running in the pane.
        timed_out = True

    if show_output:
        print("\n" + "-" * 60)

    # Update state
    state["last_output"] = final_output
    state["awaiting_response"] = awaiting_response
    state["timed_out"] = timed_out
    state["last_question"] = question
    save_state(project, state)

    if show_output:
        if awaiting_response:
            print(f"[AWAITING RESPONSE] {question}")
            print(f"Respond: luzia cockpit respond {project} <answer>")
        elif timed_out:
            # Do not claim completion when we merely stopped waiting
            print(f"[STILL RUNNING] Task continues in background (timeout {timeout}s)")
            print(f" Monitor: luzia cockpit output {project}")
            print(f" Attach: luzia cockpit attach {project}")
        else:
            print("[COMPLETED]")

    return {
        "success": True,
        "task_id": task_id,
        "output": final_output,
        "awaiting_response": awaiting_response,
        "timed_out": timed_out,
        "question": question
    }
|
|
|
|
|
|
# CLI Handler for luzia integration
|
|
def route_cockpit(config: dict, args: list, kwargs: dict) -> int:
    """
    Route cockpit subcommands.

    luzia cockpit start <project>
    luzia cockpit stop <project>
    luzia cockpit remove <project>
    luzia cockpit send <project> <message>
    luzia cockpit respond <project> <answer>
    luzia cockpit output <project>
    luzia cockpit status [project]
    luzia cockpit attach <project>
    luzia cockpit queue <project> <task>
    luzia cockpit queue --status
    luzia cockpit dispatch

    Args:
        config: Luzia config dict; passed to cockpit_start and used as the
            dispatcher config when the on-disk luzia.yaml is absent.
        args: Positional CLI arguments after "cockpit"; args[0] is the
            subcommand, the rest are its arguments.
        kwargs: Accepted for the router interface; not used in this function.

    Returns:
        Process exit code: 0 on success (including the bare usage listing),
        1 on usage errors, failed operations or an unknown subcommand.
    """
    if not args:
        print("Usage: luzia cockpit <command> [args]")
        print("")
        print("Commands:")
        print(" start <project> Start cockpit container")
        print(" stop <project> Stop (freeze) cockpit")
        print(" remove <project> Remove cockpit completely")
        print(" send <project> <msg> Send new message to Claude")
        print(" respond <project> <ans> Respond to pending question")
        print(" output <project> Get recent output")
        print(" status [project] Show cockpit status")
        print(" attach <project> Show attach command")
        print("")
        print("Queue commands (per-project serialized, parallel across projects):")
        print(" queue <project> <task> Queue task for background dispatch")
        print(" queue --status Show dispatcher status")
        print(" dispatch Run one dispatch cycle")
        return 0

    subcommand = args[0]
    subargs = args[1:]

    if subcommand == "start":
        if not subargs:
            print("Usage: luzia cockpit start <project>")
            return 1
        result = cockpit_start(subargs[0], config)
        if result["success"]:
            print(f"OK: {result['message']}")
            print(f" Container: {result['container']}")
            print(f" Session: {result['session_id']}")
            return 0
        print(f"Error: {result.get('message', 'Unknown error')}")
        return 1

    if subcommand == "stop":
        if not subargs:
            print("Usage: luzia cockpit stop <project>")
            return 1
        result = cockpit_stop(subargs[0])
        print(result["message"])
        return 0 if result["success"] else 1

    if subcommand == "remove":
        if not subargs:
            print("Usage: luzia cockpit remove <project>")
            return 1
        result = cockpit_remove(subargs[0])
        print(result["message"])
        return 0 if result["success"] else 1

    # "send" and "respond" differ only in the is_response flag and the
    # placeholder shown in their usage line.
    if subcommand in ("send", "respond"):
        is_response = subcommand == "respond"
        if len(subargs) < 2:
            placeholder = "<answer>" if is_response else "<message>"
            print(f"Usage: luzia cockpit {subcommand} <project> {placeholder}")
            return 1
        project = subargs[0]
        text = " ".join(subargs[1:])
        result = cockpit_send(project, text, is_response=is_response)
        if result["success"]:
            print("--- Claude Output ---")
            print(result.get("output", ""))
            if result.get("awaiting_response"):
                print("\n--- AWAITING RESPONSE ---")
                print(f"Question: {result.get('question')}")
            return 0
        # .get() so a failure dict without "message" cannot raise KeyError
        print(f"Error: {result.get('message', 'Unknown error')}")
        return 1

    if subcommand == "output":
        if not subargs:
            print("Usage: luzia cockpit output <project>")
            return 1
        result = cockpit_output(subargs[0])
        if result["success"]:
            print(result["output"])
            return 0
        print(f"Error: {result.get('message', 'Unknown error')}")
        return 1

    if subcommand == "status":
        project = subargs[0] if subargs else None
        result = cockpit_status(project)

        if not result["cockpits"]:
            print("No cockpits found")
            return 0

        print(f"{'PROJECT':<15} {'STATUS':<10} {'SESSION':<36} {'WAITING'}")
        print("-" * 80)
        for cp in result["cockpits"]:
            if cp.get("awaiting_response"):
                # Show at most the first 20 characters of the pending question
                last_question = cp.get("last_question")
                preview = last_question[:20] + "..." if last_question else ""
                waiting = "YES - " + preview
            else:
                waiting = "no"
            session_id = cp.get('session_id') or '-'  # Handle None values
            print(f"{cp['project']:<15} {cp['status']:<10} {session_id:<36} {waiting}")
        return 0

    if subcommand == "attach":
        if not subargs:
            print("Usage: luzia cockpit attach <project>")
            return 1
        cmd = cockpit_attach_cmd(subargs[0])
        print("Run this command to attach:")
        print(f" {cmd}")
        return 0

    if subcommand == "queue":
        # "--status" takes no further arguments, so it must be recognised
        # before the two-argument check; otherwise "queue --status" would
        # always be rejected with the usage message.
        if subargs and subargs[0] == "--status":
            result = cockpit_queue_status()
            if result["success"]:
                print(json.dumps(result["status"], indent=2))
                return 0
            print(f"Error: {result.get('message', 'Unknown error')}")
            return 1

        if len(subargs) < 2:
            print("Usage: luzia cockpit queue <project> <task>")
            print(" luzia cockpit queue --status")
            return 1

        project = subargs[0]
        task = " ".join(subargs[1:])
        result = cockpit_queue_task(project, task)
        if result["success"]:
            print(f"OK: {result['message']}")
            print(f" Task ID: {result['task_id']}")
            print(f" Queue position: {result.get('queue_position', 'unknown')}")
            return 0
        print(f"Error: {result.get('message', 'Unknown error')}")
        return 1

    if subcommand == "dispatch":
        # Run one dispatch cycle
        try:
            from cockpit_queue_dispatcher import CockpitQueueDispatcher
            import yaml

            # Prefer the on-disk config; fall back to the config passed in
            config_path = Path("/opt/server-agents/orchestrator/config/luzia.yaml")
            if config_path.exists():
                cfg = yaml.safe_load(config_path.read_text())
            else:
                cfg = config

            dispatcher = CockpitQueueDispatcher(cfg)
            result = dispatcher.run_dispatch_cycle()
            print(json.dumps(result, indent=2))
            return 0
        except Exception as e:
            # CLI boundary: report any failure (including missing optional
            # modules) as an error exit code instead of a traceback.
            print(f"Error: {e}")
            return 1

    print(f"Unknown subcommand: {subcommand}")
    return 1
|