#!/usr/bin/env python3
"""
Luzia Cockpit - Human-in-the-Loop Claude Sessions

Provides Docker container management for pausable Claude agent sessions.

Key features:
- Docker stop/start freezes/resumes entire session state
- Claude sessions persist via --session-id and --resume
- tmux for human attachment when needed
- Multi-turn conversation support

Architecture:
    Imports TmuxCLIController from claude-code-tools-local package and
    extends it with DockerTmuxAdapter for Docker-containerized tmux sessions.
    This enables importing upstream improvements while adding Docker support.

Usage:
    luzia cockpit start <project>              Start cockpit container
    luzia cockpit stop <project>               Stop (freeze) cockpit
    luzia cockpit send <project> <message>     Send message to cockpit
    luzia cockpit respond <project> <answer>   Respond to pending question
    luzia cockpit status [project]             Show cockpit status
    luzia cockpit output <project>             Get recent output
    luzia cockpit attach <project>             Attach to tmux session
"""

import hashlib
import json
import os
import re
import shlex
import subprocess
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Import TmuxCLIController from installed claude-code-tools-local package
from claude_code_tools.tmux_cli_controller import TmuxCLIController


# Matches a bare shell prompt such as "root@host:/path#" or "user@host:~$".
_SHELL_PROMPT_RE = re.compile(r"^(root|[\w-]+)@[\w-]+:.*[#$]\s*$")

# Substring used to locate the Claude command line in captured pane output.
_CLAUDE_COMMAND_MARKER = "claude --print"

# Location of the orchestrator configuration used by the queue commands.
_LUZIA_CONFIG_PATH = Path("/opt/server-agents/orchestrator/config/luzia.yaml")


