Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
293 lines
8.7 KiB
Python
Executable File
293 lines
8.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Luz Orchestrator Daemon
|
|
|
|
Background service that:
|
|
1. Monitors a task queue for incoming requests
|
|
2. Routes tasks to appropriate project subagents
|
|
3. Manages resource usage and concurrency
|
|
4. Provides health monitoring
|
|
|
|
This replaces multiple persistent Claude sessions with
|
|
on-demand subagent execution.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import logging
|
|
import signal
|
|
import subprocess
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Optional, Dict, Any
|
|
from dataclasses import dataclass, asdict
|
|
from queue import Queue, Empty
|
|
from threading import Thread, Event
|
|
import socket
|
|
|
|
# Configuration
|
|
CONFIG_PATH = Path(__file__).parent / "config.json"
|
|
TASK_QUEUE_PATH = Path("/var/run/luz-orchestrator/tasks.json")
|
|
LOG_DIR = Path("/var/log/luz-orchestrator")
|
|
PID_FILE = Path("/var/run/luz-orchestrator/daemon.pid")
|
|
SOCKET_PATH = Path("/var/run/luz-orchestrator/orchestrator.sock")
|
|
|
|
# Ensure directories exist
|
|
LOG_DIR.mkdir(parents=True, exist_ok=True)
|
|
TASK_QUEUE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Logging setup
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s [%(levelname)s] %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(LOG_DIR / "daemon.log"),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
|
|
class Task:
|
|
id: str
|
|
project: Optional[str]
|
|
prompt: str
|
|
tools: list
|
|
model: str
|
|
status: str = "pending"
|
|
result: Optional[str] = None
|
|
created_at: str = ""
|
|
completed_at: str = ""
|
|
|
|
def __post_init__(self):
|
|
if not self.created_at:
|
|
self.created_at = datetime.now().isoformat()
|
|
|
|
class OrchestratorDaemon:
|
|
def __init__(self):
|
|
self.config = self._load_config()
|
|
self.task_queue: Queue = Queue()
|
|
self.stop_event = Event()
|
|
self.active_tasks: Dict[str, Task] = {}
|
|
self.completed_tasks: list = []
|
|
self.max_completed = 100 # Keep last 100 completed tasks
|
|
|
|
def _load_config(self) -> dict:
|
|
"""Load configuration from file"""
|
|
if CONFIG_PATH.exists():
|
|
with open(CONFIG_PATH) as f:
|
|
return json.load(f)
|
|
return {"projects": {}, "orchestrator": {}}
|
|
|
|
def _save_pid(self):
|
|
"""Save PID file"""
|
|
with open(PID_FILE, 'w') as f:
|
|
f.write(str(os.getpid()))
|
|
|
|
def _remove_pid(self):
|
|
"""Remove PID file"""
|
|
if PID_FILE.exists():
|
|
PID_FILE.unlink()
|
|
|
|
def detect_project(self, prompt: str) -> Optional[str]:
|
|
"""Detect which project a prompt relates to"""
|
|
prompt_lower = prompt.lower()
|
|
|
|
# Check direct mentions
|
|
for name in self.config.get("projects", {}):
|
|
if name in prompt_lower:
|
|
return name
|
|
|
|
# Check path mentions
|
|
for name, cfg in self.config.get("projects", {}).items():
|
|
if cfg.get("path", "") in prompt:
|
|
return name
|
|
|
|
return None
|
|
|
|
def run_subagent(self, task: Task) -> str:
|
|
"""Execute a task using Claude subagent"""
|
|
project_config = self.config.get("projects", {}).get(task.project, {})
|
|
cwd = project_config.get("path", "/home/admin")
|
|
focus = project_config.get("focus", "")
|
|
|
|
# Build context-aware prompt
|
|
full_prompt = f"""You are a subagent for the {task.project or 'general'} project.
|
|
Working directory: {cwd}
|
|
Focus: {focus}
|
|
|
|
Task: {task.prompt}
|
|
|
|
Execute efficiently and return a concise summary."""
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[
|
|
"claude",
|
|
"-p", full_prompt,
|
|
"--output-format", "json",
|
|
"--allowedTools", ",".join(task.tools),
|
|
"--model", task.model
|
|
],
|
|
cwd=cwd,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=300
|
|
)
|
|
|
|
return result.stdout if result.returncode == 0 else f"Error: {result.stderr}"
|
|
|
|
except subprocess.TimeoutExpired:
|
|
return "Error: Task timed out after 5 minutes"
|
|
except Exception as e:
|
|
return f"Error: {str(e)}"
|
|
|
|
def process_task(self, task: Task):
|
|
"""Process a single task"""
|
|
logger.info(f"Processing task {task.id}: {task.prompt[:50]}...")
|
|
|
|
task.status = "running"
|
|
self.active_tasks[task.id] = task
|
|
|
|
try:
|
|
result = self.run_subagent(task)
|
|
task.result = result
|
|
task.status = "completed"
|
|
except Exception as e:
|
|
task.result = str(e)
|
|
task.status = "failed"
|
|
|
|
task.completed_at = datetime.now().isoformat()
|
|
|
|
# Move to completed
|
|
del self.active_tasks[task.id]
|
|
self.completed_tasks.append(task)
|
|
|
|
# Trim completed tasks
|
|
if len(self.completed_tasks) > self.max_completed:
|
|
self.completed_tasks = self.completed_tasks[-self.max_completed:]
|
|
|
|
logger.info(f"Task {task.id} {task.status}")
|
|
|
|
def worker_loop(self):
|
|
"""Main worker loop processing tasks"""
|
|
while not self.stop_event.is_set():
|
|
try:
|
|
task = self.task_queue.get(timeout=1.0)
|
|
self.process_task(task)
|
|
except Empty:
|
|
continue
|
|
except Exception as e:
|
|
logger.error(f"Worker error: {e}")
|
|
|
|
def submit_task(self, prompt: str, project: Optional[str] = None,
|
|
tools: Optional[list] = None, model: str = "haiku") -> str:
|
|
"""Submit a new task to the queue"""
|
|
task_id = f"task_{int(time.time() * 1000)}"
|
|
|
|
if not project:
|
|
project = self.detect_project(prompt)
|
|
|
|
project_config = self.config.get("projects", {}).get(project, {})
|
|
default_tools = project_config.get("tools", ["Read", "Glob", "Grep", "Bash"])
|
|
|
|
task = Task(
|
|
id=task_id,
|
|
project=project,
|
|
prompt=prompt,
|
|
tools=tools or default_tools,
|
|
model=model
|
|
)
|
|
|
|
self.task_queue.put(task)
|
|
logger.info(f"Submitted task {task_id} for project {project}")
|
|
|
|
return task_id
|
|
|
|
def get_status(self) -> dict:
|
|
"""Get daemon status"""
|
|
return {
|
|
"running": True,
|
|
"pid": os.getpid(),
|
|
"queue_size": self.task_queue.qsize(),
|
|
"active_tasks": len(self.active_tasks),
|
|
"completed_tasks": len(self.completed_tasks),
|
|
"projects": list(self.config.get("projects", {}).keys()),
|
|
"uptime": time.time() - self.start_time
|
|
}
|
|
|
|
def handle_signal(self, signum, frame):
|
|
"""Handle shutdown signals"""
|
|
logger.info(f"Received signal {signum}, shutting down...")
|
|
self.stop_event.set()
|
|
|
|
def run(self):
|
|
"""Run the daemon"""
|
|
logger.info("Starting Luz Orchestrator Daemon")
|
|
|
|
# Set up signal handlers
|
|
signal.signal(signal.SIGTERM, self.handle_signal)
|
|
signal.signal(signal.SIGINT, self.handle_signal)
|
|
|
|
self._save_pid()
|
|
self.start_time = time.time()
|
|
|
|
# Start worker threads
|
|
workers = []
|
|
max_workers = self.config.get("orchestrator", {}).get("max_concurrent_subagents", 3)
|
|
|
|
for i in range(max_workers):
|
|
worker = Thread(target=self.worker_loop, name=f"worker-{i}")
|
|
worker.daemon = True
|
|
worker.start()
|
|
workers.append(worker)
|
|
|
|
logger.info(f"Started {max_workers} worker threads")
|
|
|
|
# Main loop - could add socket server for IPC here
|
|
try:
|
|
while not self.stop_event.is_set():
|
|
time.sleep(1)
|
|
finally:
|
|
self._remove_pid()
|
|
logger.info("Daemon stopped")
|
|
|
|
def main():
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description="Luz Orchestrator Daemon")
|
|
parser.add_argument("--status", action="store_true", help="Check daemon status")
|
|
parser.add_argument("--submit", help="Submit a task")
|
|
parser.add_argument("--project", help="Target project for task")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.status:
|
|
if PID_FILE.exists():
|
|
pid = int(PID_FILE.read_text().strip())
|
|
try:
|
|
os.kill(pid, 0) # Check if process exists
|
|
print(f"Daemon running (PID: {pid})")
|
|
except OSError:
|
|
print("Daemon not running (stale PID file)")
|
|
else:
|
|
print("Daemon not running")
|
|
return
|
|
|
|
if args.submit:
|
|
# For now, just run the task directly
|
|
# In production, would connect to daemon via socket
|
|
daemon = OrchestratorDaemon()
|
|
task_id = daemon.submit_task(args.submit, args.project)
|
|
print(f"Submitted: {task_id}")
|
|
return
|
|
|
|
# Run daemon
|
|
daemon = OrchestratorDaemon()
|
|
daemon.run()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|