Files
luzia/lib/cockpit.py
admin b2a0dec79b Add load-aware cockpit queue dispatcher
- New CockpitQueueDispatcher: per-project serialized task queues
- LoadMonitor: checks system load/memory before dispatching
- Parallel execution across projects with round-robin fairness
- CLI commands: cockpit queue, cockpit dispatch

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 11:35:33 -03:00

1280 lines
42 KiB
Python

#!/usr/bin/env python3
"""
Luzia Cockpit - Human-in-the-Loop Claude Sessions
Provides Docker container management for pausable Claude agent sessions.
Key features:
- Docker stop/start freezes/resumes entire session state
- Claude sessions persist via --session-id and --resume
- tmux for human attachment when needed
- Multi-turn conversation support
Architecture:
Imports TmuxCLIController from claude-code-tools-local package and extends
it with DockerTmuxAdapter for Docker-containerized tmux sessions.
This enables importing upstream improvements while adding Docker support.
Usage:
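luzia cockpit start <project>              Start cockpit container
luzia cockpit stop <project>               Stop (freeze) cockpit
luzia cockpit remove <project>             Remove cockpit completely
luzia cockpit send <project> <msg>         Send message to cockpit
luzia cockpit respond <project> <answer>   Respond to pending question
luzia cockpit status [project]             Show cockpit status
luzia cockpit output <project>             Get recent output
luzia cockpit attach <project>             Show the command to attach to the tmux session
luzia cockpit queue <project> <task>       Queue task for background dispatch
luzia cockpit queue --status               Show dispatcher status
luzia cockpit dispatch                     Run one dispatch cycle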
"""
import json
import os
import subprocess
import uuid
import time
import hashlib
import re
from pathlib import Path
from typing import Dict, Optional, Tuple, List
# Import TmuxCLIController from installed claude-code-tools-local package
from claude_code_tools.tmux_cli_controller import TmuxCLIController
class DockerTmuxAdapter(TmuxCLIController):
    """
    Docker adapter for TmuxCLIController.

    Extends TmuxCLIController to execute tmux commands inside a Docker container
    rather than on the host system. This enables:
    - Inheriting all TmuxCLIController methods automatically
    - Pulling upstream improvements from claude-code-tools
    - Adding Docker-specific functionality

    The key override is _run_tmux_command() which prepends 'docker exec' to all
    tmux commands. It also checks that the container is running, so the
    helpers below do not repeat that check: when the container is down they
    simply report failure (False / empty string).
    """

    # Common interactive shell prompts, e.g. "root@host:/path#" or
    # "user@host:/path$". Compiled once and shared by the prompt helpers.
    _SHELL_PROMPT_RE = re.compile(r"^(root|[\w-]+)@[\w-]+:.*[#$]\s*$")

    def __init__(self, container_name: str, tmux_session: str = "agent", tmux_window: str = "main"):
        """
        Initialize adapter for a Docker container's tmux session.

        Args:
            container_name: Docker container name
            tmux_session: tmux session name inside container (default: agent)
            tmux_window: tmux window name (default: main)
        """
        super().__init__(session_name=tmux_session, window_name=tmux_window)
        self.container_name = container_name
        self.tmux_session = tmux_session
        self.tmux_window = tmux_window
        self.target = f"{tmux_session}:{tmux_window}"

    def _run_tmux_command(self, command: List[str]) -> Tuple[str, int]:
        """
        Override: Run tmux command inside the Docker container.

        This is the key override that makes all inherited TmuxCLIController
        methods work inside Docker. Instead of running 'tmux <cmd>', we run
        'docker exec <container> tmux <cmd>'.

        Args:
            command: List of command components (without 'tmux' prefix)

        Returns:
            Tuple of (output, exit_code); ("", 1) when the container is not running
        """
        if not self.is_container_running():
            return "", 1
        cmd = ["docker", "exec", self.container_name, "tmux"] + command
        result = subprocess.run(cmd, capture_output=True, text=True)
        return result.stdout.strip(), result.returncode

    def is_container_running(self) -> bool:
        """Check if the Docker container is running."""
        result = subprocess.run(
            ["docker", "ps", "--filter", f"name={self.container_name}", "--format", "{{.Names}}"],
            capture_output=True, text=True
        )
        # The docker name filter (and a plain substring test) would also accept
        # containers whose name merely contains ours, so compare whole names.
        return self.container_name in result.stdout.split()

    def send_keys(self, text: str, enter: bool = True, delay_enter: float = 0.5) -> bool:
        """
        Send keystrokes to the tmux pane.

        Args:
            text: Text to send
            enter: Whether to press Enter after text
            delay_enter: Delay in seconds before pressing Enter

        Returns:
            True if every tmux invocation succeeded (including the Enter key)
        """
        # Send text first
        _, code = self._run_tmux_command(["send-keys", "-t", self.target, text])
        if code != 0:
            return False
        if enter:
            if delay_enter > 0:
                time.sleep(delay_enter)
            # A failed Enter leaves the text typed but never submitted, so
            # report it instead of pretending the command was sent.
            _, code = self._run_tmux_command(["send-keys", "-t", self.target, "Enter"])
            return code == 0
        return True

    def capture_pane(self, lines: int = 200) -> str:
        """
        Capture output from the tmux pane.

        Args:
            lines: Number of lines to capture from scrollback

        Returns:
            Captured text content ("" when the capture fails)
        """
        output, code = self._run_tmux_command([
            "capture-pane", "-t", self.target, "-p", "-S", f"-{lines}"
        ])
        return output if code == 0 else ""

    def wait_for_prompt(self, prompt_pattern: str, timeout: int = 60,
                        check_interval: float = 1.0) -> bool:
        """
        Wait for a specific prompt pattern to appear.

        Args:
            prompt_pattern: Regex pattern to match
            timeout: Maximum seconds to wait
            check_interval: Seconds between checks

        Returns:
            True if pattern found, False on timeout
        """
        pattern = re.compile(prompt_pattern)
        # Monotonic clock: elapsed time must not be affected by wall-clock jumps.
        start_time = time.monotonic()
        while time.monotonic() - start_time < timeout:
            content = self.capture_pane(lines=50)
            if pattern.search(content):
                return True
            time.sleep(check_interval)
        return False

    def wait_for_idle(self, idle_time: float = 3.0, check_interval: float = 1.0,
                      timeout: int = 600) -> Tuple[bool, str]:
        """
        Wait for pane output to stabilize (no changes for idle_time seconds).

        Args:
            idle_time: Seconds of no change to consider idle
            check_interval: Seconds between checks
            timeout: Maximum seconds to wait

        Returns:
            Tuple of (is_idle, final_content)
        """
        start_time = time.monotonic()
        last_change_time = start_time
        # None never equals a capture, so the first capture counts as a change.
        last_content = None
        final_content = ""
        while time.monotonic() - start_time < timeout:
            content = self.capture_pane()
            if content != last_content:
                last_content = content
                last_change_time = time.monotonic()
                final_content = content
            elif time.monotonic() - last_change_time >= idle_time:
                return True, final_content
            time.sleep(check_interval)
        return False, final_content

    def wait_for_shell_prompt(self, timeout: int = 600) -> Tuple[bool, str]:
        """
        Wait for shell prompt to appear, indicating command completion.

        Detects common shell prompts like:
        - root@hostname:/path#
        - user@hostname:/path$

        Args:
            timeout: Maximum seconds to wait

        Returns:
            Tuple of (found_prompt, final_content)
        """
        start_time = time.monotonic()
        last_content = ""
        while time.monotonic() - start_time < timeout:
            content = self.capture_pane(lines=50)
            # Check last few lines for shell prompt
            tail = content.strip().split("\n")[-5:]
            if any(self._SHELL_PROMPT_RE.match(line.strip()) for line in tail):
                return True, content
            last_content = content
            time.sleep(1)
        return False, last_content

    def extract_response(self, full_output: str, command_marker: str,
                         session_id: Optional[str] = None) -> str:
        """
        Extract Claude's response from tmux output.

        Only the output of the most recent command is returned: the capture
        includes scrollback, so earlier commands (and their responses) are
        usually still present and must not be mixed into the result.

        Filters out:
        - The command line itself
        - Shell prompts
        - Session ID continuation lines

        Args:
            full_output: Raw captured output
            command_marker: Marker to identify command line (e.g., "claude --print")
            session_id: Claude session ID to filter

        Returns:
            Clean response text ("" when the marker is not present)
        """
        lines = full_output.strip().split("\n")

        # Locate the last line carrying the marker, i.e. the latest command.
        marker_index = None
        for index, line in enumerate(lines):
            if command_marker in line:
                marker_index = index
        if marker_index is None:
            return ""

        response_lines = []
        for line in lines[marker_index + 1:]:
            # Skip session ID lines (continuation of wrapped command)
            if session_id and session_id in line:
                continue
            # Stop at shell prompt
            if self._SHELL_PROMPT_RE.match(line.strip()):
                break
            response_lines.append(line.rstrip())
        return "\n".join(response_lines).strip()

    def send_interrupt(self) -> bool:
        """Send Ctrl+C to interrupt running command."""
        _, code = self._run_tmux_command(["send-keys", "-t", self.target, "C-c"])
        return code == 0

    def clear_pane(self) -> bool:
        """Clear the pane screen."""
        _, code = self._run_tmux_command(["send-keys", "-t", self.target, "C-l"])
        return code == 0
# Backward compatibility alias: the functions in this module still construct
# the adapter under its earlier name, DockerTmuxController.
DockerTmuxController = DockerTmuxAdapter
# Constants
# Docker image used when a new cockpit container is created.
COCKPIT_IMAGE = "luzia-cockpit:latest"
# Prefix of every cockpit container name; the project name is appended to it.
COCKPIT_PREFIX = "luzia-cockpit-"
# Host directory holding one "<project>.json" state file per cockpit.
COCKPIT_STATE_DIR = Path("/var/lib/luz-orchestrator/cockpits")
# Ensure state directory exists (note: this runs at import time)
COCKPIT_STATE_DIR.mkdir(parents=True, exist_ok=True)
def get_container_name(project: str) -> str:
    """Return the Docker container name of *project*'s cockpit.

    The name is ``COCKPIT_PREFIX`` immediately followed by the project name.
    """
    return "{}{}".format(COCKPIT_PREFIX, project)
def get_state_file(project: str) -> Path:
    """Return the path of the JSON state file kept for *project*'s cockpit.

    The file lives directly inside ``COCKPIT_STATE_DIR`` and is named
    ``<project>.json``.
    """
    filename = "{}.json".format(project)
    return COCKPIT_STATE_DIR / filename
def load_state(project: str) -> Dict:
    """Load cockpit state for a project.

    Args:
        project: Project name.

    Returns:
        The dict stored in the project's state file. When the file is missing,
        empty, not valid JSON, or does not contain a JSON object, a fresh
        default state is returned instead, so one damaged file cannot make
        every cockpit command fail.
    """
    state_file = get_state_file(project)
    try:
        # Read directly instead of exists()-then-read: the file may be removed
        # (cockpit_remove) between the two steps.
        state = json.loads(state_file.read_text())
    except FileNotFoundError:
        state = None
    except ValueError:
        # json.JSONDecodeError (and UnicodeDecodeError) are ValueErrors; this
        # covers a truncated or otherwise corrupted state file.
        state = None
    if isinstance(state, dict):
        return state
    return {
        "project": project,
        "session_id": None,
        "status": "not_started",
        "last_output": None,
        "awaiting_response": False,
        "last_question": None,
    }
def save_state(project: str, state: Dict) -> None:
    """Save cockpit state for a project.

    The state is written to a temporary file in the same directory and then
    moved over the real file with os.replace(), so a reader (or a crash in the
    middle of the write) never observes a half-written JSON document.

    Args:
        project: Project name.
        state: JSON-serialisable state dict.

    Raises:
        TypeError / ValueError: if *state* cannot be serialised to JSON.
        OSError: if the file cannot be written.
    """
    state_file = get_state_file(project)
    # Serialise first so a serialisation error leaves no file behind.
    payload = json.dumps(state, indent=2)
    # The PID keeps concurrent writers from sharing one temporary file.
    tmp_file = state_file.with_name(f"{state_file.name}.{os.getpid()}.tmp")
    try:
        tmp_file.write_text(payload)
        os.replace(tmp_file, state_file)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up when the write or the replace failed.
        try:
            tmp_file.unlink()
        except FileNotFoundError:
            pass
def container_exists(project: str) -> bool:
    """Check if cockpit container exists (running or stopped).

    Args:
        project: Project name.

    Returns:
        True only when a container with exactly the project's cockpit name is
        listed by ``docker ps -a``.
    """
    name = get_container_name(project)
    result = subprocess.run(
        ["docker", "ps", "-a", "--filter", f"name={name}", "--format", "{{.Names}}"],
        capture_output=True, text=True
    )
    # Both the docker name filter and a substring test also accept longer
    # names (project "foo" vs container "...-foobar"); compare whole names.
    return name in result.stdout.split()
def container_running(project: str) -> bool:
    """Check if cockpit container is running.

    Args:
        project: Project name.

    Returns:
        True only when a running container with exactly the project's cockpit
        name is listed by ``docker ps``.
    """
    name = get_container_name(project)
    result = subprocess.run(
        ["docker", "ps", "--filter", f"name={name}", "--format", "{{.Names}}"],
        capture_output=True, text=True
    )
    # Both the docker name filter and a substring test also accept longer
    # names (project "foo" vs container "...-foobar"); compare whole names.
    return name in result.stdout.split()
def cockpit_start(project: str, config: dict) -> Dict:
    """
    Bring up the cockpit container of a project.

    Three situations are handled, in this order: a stopped container is
    restarted with ``docker start``; a running container is reported as-is;
    otherwise a new container is created with ``docker run`` and a fresh state
    (including a new session id) is stored.

    Args:
        project: Project name; must be a key of config["projects"] or "admin".
        config: Luzia config dict.

    Returns: {"success": bool, "message": str, "container": str, "session_id": str}
             (failures carry only "success" and "message")
    """
    name = get_container_name(project)
    state = load_state(project)

    # Reject projects that are neither configured nor the special "admin" one.
    known_projects = config.get("projects", {})
    is_admin = project == "admin"
    if project not in known_projects and not is_admin:
        return {"success": False, "message": f"Unknown project: {project}"}

    # Host directory that gets mounted as the container workspace.
    if is_admin:
        home_dir = "/home/admin"
    else:
        home_dir = known_projects[project].get("home", f"/home/{project}")

    def _ok(text: str, session_id) -> Dict:
        """Build the success payload shared by all three outcomes."""
        return {
            "success": True,
            "message": text,
            "container": name,
            "session_id": session_id,
        }

    # Existing but stopped container: resume it.
    if container_exists(project) and not container_running(project):
        resumed = subprocess.run(["docker", "start", name], capture_output=True, text=True)
        if resumed.returncode != 0:
            return {"success": False, "message": f"Failed to resume: {resumed.stderr}"}
        state["status"] = "running"
        save_state(project, state)
        return _ok(f"Resumed cockpit for {project}", state.get("session_id"))

    # Already running: nothing to do.
    if container_running(project):
        return _ok(f"Cockpit already running for {project}", state.get("session_id"))

    # No container yet: create one with the workspace, the Claude credentials
    # and the state directory mounted.
    docker_run = ["docker", "run", "-d", "--name", name]
    docker_run += ["-v", f"{home_dir}:/workspace"]
    docker_run += ["-v", "/home/admin/.claude:/root/.claude"]  # Claude credentials
    docker_run += ["-v", f"{COCKPIT_STATE_DIR}:/var/cockpit"]  # State persistence
    docker_run.append(COCKPIT_IMAGE)
    created = subprocess.run(docker_run, capture_output=True, text=True)
    if created.returncode != 0:
        return {"success": False, "message": f"Failed to start: {created.stderr}"}

    # Brand-new container gets brand-new state.
    state = {
        "project": project,
        "session_id": str(uuid.uuid4()),
        "status": "running",
        "session_started": False,  # True after first message sent
        "last_output": None,
        "awaiting_response": False,
        "last_question": None,
    }
    save_state(project, state)
    return _ok(f"Started cockpit for {project}", state["session_id"])
def cockpit_stop(project: str) -> Dict:
    """
    Freeze a project's cockpit by stopping its container.

    A missing container is an error; an already stopped one counts as success.
    After a successful ``docker stop`` the stored status becomes "stopped".

    Returns: {"success": bool, "message": str}
    """
    def _reply(success: bool, message: str) -> Dict:
        """Assemble the two-field result dict."""
        return {"success": success, "message": message}

    name = get_container_name(project)
    if not container_exists(project):
        return _reply(False, f"No cockpit found for {project}")
    if not container_running(project):
        return _reply(True, f"Cockpit already stopped for {project}")

    stopped = subprocess.run(["docker", "stop", name], capture_output=True, text=True)
    if stopped.returncode != 0:
        return _reply(False, f"Failed to stop: {stopped.stderr}")

    # Record the new status only once docker confirmed the stop.
    state = load_state(project)
    state["status"] = "stopped"
    save_state(project, state)
    return _reply(True, f"Stopped cockpit for {project}")
def cockpit_remove(project: str) -> Dict:
    """
    Delete a project's cockpit: force-remove the container and, when that
    succeeds, delete the project's state file if one exists.

    Returns: {"success": bool, "message": str}
    """
    name = get_container_name(project)
    if not container_exists(project):
        return {"success": False, "message": f"No cockpit found for {project}"}

    # "rm -f" also removes a container that is still running.
    removal = subprocess.run(["docker", "rm", "-f", name], capture_output=True, text=True)
    if removal.returncode != 0:
        return {"success": False, "message": f"Failed to remove: {removal.stderr}"}

    # The container is gone, so its persisted state is obsolete.
    leftover = get_state_file(project)
    if leftover.exists():
        leftover.unlink()
    return {"success": True, "message": f"Removed cockpit for {project}"}
def cockpit_send(project: str, message: str, is_response: bool = False) -> Dict:
    """
    Send a message to the cockpit Claude session.

    Uses DockerTmuxController for robust tmux interaction. The first message
    of a session is sent with --session-id, later ones with --resume; the
    session is only marked as started once the command was actually delivered
    to tmux, so a failed send does not cause later --resume calls against a
    session that never existed.

    Args:
        project: Project name
        message: Message to send
        is_response: If True, this is responding to a previous question

    Returns: {"success": bool, "message": str, "output": str,
              "awaiting_response": bool, "question": str | None,
              "timed_out": bool}
             "timed_out" is True when no shell prompt appeared within 60 s, in
             which case "output" may be incomplete. Failures carry only
             "success" and "message".
    """
    container_name = get_container_name(project)
    state = load_state(project)
    if not container_running(project):
        return {"success": False, "message": f"Cockpit not running for {project}. Run 'luzia cockpit start {project}' first."}

    # Initialize DockerTmuxController
    controller = DockerTmuxController(container_name)

    session_id = state.get("session_id")
    if not session_id:
        # A freshly generated id cannot be resumed, whatever the state says.
        session_id = str(uuid.uuid4())
        state["session_id"] = session_id
        state["session_started"] = False

    # Build Claude command
    # Use --session-id for first message, --resume for all subsequent messages
    # Escape single quotes in message
    escaped_message = message.replace("'", "'\\''")
    session_flag = "--resume" if state.get("session_started", False) else "--session-id"
    claude_cmd = f"echo '{escaped_message}' | claude --print -p {session_flag} {session_id}"
    save_state(project, state)

    # If responding to a waiting state, notify queue system to unblock
    if is_response and state.get("awaiting_response"):
        try:
            from task_completion import resume_from_human
            task_id = state.get("task_id", f"cockpit-{project}")
            resume_from_human(task_id, message, project)
        except ImportError:
            pass  # task_completion not available
        # Clear awaiting state locally
        state["awaiting_response"] = False
        state["last_question"] = None
        save_state(project, state)

    # Send command using controller
    if not controller.send_keys(claude_cmd, enter=True, delay_enter=0.5):
        return {"success": False, "message": "Failed to send command to tmux"}

    # The command reached the shell: from now on the session must be resumed.
    state["session_started"] = True
    save_state(project, state)

    # Wait for shell prompt to return (indicates completion)
    found_prompt, raw_output = controller.wait_for_shell_prompt(timeout=60)

    # Extract clean response
    response = controller.extract_response(
        raw_output,
        command_marker="claude --print",
        session_id=session_id
    )

    # Detect if Claude is asking a question: the last non-empty line ends in "?"
    awaiting = False
    question = None
    if response:
        response_lines = [line.strip() for line in response.split("\n") if line.strip()]
        if response_lines and response_lines[-1].endswith("?"):
            awaiting = True
            question = response_lines[-1]

    state["last_output"] = response
    state["awaiting_response"] = awaiting
    state["last_question"] = question
    save_state(project, state)

    # If awaiting human response, notify queue system to block project
    if awaiting:
        try:
            from task_completion import set_awaiting_human
            task_id = state.get("task_id", f"cockpit-{project}")
            set_awaiting_human(task_id, question, project)
        except ImportError:
            pass  # task_completion not available

    return {
        "success": True,
        "message": "Message sent",
        "output": response,
        "awaiting_response": awaiting,
        "question": question,
        "timed_out": not found_prompt,
    }
def cockpit_output(project: str) -> Dict:
    """
    Get recent output from the cockpit tmux session.

    Uses DockerTmuxController for clean capture (last 200 scrollback lines).
    An empty pane is reported as success with an empty "output".

    Returns: {"success": bool, "output": str}
             (plus "message" when the cockpit is not running)
    """
    if not container_running(project):
        return {"success": False, "output": "", "message": "Cockpit not running"}
    # Use the DockerTmuxController for consistent tmux interaction.
    # capture_pane() already yields "" for an empty or unreachable pane, so no
    # separate "running but empty" check (and extra docker call) is needed.
    controller = DockerTmuxController(get_container_name(project))
    return {"success": True, "output": controller.capture_pane(lines=200)}
def cockpit_status(project: Optional[str] = None) -> Dict:
    """
    Get cockpit status for one or all projects.

    With *project* given, the status is derived from live docker queries for
    that project. Without it, every container whose name starts with
    COCKPIT_PREFIX is listed.

    Args:
        project: Project name, or None to list all cockpits.

    Returns: {"success": bool, "cockpits": [{"project": str, "status": str, ...}]}
             Each entry has "container", "session_id", "awaiting_response",
             "last_question" and "has_knowledge"; list mode adds
             "docker_status".
    """
    # Try to load project knowledge loader for RAG status
    knowledge_loader = None
    try:
        from project_knowledge_loader import ProjectKnowledgeLoader
        knowledge_loader = ProjectKnowledgeLoader()
    except ImportError:
        pass  # optional component

    def get_has_knowledge(proj: str) -> bool:
        """Check if project has .knowledge/ directory."""
        if knowledge_loader:
            try:
                return knowledge_loader.has_knowledge(proj)
            except Exception:
                # Best effort: a failing loader must not break the status
                # command, but KeyboardInterrupt/SystemExit still propagate.
                pass
        return False

    if project:
        state = load_state(project)
        running = container_running(project)
        exists = container_exists(project)
        status = "running" if running else ("stopped" if exists else "not_started")
        state["status"] = status
        return {
            "success": True,
            "cockpits": [{
                "project": project,
                "status": status,
                "container": get_container_name(project),
                "session_id": state.get("session_id"),
                "awaiting_response": state.get("awaiting_response", False),
                "last_question": state.get("last_question"),
                "has_knowledge": get_has_knowledge(project),
            }]
        }

    # List all cockpits
    cockpits = []
    result = subprocess.run(
        ["docker", "ps", "-a", "--filter", f"name={COCKPIT_PREFIX}", "--format", "{{.Names}}\t{{.Status}}"],
        capture_output=True, text=True
    )
    for line in result.stdout.strip().split("\n"):
        if not line:
            continue
        parts = line.split("\t")
        container_name = parts[0]
        container_status = parts[1] if len(parts) > 1 else "unknown"
        # The docker name filter matches the prefix anywhere in the name;
        # only names that start with it are cockpits.
        if not container_name.startswith(COCKPIT_PREFIX):
            continue
        # Extract project name: strip the leading prefix only, so a project
        # whose own name contains the prefix text is left intact.
        proj = container_name[len(COCKPIT_PREFIX):]
        state = load_state(proj)
        running = "Up" in container_status
        cockpits.append({
            "project": proj,
            "status": "running" if running else "stopped",
            "container": container_name,
            "docker_status": container_status,
            "session_id": state.get("session_id"),
            "awaiting_response": state.get("awaiting_response", False),
            # Same field as in single-project mode; the CLI displays it.
            "last_question": state.get("last_question"),
            "has_knowledge": get_has_knowledge(proj),
        })
    return {"success": True, "cockpits": cockpits}
def cockpit_attach_cmd(project: str) -> str:
    """
    Build the shell command a human can run to attach to the "agent" tmux
    session inside the project's cockpit container.

    Nothing is executed here; only the ``docker exec`` command string is returned.
    """
    container = get_container_name(project)
    return "docker exec -it {} tmux attach-session -t agent".format(container)
def cockpit_queue_task(project: str, task: str, context: str = "",
                       priority: str = "normal") -> Dict:
    """
    Queue a task for background dispatch.

    Tasks are queued per-project and dispatched serially within each project,
    but in parallel across projects (with load awareness).

    Args:
        project: Target project name
        task: Task description
        context: Project context
        priority: "high" or "normal"

    Returns: {"success": bool, "task_id": str, "message": str, "queue_position": int}
             On failure only "success" and "message" are present; no exception
             is propagated to the caller.
    """
    try:
        from cockpit_queue_dispatcher import CockpitQueueDispatcher
        import yaml
        config_path = Path("/opt/server-agents/orchestrator/config/luzia.yaml")
        config = None
        if config_path.exists():
            config = yaml.safe_load(config_path.read_text())
        # safe_load() yields None for an empty file (and non-dict values for
        # odd content); fall back to the same default as a missing file.
        if not isinstance(config, dict):
            config = {"projects": {}}
        dispatcher = CockpitQueueDispatcher(config)
        task_id = dispatcher.enqueue_task(project, task, context, priority)
        return {
            "success": True,
            "task_id": task_id,
            "message": f"Task queued for {project}",
            "queue_position": len(dispatcher.get_pending_tasks(project))
        }
    except ImportError:
        return {"success": False, "message": "Queue dispatcher not available"}
    except Exception as e:
        # API boundary: report any dispatcher failure as a result dict.
        return {"success": False, "message": str(e)}
def cockpit_queue_status() -> Dict:
    """
    Get status of the task queue and dispatcher.

    Returns: {"success": bool, "status": dict}
             On failure "status" is replaced by "message"; no exception is
             propagated to the caller.
    """
    try:
        from cockpit_queue_dispatcher import CockpitQueueDispatcher
        import yaml
        config_path = Path("/opt/server-agents/orchestrator/config/luzia.yaml")
        config = None
        if config_path.exists():
            config = yaml.safe_load(config_path.read_text())
        # safe_load() yields None for an empty file (and non-dict values for
        # odd content); fall back to the same default as a missing file.
        if not isinstance(config, dict):
            config = {"projects": {}}
        dispatcher = CockpitQueueDispatcher(config)
        return {"success": True, "status": dispatcher.get_status()}
    except ImportError:
        return {"success": False, "message": "Queue dispatcher not available"}
    except Exception as e:
        # API boundary: report any dispatcher failure as a result dict.
        return {"success": False, "message": str(e)}
def cockpit_dispatch_task(project: str, task: str, context: str, config: dict,
                          show_output: bool = True, timeout: int = 600) -> Dict:
    """
    Dispatch a task to cockpit and stream output in real-time.

    This is the main entry point for all task dispatch in luzia.
    Uses DockerTmuxController for robust tmux interaction.

    Steps: start/resume the cockpit, write the prompt to
    /tmp/task_prompt.txt inside the container, pipe that file into claude in
    the tmux pane, then poll the pane once per second until a shell prompt
    returns, the output has been stable for 5 polls, or *timeout* elapses.

    Args:
        project: Project name
        task: Task description
        context: Project context string
        config: Luzia config dict
        show_output: If True, print output in real-time
        timeout: Max seconds to wait for task completion

    Returns: {"success": bool, "task_id": str, "session_id": str, "output": str,
              "awaiting_response": bool, "timed_out": bool, "question": str | None}
             Failures return {"success": False, "error": str, "task_id": str}.
    """
    from datetime import datetime

    # Generate task ID for tracking
    task_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(task) & 0xffff)[2:]

    # 1. Start cockpit if not running
    start_result = cockpit_start(project, config)
    if not start_result["success"]:
        return {"success": False, "error": start_result["message"], "task_id": task_id}
    container_name = start_result["container"]

    # Initialize DockerTmuxController
    controller = DockerTmuxController(container_name)

    # Update state with task info
    state = load_state(project)
    session_id = start_result["session_id"] or state.get("session_id")
    if not session_id:
        # A resumed cockpit whose state was lost has no session id; passing
        # "None" to claude would fail, so start a fresh session instead.
        session_id = str(uuid.uuid4())
        state["session_id"] = session_id
        state["session_started"] = False
    state["task_id"] = task_id
    state["current_task"] = task
    state["task_started"] = datetime.now().isoformat()
    save_state(project, state)

    # 2. Build full prompt
    project_config = config.get("projects", {}).get(project, {})
    project_path = project_config.get("path", f"/home/{project}")
    prompt = f"""You are a project agent working on the **{project}** project.
{context}
## Your Task
{task}
## Execution Environment
- Working directory: {project_path}
- You have FULL permission to read, write, and execute files
- Use standard Claude tools (Read, Write, Edit, Bash) directly
- All file operations are pre-authorized
## Guidelines
- Complete the task step by step
- If you need clarification, ask a clear question (ending with ?)
- If you encounter errors, debug and fix them
- Provide a summary when complete
## IMPORTANT: Human Collaboration
If you need human input at any point, ASK. The human is available.
End questions with ? to signal you're waiting for a response."""

    # Build Claude command: --session-id for the first message, --resume after
    session_flag = "--resume" if state.get("session_started", False) else "--session-id"
    claude_cmd = f"claude --print -p {session_flag} {session_id}"

    # Write prompt to temp file in container for safety (handles special chars).
    # The text is fed through stdin rather than a heredoc so that no prompt
    # content (e.g. a line equal to the heredoc delimiter) can cut it short.
    write_result = subprocess.run(
        ["docker", "exec", "-i", container_name, "bash", "-c", "cat > /tmp/task_prompt.txt"],
        input=prompt + "\n", capture_output=True, text=True
    )
    if write_result.returncode != 0:
        # Without this check a stale prompt from an earlier task would be run.
        return {
            "success": False,
            "error": f"Failed to write task prompt: {write_result.stderr.strip()}",
            "task_id": task_id,
        }

    # Build and send command
    exec_cmd = f"cat /tmp/task_prompt.txt | {claude_cmd}"
    if show_output:
        print(f"[cockpit:{project}:{task_id}] Task sent")
        print("-" * 60)
    if not controller.send_keys(exec_cmd, enter=True, delay_enter=0.5):
        return {"success": False, "error": "Failed to send task to tmux", "task_id": task_id}

    # Only now is the session known to exist for later --resume calls.
    state["session_started"] = True
    save_state(project, state)

    # 3. Stream output with proper completion detection
    shell_pattern = re.compile(r"^(root|[\w-]+)@[\w-]+:.*[#$]\s*$")
    start_time = time.monotonic()
    final_output = ""
    awaiting_response = False
    timed_out = False
    question = None
    last_printed_len = 0
    stable_count = 0

    # Initial wait for Claude to start
    time.sleep(2)
    while time.monotonic() - start_time < timeout:
        time.sleep(1)
        # Capture current output
        raw_output = controller.capture_pane()
        # Extract Claude response
        response = controller.extract_response(
            raw_output,
            command_marker="claude --print",
            session_id=session_id
        )
        # Stream new content
        if response and len(response) > last_printed_len:
            new_text = response[last_printed_len:]
            if show_output and new_text.strip():
                print(new_text, end='', flush=True)
            last_printed_len = len(response)
            stable_count = 0  # Reset stability counter
        else:
            stable_count += 1
        final_output = response

        # Check for shell prompt (completion) in the last few lines
        tail = raw_output.strip().split("\n")[-5:]
        found_prompt = any(shell_pattern.match(line.strip()) for line in tail)
        if found_prompt and response:
            # Check if Claude is asking a question
            response_lines = [line.strip() for line in response.split("\n") if line.strip()]
            if response_lines and response_lines[-1].endswith("?"):
                awaiting_response = True
                question = response_lines[-1]
            break
        # Also break if output stable for 5 seconds and we have a response
        if stable_count >= 5 and response:
            break
    else:
        # Loop ran out of time without hitting either break.
        timed_out = True

    if show_output:
        print("\n" + "-" * 60)

    # 4. Update state (reloaded so concurrent updates are not overwritten)
    state = load_state(project)
    state["last_output"] = final_output
    state["awaiting_response"] = awaiting_response
    state["timed_out"] = timed_out
    state["last_question"] = question
    state["task_completed"] = datetime.now().isoformat() if not awaiting_response and not timed_out else None
    save_state(project, state)

    # Notify queue system if awaiting human
    if awaiting_response:
        try:
            from task_completion import set_awaiting_human
            set_awaiting_human(task_id, question, project)
        except ImportError:
            pass  # task_completion not available
        if show_output:
            print(f"\n[AWAITING RESPONSE] Claude is waiting for input:")
            print(f" {question}")
            print(f"\nTo respond: luzia cockpit respond {project} <your answer>")
            print(f"Or continue: luzia {project} <follow-up task>")
    elif timed_out:
        if show_output:
            print(f"[STILL RUNNING] Task continues in background (timeout {timeout}s)")
            print(f" Monitor: luzia cockpit output {project}")
            print(f" Attach: luzia cockpit attach {project}")
    else:
        if show_output:
            print(f"[COMPLETED] Task finished")

    return {
        "success": True,
        "task_id": task_id,
        "session_id": session_id,
        "output": final_output,
        "awaiting_response": awaiting_response,
        "timed_out": timed_out,
        "question": question
    }
def cockpit_continue(project: str, message: str, config: dict,
                     show_output: bool = True, timeout: int = 600) -> Dict:
    """
    Continue an existing cockpit session with a follow-up message.

    Uses DockerTmuxController for robust tmux interaction. The cockpit is
    started first when it is not running. The message is sent with --resume
    when the stored state says the session has already been started, and with
    --session-id otherwise (a session that never received a message cannot be
    resumed).

    Args:
        project: Project name
        message: Follow-up message
        config: Luzia config dict
        show_output: If True, print output in real-time
        timeout: Max seconds to wait

    Returns: {"success": bool, "task_id": str, "output": str,
              "awaiting_response": bool, "question": str | None,
              "timed_out": bool}
             Failures return {"success": False, "error": str}.
    """
    from datetime import datetime

    state = load_state(project)
    container_name = get_container_name(project)

    # Check if cockpit is running
    if not container_running(project):
        start_result = cockpit_start(project, config)
        if not start_result["success"]:
            return {"success": False, "error": start_result["message"]}
        state = load_state(project)

    # Initialize DockerTmuxController
    controller = DockerTmuxController(container_name)
    session_id = state.get("session_id")
    if not session_id:
        return {"success": False, "error": "No session ID found"}

    task_id = datetime.now().strftime("%H%M%S") + "-" + hex(hash(message) & 0xffff)[2:]

    # Escape single quotes in message
    escaped_message = message.replace("'", "'\\''")
    # Build Claude command - resume unless the session has never been started
    session_flag = "--resume" if state.get("session_started", False) else "--session-id"
    claude_cmd = f"echo '{escaped_message}' | claude --print -p {session_flag} {session_id}"

    if show_output:
        print(f"[cockpit:{project}:{task_id}] Continuing session")
        print("-" * 60)

    # Send via controller
    if not controller.send_keys(claude_cmd, enter=True, delay_enter=0.5):
        return {"success": False, "error": "Failed to send command to tmux"}

    # The command reached the shell: from now on the session must be resumed.
    state["session_started"] = True
    save_state(project, state)

    # Stream output with proper completion detection
    shell_pattern = re.compile(r"^(root|[\w-]+)@[\w-]+:.*[#$]\s*$")
    start_time = time.monotonic()
    final_output = ""
    awaiting_response = False
    timed_out = False
    question = None
    last_printed_len = 0
    stable_count = 0

    time.sleep(2)  # Give Claude time to start
    while time.monotonic() - start_time < timeout:
        time.sleep(1)
        # Capture current output
        raw_output = controller.capture_pane()
        # Extract Claude response
        response = controller.extract_response(
            raw_output,
            command_marker="claude --print",
            session_id=session_id
        )
        # Stream new content
        if response and len(response) > last_printed_len:
            new_text = response[last_printed_len:]
            if show_output and new_text.strip():
                print(new_text, end='', flush=True)
            last_printed_len = len(response)
            stable_count = 0
        else:
            stable_count += 1
        final_output = response

        # Check for shell prompt (completion) in the last few lines
        tail = raw_output.strip().split("\n")[-5:]
        found_prompt = any(shell_pattern.match(line.strip()) for line in tail)
        if found_prompt and response:
            response_lines = [line.strip() for line in response.split("\n") if line.strip()]
            if response_lines and response_lines[-1].endswith("?"):
                awaiting_response = True
                question = response_lines[-1]
            break
        # Break if output stable for 3 seconds with response
        if stable_count >= 3 and response:
            break
    else:
        # Loop ran out of time without hitting either break.
        timed_out = True

    if show_output:
        print("\n" + "-" * 60)

    # Update state
    state["last_output"] = final_output
    state["awaiting_response"] = awaiting_response
    state["last_question"] = question
    save_state(project, state)

    if show_output:
        if awaiting_response:
            print(f"[AWAITING RESPONSE] {question}")
            print(f"Respond: luzia cockpit respond {project} <answer>")
        elif timed_out:
            # Do not claim completion when nothing came back in time.
            print(f"[STILL RUNNING] No response within {timeout}s")
            print(f" Monitor: luzia cockpit output {project}")
        else:
            print("[COMPLETED]")

    return {
        "success": True,
        "task_id": task_id,
        "output": final_output,
        "awaiting_response": awaiting_response,
        "question": question,
        "timed_out": timed_out,
    }
# CLI Handler for luzia integration
def route_cockpit(config: dict, args: list, kwargs: dict) -> int:
    """
    Route cockpit subcommands.

    luzia cockpit start <project>
    luzia cockpit stop <project>
    luzia cockpit remove <project>
    luzia cockpit send <project> <message>
    luzia cockpit respond <project> <answer>
    luzia cockpit output <project>
    luzia cockpit status [project]
    luzia cockpit attach <project>
    luzia cockpit queue <project> <task>
    luzia cockpit queue --status
    luzia cockpit dispatch

    Args:
        config: Luzia config dict
        args: Subcommand followed by its arguments; empty prints the help text
        kwargs: Accepted for interface compatibility; not used here

    Returns:
        Process exit code: 0 on success, 1 on usage errors or failures
    """
    if not args:
        print("Usage: luzia cockpit <command> [args]")
        print("")
        print("Commands:")
        print(" start <project> Start cockpit container")
        print(" stop <project> Stop (freeze) cockpit")
        print(" remove <project> Remove cockpit completely")
        print(" send <project> <msg> Send new message to Claude")
        print(" respond <project> <ans> Respond to pending question")
        print(" output <project> Get recent output")
        print(" status [project] Show cockpit status")
        print(" attach <project> Show attach command")
        print("")
        print("Queue commands (per-project serialized, parallel across projects):")
        print(" queue <project> <task> Queue task for background dispatch")
        print(" queue --status Show dispatcher status")
        print(" dispatch Run one dispatch cycle")
        return 0

    subcommand = args[0]
    subargs = args[1:]

    if subcommand == "start":
        if not subargs:
            print("Usage: luzia cockpit start <project>")
            return 1
        result = cockpit_start(subargs[0], config)
        if result["success"]:
            print(f"OK: {result['message']}")
            print(f" Container: {result['container']}")
            print(f" Session: {result['session_id']}")
            return 0
        print(f"Error: {result['message']}")
        return 1

    if subcommand == "stop":
        if not subargs:
            print("Usage: luzia cockpit stop <project>")
            return 1
        result = cockpit_stop(subargs[0])
        print(result["message"])
        return 0 if result["success"] else 1

    if subcommand == "remove":
        if not subargs:
            print("Usage: luzia cockpit remove <project>")
            return 1
        result = cockpit_remove(subargs[0])
        print(result["message"])
        return 0 if result["success"] else 1

    # "send" and "respond" differ only in the is_response flag and usage text.
    if subcommand in ("send", "respond"):
        responding = subcommand == "respond"
        if len(subargs) < 2:
            placeholder = "<answer>" if responding else "<message>"
            print(f"Usage: luzia cockpit {subcommand} <project> {placeholder}")
            return 1
        project = subargs[0]
        text = " ".join(subargs[1:])
        result = cockpit_send(project, text, is_response=responding)
        if result["success"]:
            print("--- Claude Output ---")
            print(result.get("output", ""))
            if result.get("awaiting_response"):
                print("\n--- AWAITING RESPONSE ---")
                print(f"Question: {result.get('question')}")
            return 0
        print(f"Error: {result['message']}")
        return 1

    if subcommand == "output":
        if not subargs:
            print("Usage: luzia cockpit output <project>")
            return 1
        result = cockpit_output(subargs[0])
        if result["success"]:
            print(result["output"])
            return 0
        print(f"Error: {result.get('message', 'Unknown error')}")
        return 1

    if subcommand == "status":
        project = subargs[0] if subargs else None
        result = cockpit_status(project)
        if not result["cockpits"]:
            print("No cockpits found")
            return 0
        print(f"{'PROJECT':<15} {'STATUS':<10} {'SESSION':<36} {'WAITING'}")
        print("-" * 80)
        for cp in result["cockpits"]:
            if cp.get("awaiting_response"):
                last_question = cp.get("last_question")
                # Show at most the first 20 characters of the pending question.
                waiting = "YES - " + (last_question[:20] + "..." if last_question else "")
            else:
                waiting = "no"
            session_id = cp.get('session_id') or '-'  # Handle None values
            print(f"{cp['project']:<15} {cp['status']:<10} {session_id:<36} {waiting}")
        return 0

    if subcommand == "attach":
        if not subargs:
            print("Usage: luzia cockpit attach <project>")
            return 1
        cmd = cockpit_attach_cmd(subargs[0])
        print(f"Run this command to attach:")
        print(f" {cmd}")
        return 0

    if subcommand == "queue":
        # --status takes no further arguments, so it must be handled before
        # the "<project> <task>" argument-count check.
        if subargs and subargs[0] == "--status":
            result = cockpit_queue_status()
            if result["success"]:
                print(json.dumps(result["status"], indent=2))
                return 0
            print(f"Error: {result['message']}")
            return 1
        if len(subargs) < 2:
            print("Usage: luzia cockpit queue <project> <task>")
            print(" luzia cockpit queue --status")
            return 1
        project = subargs[0]
        task = " ".join(subargs[1:])
        result = cockpit_queue_task(project, task)
        if result["success"]:
            print(f"OK: {result['message']}")
            print(f" Task ID: {result['task_id']}")
            print(f" Queue position: {result.get('queue_position', 'unknown')}")
            return 0
        print(f"Error: {result['message']}")
        return 1

    if subcommand == "dispatch":
        # Run one dispatch cycle
        try:
            from cockpit_queue_dispatcher import CockpitQueueDispatcher
            import yaml
            config_path = Path("/opt/server-agents/orchestrator/config/luzia.yaml")
            cfg = None
            if config_path.exists():
                cfg = yaml.safe_load(config_path.read_text())
            # safe_load() yields None for an empty file; use the caller's
            # config in that case, exactly as when the file is missing.
            if not isinstance(cfg, dict):
                cfg = config
            dispatcher = CockpitQueueDispatcher(cfg)
            result = dispatcher.run_dispatch_cycle()
            print(json.dumps(result, indent=2))
            return 0
        except Exception as e:
            # CLI boundary: report the failure and signal it via the exit code.
            print(f"Error: {e}")
            return 1

    print(f"Unknown subcommand: {subcommand}")
    return 1