def _container_listed(name: str, include_stopped: bool) -> bool:
    """Return True if `docker ps` lists a container named exactly *name*.

    Docker's ``name=`` filter matches substrings, so the output is compared
    name-by-name; otherwise "luzia-cockpit-foo" would be reported as present
    whenever "luzia-cockpit-foobar" exists.
    """
    cmd = ["docker", "ps"]
    if include_stopped:
        cmd.append("-a")
    cmd += ["--filter", f"name={name}", "--format", "{{.Names}}"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    return name in result.stdout.split()


def _ends_with_shell_prompt(content: str, tail: int = 5) -> bool:
    """Return True if one of the last *tail* lines of *content* is a bare shell prompt."""
    lines = content.strip().split("\n")
    return any(_SHELL_PROMPT_RE.match(line.strip()) for line in lines[-tail:])


def _detect_question(response: str) -> Optional[str]:
    """Return the last non-empty line of *response* if it ends with '?', else None."""
    non_empty = [line.strip() for line in response.split("\n") if line.strip()]
    if non_empty and non_empty[-1].endswith("?"):
        return non_empty[-1]
    return None


def _make_task_id(text: str) -> str:
    """Build a short task ID: wall-clock HHMMSS plus 4 hex digits derived from *text*.

    A real digest is used instead of the builtin ``hash()``, whose value for
    strings is randomized per process.
    """
    digest = hashlib.sha256(text.encode("utf-8", errors="replace")).hexdigest()
    return f"{datetime.now().strftime('%H%M%S')}-{digest[:4]}"


def _build_message_command(message: str, session_id: str, resume: bool) -> str:
    """Build the shell command line that pipes *message* into Claude.

    ``shlex.quote`` makes the message safe to type into the container's
    shell, and ``printf '%s\\n'`` is used rather than ``echo`` so messages
    such as "-n" or ones containing backslashes are passed through verbatim.
    """
    flag = "--resume" if resume else "--session-id"
    return (
        f"printf '%s\\n' {shlex.quote(message)} | "
        f"claude --print -p {flag} {shlex.quote(str(session_id))}"
    )


def _load_luzia_config(fallback: dict) -> dict:
    """Load the orchestrator YAML config, returning *fallback* if it is missing or empty.

    Raises ImportError when PyYAML is not installed; callers already treat
    that as "queue dispatcher not available".
    """
    import yaml

    if _LUZIA_CONFIG_PATH.exists():
        # safe_load returns None for an empty document.
        return yaml.safe_load(_LUZIA_CONFIG_PATH.read_text()) or fallback
    return fallback


class DockerTmuxAdapter(TmuxCLIController):
    """
    Docker adapter for TmuxCLIController.

    Extends TmuxCLIController to execute tmux commands inside a Docker
    container rather than on the host system. This enables:
    - Inheriting all TmuxCLIController methods automatically
    - Pulling upstream improvements from claude-code-tools
    - Adding Docker-specific functionality

    The key override is _run_tmux_command() which prepends 'docker exec'
    to all tmux commands.
    """

    def __init__(self, container_name: str, tmux_session: str = "agent",
                 tmux_window: str = "main"):
        """
        Initialize adapter for a Docker container's tmux session.

        Args:
            container_name: Docker container name
            tmux_session: tmux session name inside container (default: agent)
            tmux_window: tmux window name (default: main)
        """
        super().__init__(session_name=tmux_session, window_name=tmux_window)
        self.container_name = container_name
        self.tmux_session = tmux_session
        self.tmux_window = tmux_window
        self.target = f"{tmux_session}:{tmux_window}"

    def _run_tmux_command(self, command: List[str]) -> Tuple[str, int]:
        """
        Override: Run tmux command inside the Docker container.

        Instead of running 'tmux <command>', this runs
        'docker exec <container> tmux <command>'.

        Args:
            command: List of command components (without 'tmux' prefix)

        Returns:
            Tuple of (output, exit_code); ("", 1) when the container is
            not running.
        """
        if not self.is_container_running():
            return "", 1

        cmd = ["docker", "exec", self.container_name, "tmux"] + command
        result = subprocess.run(cmd, capture_output=True, text=True)
        return result.stdout.strip(), result.returncode

    def is_container_running(self) -> bool:
        """Check if the Docker container is running (exact name match)."""
        return _container_listed(self.container_name, include_stopped=False)

    def send_keys(self, text: str, enter: bool = True,
                  delay_enter: float = 0.5) -> bool:
        """
        Send keystrokes to the tmux pane.

        Args:
            text: Text to send
            enter: Whether to press Enter after text
            delay_enter: Delay in seconds before pressing Enter

        Returns:
            True if every tmux invocation succeeded, False otherwise
            (including when the container is not running).
        """
        # -l sends the text literally so input such as "Enter" or "C-c" is
        # typed rather than interpreted as a key name; "--" stops option
        # parsing for text that starts with a dash.
        _, code = self._run_tmux_command(
            ["send-keys", "-t", self.target, "-l", "--", text]
        )
        if code != 0:
            return False

        if enter:
            if delay_enter > 0:
                time.sleep(delay_enter)
            _, code = self._run_tmux_command(
                ["send-keys", "-t", self.target, "Enter"]
            )
            if code != 0:
                return False
        return True

    def capture_pane(self, lines: int = 200) -> str:
        """
        Capture output from the tmux pane.

        Args:
            lines: Number of lines to capture from scrollback

        Returns:
            Captured text content, or "" if the capture failed
        """
        output, code = self._run_tmux_command(
            ["capture-pane", "-t", self.target, "-p", "-S", f"-{lines}"]
        )
        return output if code == 0 else ""

    def wait_for_prompt(self, prompt_pattern: str, timeout: int = 60,
                        check_interval: float = 1.0) -> bool:
        """
        Wait for a specific prompt pattern to appear.

        Args:
            prompt_pattern: Regex pattern to match
            timeout: Maximum seconds to wait
            check_interval: Seconds between checks

        Returns:
            True if pattern found, False on timeout
        """
        pattern = re.compile(prompt_pattern)
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if pattern.search(self.capture_pane(lines=50)):
                return True
            time.sleep(check_interval)
        return False

    def wait_for_idle(self, idle_time: float = 3.0, check_interval: float = 1.0,
                      timeout: int = 600) -> Tuple[bool, str]:
        """
        Wait for pane output to stabilize (no changes for idle_time seconds).

        Args:
            idle_time: Seconds of no change to consider idle
            check_interval: Seconds between checks
            timeout: Maximum seconds to wait

        Returns:
            Tuple of (is_idle, final_content)
        """
        deadline = time.monotonic() + timeout
        last_change = time.monotonic()
        # None guarantees the first capture always counts as a change.
        last_content: Optional[str] = None
        final_content = ""

        while time.monotonic() < deadline:
            content = self.capture_pane()
            if content != last_content:
                last_content = content
                last_change = time.monotonic()
                final_content = content
            elif time.monotonic() - last_change >= idle_time:
                return True, final_content
            time.sleep(check_interval)

        return False, final_content

    def wait_for_shell_prompt(self, timeout: int = 600) -> Tuple[bool, str]:
        """
        Wait for shell prompt to appear, indicating command completion.

        Detects common shell prompts like:
        - root@hostname:/path#
        - user@hostname:/path$

        Args:
            timeout: Maximum seconds to wait

        Returns:
            Tuple of (found_prompt, final_content)
        """
        deadline = time.monotonic() + timeout
        last_content = ""

        while time.monotonic() < deadline:
            content = self.capture_pane(lines=50)
            # Only the last few lines are inspected for a prompt.
            if _ends_with_shell_prompt(content):
                return True, content
            last_content = content
            time.sleep(1)

        return False, last_content

    def extract_response(self, full_output: str, command_marker: str,
                         session_id: Optional[str] = None) -> str:
        """
        Extract Claude's response from tmux output.

        Filters out:
        - The command line itself
        - Shell prompts
        - Session ID continuation lines

        Extraction starts after the LAST line containing the command
        marker: the pane scrollback also holds earlier commands and their
        output, and starting at the first marker would prepend those stale
        responses to the current one.

        Args:
            full_output: Raw captured output
            command_marker: Marker to identify command line (e.g., "claude --print")
            session_id: Claude session ID to filter

        Returns:
            Clean response text ("" if the marker is not present)
        """
        lines = full_output.strip().split("\n")

        start = None
        for index, line in enumerate(lines):
            if command_marker in line:
                start = index
        if start is None:
            return ""

        response_lines = []
        for line in lines[start + 1:]:
            # Skip session ID lines (continuation of a wrapped command line)
            if session_id and session_id in line:
                continue
            # A bare shell prompt marks the end of the response
            if _SHELL_PROMPT_RE.match(line.strip()):
                break
            response_lines.append(line.rstrip())

        return "\n".join(response_lines).strip()

    def send_interrupt(self) -> bool:
        """Send Ctrl+C to interrupt running command."""
        _, code = self._run_tmux_command(["send-keys", "-t", self.target, "C-c"])
        return code == 0

    def clear_pane(self) -> bool:
        """Clear the pane screen."""
        _, code = self._run_tmux_command(["send-keys", "-t", self.target, "C-l"])
        return code == 0


# Backward compatibility alias
DockerTmuxController = DockerTmuxAdapter

# Constants
COCKPIT_IMAGE = "luzia-cockpit:latest"
COCKPIT_PREFIX = "luzia-cockpit-"
COCKPIT_STATE_DIR = Path("/var/lib/luz-orchestrator/cockpits")

# Ensure state directory exists
COCKPIT_STATE_DIR.mkdir(parents=True, exist_ok=True)


def get_container_name(project: str) -> str:
    """Get cockpit container name for a project."""
    return f"{COCKPIT_PREFIX}{project}"


def get_state_file(project: str) -> Path:
    """Get state file path for a project's cockpit."""
    return COCKPIT_STATE_DIR / f"{project}.json"


def _default_state(project: str) -> Dict:
    """Return the state used for a project that has no saved cockpit state."""
    return {
        "project": project,
        "session_id": None,
        "status": "not_started",
        "last_output": None,
        "awaiting_response": False,
        "last_question": None,
    }


def load_state(project: str) -> Dict:
    """Load cockpit state for a project.

    Returns the default state when the state file is missing, unreadable,
    or does not contain a JSON object, so a damaged file cannot crash every
    cockpit command.
    """
    try:
        loaded = json.loads(get_state_file(project).read_text())
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers
        # json.JSONDecodeError.
        return _default_state(project)
    if not isinstance(loaded, dict):
        return _default_state(project)
    return loaded


def save_state(project: str, state: Dict) -> None:
    """Save cockpit state for a project.

    The file is written to a temporary sibling and renamed into place so a
    concurrent reader never sees a half-written JSON document.
    """
    state_file = get_state_file(project)
    tmp_file = state_file.with_name(state_file.name + ".tmp")
    tmp_file.write_text(json.dumps(state, indent=2))
    os.replace(tmp_file, state_file)


def container_exists(project: str) -> bool:
    """Check if cockpit container exists (running or stopped)."""
    return _container_listed(get_container_name(project), include_stopped=True)


def container_running(project: str) -> bool:
    """Check if cockpit container is running."""
    return _container_listed(get_container_name(project), include_stopped=False)


def cockpit_start(project: str, config: dict) -> Dict:
    """
    Start or resume a cockpit container for a project.

    Args:
        project: Project name; must be a key of config["projects"] or "admin"
        config: Luzia config dict

    Returns:
        On success: {"success": True, "message": str, "container": str,
        "session_id": str}. On failure: {"success": False, "message": str}.
    """
    container_name = get_container_name(project)
    state = load_state(project)

    # Check if project exists
    projects = config.get("projects", {})
    if project not in projects and project != "admin":
        return {"success": False, "message": f"Unknown project: {project}"}

    # Get project home directory
    if project == "admin":
        home_dir = "/home/admin"
    else:
        home_dir = projects[project].get("home", f"/home/{project}")

    running = container_running(project)

    # If container exists but stopped, restart it
    if not running and container_exists(project):
        result = subprocess.run(
            ["docker", "start", container_name],
            capture_output=True, text=True
        )
        if result.returncode != 0:
            return {"success": False, "message": f"Failed to resume: {result.stderr}"}
        state["status"] = "running"
        save_state(project, state)
        return {
            "success": True,
            "message": f"Resumed cockpit for {project}",
            "container": container_name,
            "session_id": state.get("session_id"),
        }

    # If container is running, return info
    if running:
        return {
            "success": True,
            "message": f"Cockpit already running for {project}",
            "container": container_name,
            "session_id": state.get("session_id"),
        }

    # Create new container
    # Mount project workspace and Claude credentials
    cmd = [
        "docker", "run", "-d",
        "--name", container_name,
        "-v", f"{home_dir}:/workspace",
        "-v", "/home/admin/.claude:/root/.claude",  # Claude credentials
        "-v", f"{COCKPIT_STATE_DIR}:/var/cockpit",  # State persistence
        COCKPIT_IMAGE,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        return {"success": False, "message": f"Failed to start: {result.stderr}"}

    # Initialize state
    state = {
        "project": project,
        "session_id": str(uuid.uuid4()),
        "status": "running",
        "session_started": False,  # True after first message sent
        "last_output": None,
        "awaiting_response": False,
        "last_question": None,
    }
    save_state(project, state)

    return {
        "success": True,
        "message": f"Started cockpit for {project}",
        "container": container_name,
        "session_id": state["session_id"],
    }


def cockpit_stop(project: str) -> Dict:
    """
    Stop (freeze) a cockpit container.

    Returns: {"success": bool, "message": str}
    """
    container_name = get_container_name(project)

    if not container_exists(project):
        return {"success": False, "message": f"No cockpit found for {project}"}

    if not container_running(project):
        return {"success": True, "message": f"Cockpit already stopped for {project}"}

    result = subprocess.run(
        ["docker", "stop", container_name],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        return {"success": False, "message": f"Failed to stop: {result.stderr}"}

    state = load_state(project)
    state["status"] = "stopped"
    save_state(project, state)
    return {"success": True, "message": f"Stopped cockpit for {project}"}


def cockpit_remove(project: str) -> Dict:
    """
    Remove a cockpit container completely, including its state file.

    Returns: {"success": bool, "message": str}
    """
    container_name = get_container_name(project)

    if not container_exists(project):
        return {"success": False, "message": f"No cockpit found for {project}"}

    # Force remove
    result = subprocess.run(
        ["docker", "rm", "-f", container_name],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        return {"success": False, "message": f"Failed to remove: {result.stderr}"}

    # Clean up state file
    state_file = get_state_file(project)
    if state_file.exists():
        state_file.unlink()
    return {"success": True, "message": f"Removed cockpit for {project}"}


def cockpit_send(project: str, message: str, is_response: bool = False) -> Dict:
    """
    Send a message to the cockpit Claude session.

    Uses DockerTmuxController for robust tmux interaction.

    Args:
        project: Project name
        message: Message to send
        is_response: If True, this is responding to a previous question;
            the queue system is notified when the session was awaiting one

    Returns:
        On success: {"success": True, "message": str, "output": str,
        "awaiting_response": bool, "question": str | None, "timed_out": bool}.
        On failure: {"success": False, "message": str}.
    """
    container_name = get_container_name(project)
    state = load_state(project)

    if not container_running(project):
        return {
            "success": False,
            "message": f"Cockpit not running for {project}. "
                       f"Run 'luzia cockpit start {project}' first.",
        }

    # Initialize DockerTmuxController
    controller = DockerTmuxController(container_name)

    session_id = state.get("session_id")
    if not session_id:
        session_id = str(uuid.uuid4())
        state["session_id"] = session_id

    # Use --session-id for first message, --resume for all subsequent messages
    already_started = state.get("session_started", False)
    claude_cmd = _build_message_command(message, session_id, resume=already_started)

    # If responding to a waiting state, notify queue system to unblock
    if is_response and state.get("awaiting_response"):
        try:
            from task_completion import resume_from_human
            task_id = state.get("task_id", f"cockpit-{project}")
            resume_from_human(task_id, message, project)
        except ImportError:
            pass  # task_completion not available

        # Clear awaiting state locally
        state["awaiting_response"] = False
        state["last_question"] = None
        save_state(project, state)

    # Send command using controller
    if not controller.send_keys(claude_cmd, enter=True, delay_enter=0.5):
        return {"success": False, "message": "Failed to send command to tmux"}

    # Only mark the session as started once the command was actually typed;
    # otherwise a failed first send would make the next call --resume a
    # session that does not exist.
    state["session_started"] = True
    save_state(project, state)

    # Wait for shell prompt to return (indicates completion)
    found_prompt, raw_output = controller.wait_for_shell_prompt(timeout=60)

    # The prompt wait only looks at the last 50 lines; re-capture the full
    # scrollback so long responses still contain the command marker.
    raw_output = controller.capture_pane() or raw_output

    # Extract clean response
    response = controller.extract_response(
        raw_output,
        command_marker=_CLAUDE_COMMAND_MARKER,
        session_id=session_id,
    )

    # Detect if Claude is asking a question
    question = _detect_question(response) if response else None
    awaiting = question is not None

    state["last_output"] = response
    state["awaiting_response"] = awaiting
    state["last_question"] = question
    save_state(project, state)

    # If awaiting human response, notify queue system to block project
    if awaiting:
        try:
            from task_completion import set_awaiting_human
            task_id = state.get("task_id", f"cockpit-{project}")
            set_awaiting_human(task_id, question, project)
        except ImportError:
            pass  # task_completion not available

    return {
        "success": True,
        "message": "Message sent",
        "output": response,
        "awaiting_response": awaiting,
        "question": question,
        "timed_out": not found_prompt,
    }


def cockpit_output(project: str) -> Dict:
    """
    Get recent output from the cockpit tmux session.

    Uses DockerTmuxController for clean capture.

    Returns: {"success": bool, "output": str} (plus "message" on failure)
    """
    if not container_running(project):
        return {"success": False, "output": "", "message": "Cockpit not running"}

    controller = DockerTmuxController(get_container_name(project))
    # An empty capture is still a success: the pane may simply be empty.
    return {"success": True, "output": controller.capture_pane(lines=200)}


def cockpit_status(project: Optional[str] = None) -> Dict:
    """
    Get cockpit status for one or all projects.

    Args:
        project: Project name, or None to list every cockpit container

    Returns: {"success": bool, "cockpits": [{"project": str, "status": str, ...}]}
    """
    # Try to load project knowledge loader for RAG status
    knowledge_loader = None
    try:
        from project_knowledge_loader import ProjectKnowledgeLoader
        knowledge_loader = ProjectKnowledgeLoader()
    except ImportError:
        pass

    def get_has_knowledge(proj: str) -> bool:
        """Ask the knowledge loader (when available) whether the project has knowledge."""
        if knowledge_loader:
            try:
                return knowledge_loader.has_knowledge(proj)
            except Exception:
                # Best-effort flag: a loader failure must not break status.
                pass
        return False

    if project:
        state = load_state(project)
        if container_running(project):
            status = "running"
        elif container_exists(project):
            status = "stopped"
        else:
            status = "not_started"

        return {
            "success": True,
            "cockpits": [{
                "project": project,
                "status": status,
                "container": get_container_name(project),
                "session_id": state.get("session_id"),
                "awaiting_response": state.get("awaiting_response", False),
                "last_question": state.get("last_question"),
                "has_knowledge": get_has_knowledge(project),
            }],
        }

    # List all cockpits
    cockpits = []
    result = subprocess.run(
        ["docker", "ps", "-a", "--filter", f"name={COCKPIT_PREFIX}",
         "--format", "{{.Names}}\t{{.Status}}"],
        capture_output=True, text=True
    )

    for line in result.stdout.strip().split("\n"):
        if not line:
            continue
        parts = line.split("\t")
        container_name = parts[0]
        container_status = parts[1] if len(parts) > 1 else "unknown"

        # The docker name filter matches substrings; keep real cockpits only.
        if not container_name.startswith(COCKPIT_PREFIX):
            continue

        # Extract project name
        proj = container_name[len(COCKPIT_PREFIX):]
        state = load_state(proj)

        cockpits.append({
            "project": proj,
            "status": "running" if "Up" in container_status else "stopped",
            "container": container_name,
            "docker_status": container_status,
            "session_id": state.get("session_id"),
            "awaiting_response": state.get("awaiting_response", False),
            "last_question": state.get("last_question"),
            "has_knowledge": get_has_knowledge(proj),
        })

    return {"success": True, "cockpits": cockpits}


def cockpit_attach_cmd(project: str) -> str:
    """
    Get the command to attach to a cockpit's tmux session.

    Returns the docker exec command string.
    """
    container_name = get_container_name(project)
    return f"docker exec -it {container_name} tmux attach-session -t agent"


def cockpit_queue_task(project: str, task: str, context: str = "",
                       priority: str = "normal") -> Dict:
    """
    Queue a task for background dispatch.

    Tasks are queued per-project and dispatched serially within each project,
    but in parallel across projects (with load awareness).

    Args:
        project: Target project name
        task: Task description
        context: Project context
        priority: "high" or "normal"

    Returns: {"success": bool, "task_id": str, "message": str}
    """
    try:
        from cockpit_queue_dispatcher import CockpitQueueDispatcher

        dispatcher = CockpitQueueDispatcher(_load_luzia_config({"projects": {}}))
        task_id = dispatcher.enqueue_task(project, task, context, priority)

        return {
            "success": True,
            "task_id": task_id,
            "message": f"Task queued for {project}",
            "queue_position": len(dispatcher.get_pending_tasks(project)),
        }
    except ImportError:
        return {"success": False, "message": "Queue dispatcher not available"}
    except Exception as e:
        return {"success": False, "message": str(e)}


def cockpit_queue_status() -> Dict:
    """
    Get status of the task queue and dispatcher.

    Returns: {"success": bool, "status": dict}
    """
    try:
        from cockpit_queue_dispatcher import CockpitQueueDispatcher

        dispatcher = CockpitQueueDispatcher(_load_luzia_config({"projects": {}}))
        return {"success": True, "status": dispatcher.get_status()}
    except ImportError:
        return {"success": False, "message": "Queue dispatcher not available"}
    except Exception as e:
        return {"success": False, "message": str(e)}


def _stream_response(controller: DockerTmuxAdapter, session_id: str, timeout: int,
                     show_output: bool, stable_limit: int
                     ) -> Tuple[str, Optional[str], bool]:
    """Poll the pane once per second, printing new response text as it appears.

    Stops when a shell prompt follows a non-empty response, when the
    response has not grown for *stable_limit* consecutive polls, or when
    *timeout* seconds have elapsed.

    Returns: (final_output, question, timed_out); *question* is the trailing
    "...?" line when the run ended at a shell prompt with a question.
    """
    deadline = time.monotonic() + timeout
    final_output = ""
    printed_len = 0
    stable_count = 0

    # Initial wait for Claude to start
    time.sleep(2)

    while time.monotonic() < deadline:
        time.sleep(1)

        raw_output = controller.capture_pane()
        response = controller.extract_response(
            raw_output,
            command_marker=_CLAUDE_COMMAND_MARKER,
            session_id=session_id,
        )

        # Stream new content
        if response and len(response) > printed_len:
            new_text = response[printed_len:]
            if show_output and new_text.strip():
                print(new_text, end="", flush=True)
            printed_len = len(response)
            stable_count = 0  # Reset stability counter
        else:
            stable_count += 1

        final_output = response

        # Shell prompt after a response means the command finished
        if response and _ends_with_shell_prompt(raw_output):
            return final_output, _detect_question(response), False

        # Also stop once the response has been stable long enough
        if response and stable_count >= stable_limit:
            return final_output, None, False

    return final_output, None, True


def cockpit_dispatch_task(project: str, task: str, context: str, config: dict,
                          show_output: bool = True, timeout: int = 600) -> Dict:
    """
    Dispatch a task to cockpit and stream output in real-time.

    This is the main entry point for all task dispatch in luzia.
    Uses DockerTmuxController for robust tmux interaction.

    Args:
        project: Project name
        task: Task description
        context: Project context string
        config: Luzia config dict
        show_output: If True, print output in real-time
        timeout: Max seconds to wait for task completion

    Returns:
        On success: {"success": True, "task_id": str, "session_id": str,
        "output": str, "awaiting_response": bool, "timed_out": bool,
        "question": str | None}.
        On failure: {"success": False, "error": str, "task_id": str}.
    """
    # Generate task ID for tracking
    task_id = _make_task_id(task)

    # 1. Start cockpit if not running
    start_result = cockpit_start(project, config)
    if not start_result["success"]:
        return {"success": False, "error": start_result["message"], "task_id": task_id}

    session_id = start_result["session_id"]
    container_name = start_result["container"]

    # Initialize DockerTmuxController
    controller = DockerTmuxController(container_name)

    # Update state with task info
    state = load_state(project)
    state["task_id"] = task_id
    state["current_task"] = task
    state["task_started"] = datetime.now().isoformat()
    save_state(project, state)

    # 2. Build full prompt
    project_config = config.get("projects", {}).get(project, {})
    project_path = project_config.get("path", f"/home/{project}")

    prompt = f"""You are a project agent working on the **{project}** project.

{context}

## Your Task
{task}

## Execution Environment
- Working directory: {project_path}
- You have FULL permission to read, write, and execute files
- Use standard Claude tools (Read, Write, Edit, Bash) directly
- All file operations are pre-authorized

## Guidelines
- Complete the task step by step
- If you need clarification, ask a clear question (ending with ?)
- If you encounter errors, debug and fix them
- Provide a summary when complete

## IMPORTANT: Human Collaboration
If you need human input at any point, ASK. The human is available.
End questions with ? to signal you're waiting for a response."""

    # Build Claude command
    session_flag = "--resume" if state.get("session_started", False) else "--session-id"
    claude_cmd = f"claude --print -p {session_flag} {session_id}"

    # Write the prompt to a file in the container. It is passed on stdin
    # rather than through a heredoc so no prompt content (including a line
    # equal to the heredoc delimiter) can break out of the file.
    write_result = subprocess.run(
        ["docker", "exec", "-i", container_name,
         "bash", "-c", "cat > /tmp/task_prompt.txt"],
        input=prompt + "\n", capture_output=True, text=True
    )
    if write_result.returncode != 0:
        return {
            "success": False,
            "error": f"Failed to write task prompt: {write_result.stderr.strip()}",
            "task_id": task_id,
        }

    # Build and send command
    exec_cmd = f"cat /tmp/task_prompt.txt | {claude_cmd}"

    if show_output:
        print(f"[cockpit:{project}:{task_id}] Task sent")
        print("-" * 60)

    if not controller.send_keys(exec_cmd, enter=True, delay_enter=0.5):
        return {"success": False, "error": "Failed to send task to tmux", "task_id": task_id}

    # Mark the session as started only after the command was really sent.
    state["session_started"] = True
    save_state(project, state)

    # 3. Stream output with proper completion detection
    final_output, question, timed_out = _stream_response(
        controller, session_id, timeout, show_output, stable_limit=5
    )
    awaiting_response = question is not None

    if show_output:
        print("\n" + "-" * 60)

    # 4. Update state
    state = load_state(project)
    state["last_output"] = final_output
    state["awaiting_response"] = awaiting_response
    state["timed_out"] = timed_out
    state["last_question"] = question
    finished = not awaiting_response and not timed_out
    state["task_completed"] = datetime.now().isoformat() if finished else None
    save_state(project, state)

    # Notify queue system if awaiting human
    if awaiting_response:
        try:
            from task_completion import set_awaiting_human
            set_awaiting_human(task_id, question, project)
        except ImportError:
            pass  # task_completion not available

        if show_output:
            print("\n[AWAITING RESPONSE] Claude is waiting for input:")
            print(f"  {question}")
            print(f"\nTo respond: luzia cockpit respond {project} <answer>")
            print(f"Or continue: luzia {project} <message>")
    elif timed_out:
        if show_output:
            print(f"[STILL RUNNING] Task continues in background (timeout {timeout}s)")
            print(f"  Monitor: luzia cockpit output {project}")
            print(f"  Attach:  luzia cockpit attach {project}")
    elif show_output:
        print("[COMPLETED] Task finished")

    return {
        "success": True,
        "task_id": task_id,
        "session_id": session_id,
        "output": final_output,
        "awaiting_response": awaiting_response,
        "timed_out": timed_out,
        "question": question,
    }


def cockpit_continue(project: str, message: str, config: dict,
                     show_output: bool = True, timeout: int = 600) -> Dict:
    """
    Continue an existing cockpit session with a follow-up message.

    Uses DockerTmuxController for robust tmux interaction.

    Args:
        project: Project name
        message: Follow-up message
        config: Luzia config dict
        show_output: If True, print output in real-time
        timeout: Max seconds to wait

    Returns:
        On success: {"success": True, "task_id": str, "output": str,
        "awaiting_response": bool, "question": str | None, "timed_out": bool}.
        On failure: {"success": False, "error": str}.
    """
    state = load_state(project)
    container_name = get_container_name(project)

    # Check if cockpit is running
    if not container_running(project):
        start_result = cockpit_start(project, config)
        if not start_result["success"]:
            return {"success": False, "error": start_result["message"]}
        state = load_state(project)

    # Initialize DockerTmuxController
    controller = DockerTmuxController(container_name)

    session_id = state.get("session_id")
    if not session_id:
        return {"success": False, "error": "No session ID found"}

    task_id = _make_task_id(message)

    # Build Claude command - always resume for continuation
    claude_cmd = _build_message_command(message, session_id, resume=True)

    if show_output:
        print(f"[cockpit:{project}:{task_id}] Continuing session")
        print("-" * 60)

    # Send via controller
    if not controller.send_keys(claude_cmd, enter=True, delay_enter=0.5):
        return {"success": False, "error": "Failed to send command to tmux"}

    # Stream output with proper completion detection
    final_output, question, timed_out = _stream_response(
        controller, session_id, timeout, show_output, stable_limit=3
    )
    awaiting_response = question is not None

    if show_output:
        print("\n" + "-" * 60)

    # Reload before updating: streaming can take minutes and the state file
    # may have changed in the meantime.
    state = load_state(project)
    state["last_output"] = final_output
    state["awaiting_response"] = awaiting_response
    state["last_question"] = question
    save_state(project, state)

    if show_output:
        if awaiting_response:
            print(f"[AWAITING RESPONSE] {question}")
            print(f"Respond: luzia cockpit respond {project} <answer>")
        elif timed_out:
            print(f"[STILL RUNNING] No completion detected within {timeout}s")
        else:
            print("[COMPLETED]")

    return {
        "success": True,
        "task_id": task_id,
        "output": final_output,
        "awaiting_response": awaiting_response,
        "question": question,
        "timed_out": timed_out,
    }


def _print_cockpit_usage() -> None:
    """Print the top-level help for `luzia cockpit`."""
    print("Usage: luzia cockpit <command> [args]")
    print("")
    print("Commands:")
    print("  start <project>              Start cockpit container")
    print("  stop <project>               Stop (freeze) cockpit")
    print("  remove <project>             Remove cockpit completely")
    print("  send <project> <message>     Send new message to Claude")
    print("  respond <project> <answer>   Respond to pending question")
    print("  output <project>             Get recent output")
    print("  status [project]             Show cockpit status")
    print("  attach <project>             Show attach command")
    print("")
    print("Queue commands (per-project serialized, parallel across projects):")
    print("  queue <project> <task>       Queue task for background dispatch")
    print("  queue --status               Show dispatcher status")
    print("  dispatch                     Run one dispatch cycle")


def _print_send_result(result: Dict) -> int:
    """Print the outcome of cockpit_send() and return the CLI exit code."""
    if not result["success"]:
        print(f"Error: {result['message']}")
        return 1

    print("--- Claude Output ---")
    print(result.get("output", ""))
    if result.get("awaiting_response"):
        print("\n--- AWAITING RESPONSE ---")
        print(f"Question: {result.get('question')}")
    return 0


# CLI Handler for luzia integration
def route_cockpit(config: dict, args: list, kwargs: dict) -> int:
    """
    Route cockpit subcommands.

    luzia cockpit start <project>
    luzia cockpit stop <project>
    luzia cockpit remove <project>
    luzia cockpit send <project> <message>
    luzia cockpit respond <project> <answer>
    luzia cockpit output <project>
    luzia cockpit status [project]
    luzia cockpit attach <project>
    luzia cockpit queue <project> <task> | --status
    luzia cockpit dispatch

    Args:
        config: Luzia config dict
        args: Positional CLI arguments after "cockpit"
        kwargs: Parsed CLI options (currently unused)

    Returns: Process exit code (0 on success, 1 on error)
    """
    if not args:
        _print_cockpit_usage()
        return 0

    subcommand = args[0]
    subargs = args[1:]

    if subcommand == "start":
        if not subargs:
            print("Usage: luzia cockpit start <project>")
            return 1
        result = cockpit_start(subargs[0], config)
        if result["success"]:
            print(f"OK: {result['message']}")
            print(f"  Container: {result['container']}")
            print(f"  Session: {result['session_id']}")
            return 0
        print(f"Error: {result['message']}")
        return 1

    if subcommand == "stop":
        if not subargs:
            print("Usage: luzia cockpit stop <project>")
            return 1
        result = cockpit_stop(subargs[0])
        print(result["message"])
        return 0 if result["success"] else 1

    if subcommand == "remove":
        if not subargs:
            print("Usage: luzia cockpit remove <project>")
            return 1
        result = cockpit_remove(subargs[0])
        print(result["message"])
        return 0 if result["success"] else 1

    if subcommand == "send":
        if len(subargs) < 2:
            print("Usage: luzia cockpit send <project> <message>")
            return 1
        result = cockpit_send(subargs[0], " ".join(subargs[1:]), is_response=False)
        return _print_send_result(result)

    if subcommand == "respond":
        if len(subargs) < 2:
            print("Usage: luzia cockpit respond <project> <answer>")
            return 1
        result = cockpit_send(subargs[0], " ".join(subargs[1:]), is_response=True)
        return _print_send_result(result)

    if subcommand == "output":
        if not subargs:
            print("Usage: luzia cockpit output <project>")
            return 1
        result = cockpit_output(subargs[0])
        if result["success"]:
            print(result["output"])
            return 0
        print(f"Error: {result.get('message', 'Unknown error')}")
        return 1

    if subcommand == "status":
        result = cockpit_status(subargs[0] if subargs else None)
        if not result["cockpits"]:
            print("No cockpits found")
            return 0
        print(f"{'PROJECT':<15} {'STATUS':<10} {'SESSION':<36} {'WAITING'}")
        print("-" * 80)
        for cp in result["cockpits"]:
            if cp.get("awaiting_response"):
                last_question = cp.get("last_question")
                waiting = "YES - " + (last_question[:20] + "..." if last_question else "")
            else:
                waiting = "no"
            session_id = cp.get("session_id") or "-"  # Handle None values
            print(f"{cp['project']:<15} {cp['status']:<10} {session_id:<36} {waiting}")
        return 0

    if subcommand == "attach":
        if not subargs:
            print("Usage: luzia cockpit attach <project>")
            return 1
        print("Run this command to attach:")
        print(f"  {cockpit_attach_cmd(subargs[0])}")
        return 0

    if subcommand == "queue":
        # --status takes no further arguments, so it must be handled before
        # the "<project> <task>" arity check.
        if subargs and subargs[0] == "--status":
            result = cockpit_queue_status()
            if result["success"]:
                print(json.dumps(result["status"], indent=2))
                return 0
            print(f"Error: {result['message']}")
            return 1

        if len(subargs) < 2:
            print("Usage: luzia cockpit queue <project> <task>")
            print("       luzia cockpit queue --status")
            return 1

        result = cockpit_queue_task(subargs[0], " ".join(subargs[1:]))
        if result["success"]:
            print(f"OK: {result['message']}")
            print(f"  Task ID: {result['task_id']}")
            print(f"  Queue position: {result.get('queue_position', 'unknown')}")
            return 0
        print(f"Error: {result['message']}")
        return 1

    if subcommand == "dispatch":
        # Run one dispatch cycle
        try:
            from cockpit_queue_dispatcher import CockpitQueueDispatcher

            dispatcher = CockpitQueueDispatcher(_load_luzia_config(config))
            result = dispatcher.run_dispatch_cycle()
            print(json.dumps(result, indent=2))
            return 0
        except Exception as e:
            print(f"Error: {e}")
            return 1

    print(f"Unknown subcommand: {subcommand}")
    return 1