Based on claude-code-tools TmuxCLIController, this refactor: - Added DockerTmuxController class for robust tmux session management - Implements send_keys() with configurable delay_enter - Implements capture_pane() for output retrieval - Implements wait_for_prompt() for pattern-based completion detection - Implements wait_for_idle() for content-hash-based idle detection - Implements wait_for_shell_prompt() for shell prompt detection Also includes workflow improvements: - Pre-task git snapshot before agent execution - Post-task commit protocol in agent guidelines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
125 lines
3.7 KiB
Python
125 lines
3.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Service Request Handler for Cockpits
|
|
|
|
Cockpits can request services by writing to:
|
|
/var/cockpit/service_requests/<project>/<action>-<service>.request
|
|
|
|
This watcher processes those requests and writes responses.
|
|
|
|
Usage from cockpit:
|
|
echo '{"action":"start","service":"backend"}' > /var/cockpit/service_requests/musica/start-backend.request
|
|
|
|
Or use the helper script mounted in the container.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
# Setup logging
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Paths
|
|
REQUESTS_DIR = Path("/var/lib/luz-orchestrator/cockpits/service_requests")
|
|
REQUESTS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def process_request(request_file: Path) -> dict:
|
|
"""Process a service request file."""
|
|
try:
|
|
content = request_file.read_text()
|
|
req = json.loads(content)
|
|
except Exception as e:
|
|
return {"success": False, "error": f"Invalid request: {e}"}
|
|
|
|
action = req.get("action")
|
|
service = req.get("service")
|
|
project = request_file.parent.name # Directory name is project
|
|
|
|
if not action:
|
|
return {"success": False, "error": "Missing action"}
|
|
|
|
# These actions don't need a service name
|
|
if action not in ["status", "list"] and not service:
|
|
return {"success": False, "error": "Missing service name for start/stop"}
|
|
|
|
# Import and use service manager
|
|
from service_manager import ServiceManager
|
|
mgr = ServiceManager()
|
|
|
|
if action == "start":
|
|
result = mgr.start_service(project, service)
|
|
elif action == "stop":
|
|
result = mgr.stop_service(project, service)
|
|
elif action == "status":
|
|
result = mgr.status(project)
|
|
elif action == "list":
|
|
services = mgr.list_services(project)
|
|
result = {"success": True, "services": services}
|
|
else:
|
|
result = {"success": False, "error": f"Unknown action: {action}"}
|
|
|
|
return result
|
|
|
|
|
|
def process_all_requests():
|
|
"""Process all pending requests."""
|
|
processed = 0
|
|
|
|
for project_dir in REQUESTS_DIR.iterdir():
|
|
if not project_dir.is_dir():
|
|
continue
|
|
|
|
for req_file in project_dir.glob("*.request"):
|
|
logger.info(f"Processing: {req_file}")
|
|
|
|
# Process the request
|
|
result = process_request(req_file)
|
|
|
|
# Write response
|
|
response_file = req_file.with_suffix(".response")
|
|
response_file.write_text(json.dumps({
|
|
"result": result,
|
|
"processed_at": datetime.now().isoformat()
|
|
}, indent=2))
|
|
|
|
# Remove the request file
|
|
req_file.unlink()
|
|
processed += 1
|
|
|
|
logger.info(f"Processed: {req_file.name} -> {result.get('success', False)}")
|
|
|
|
return processed
|
|
|
|
|
|
def run_watcher(interval: int = 5):
|
|
"""Run continuous watcher for service requests."""
|
|
logger.info(f"Starting service request watcher (interval: {interval}s)")
|
|
logger.info(f"Watching: {REQUESTS_DIR}")
|
|
|
|
while True:
|
|
try:
|
|
processed = process_all_requests()
|
|
if processed:
|
|
logger.info(f"Processed {processed} requests")
|
|
except Exception as e:
|
|
logger.error(f"Error processing requests: {e}")
|
|
|
|
time.sleep(interval)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
if len(sys.argv) > 1 and sys.argv[1] == "daemon":
|
|
interval = int(sys.argv[2]) if len(sys.argv) > 2 else 5
|
|
run_watcher(interval)
|
|
else:
|
|
processed = process_all_requests()
|
|
print(f"Processed {processed} requests")
|