Files
luzia/lib/service_manager.py
admin ec33ac1936 Refactor cockpit to use DockerTmuxController pattern
Based on claude-code-tools TmuxCLIController, this refactor:

- Added DockerTmuxController class for robust tmux session management
- Implements send_keys() with configurable delay_enter
- Implements capture_pane() for output retrieval
- Implements wait_for_prompt() for pattern-based completion detection
- Implements wait_for_idle() for content-hash-based idle detection
- Implements wait_for_shell_prompt() for shell prompt detection

Also includes workflow improvements:
- Pre-task git snapshot before agent execution
- Post-task commit protocol in agent guidelines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:42:16 -03:00

347 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Service Manager for Luzia Cockpits

Allows cockpits to manage project services without direct network access.
Services run as the project user, outside the sandbox.

Usage:
    luzia service start <project> <service>
    luzia service stop <project> <service>
    luzia service status [project]
    luzia service list <project>
"""
import json
import logging
import os
import shlex
import signal
import socket
import subprocess
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import yaml
# Configure the root logger at INFO level when this module is imported and
# create the module-level logger used by the rest of the file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Paths
# Directory that holds the persisted service state (ServiceManager stores
# "running.json" inside it). It is created eagerly at import time, including
# any missing parents; an already existing directory is not an error.
SERVICES_STATE_DIR = Path("/var/lib/luz-orchestrator/services")
SERVICES_STATE_DIR.mkdir(parents=True, exist_ok=True)
# Built-in service definitions, used when a project has no services.yml entry.
# Layout: project name -> service name -> settings, where each settings dict
# carries the shell "command", the "workdir" it runs from, the TCP "port" it
# listens on and a human-readable "description".
DEFAULT_SERVICES = {
    "musica": {
        "backend": {
            "command": (
                "source backend/venv/bin/activate && "
                "uvicorn app.main:app --host 0.0.0.0 --port 8100 --app-dir backend"
            ),
            "workdir": "/home/musica",
            "port": 8100,
            "description": "MU Backend API",
        },
        "frontend": {
            "command": "cd frontend && ./node_modules/.bin/vite --port 5175 --host",
            "workdir": "/home/musica",
            "port": 5175,
            "description": "MU Frontend Dev Server",
        },
    },
    "librechat": {
        "chat-hub": {
            "command": "cd chat-hub && uvicorn server:app --host 0.0.0.0 --port 3200",
            "workdir": "/home/librechat",
            "port": 3200,
            "description": "Chat Hub Server",
        },
    },
    "dss": {
        "api": {
            "command": "docker compose up",
            "workdir": "/home/dss/sofi-design-system/packages/dss-server",
            "port": 6220,
            "description": "DSS API Server",
        },
    },
}
class ServiceManager:
    """Manage project services that run outside the cockpit sandbox.

    Services are launched as the project's own system user through ``sudo``
    and tracked in a JSON state file (``running.json`` inside
    ``SERVICES_STATE_DIR``). The state maps ``"<project>/<service>"`` keys to
    the PID, port, start time, command and working directory of each service
    started through this class.
    """

    def __init__(self):
        """Locate the state file and load the previously persisted state."""
        self.state_file = SERVICES_STATE_DIR / "running.json"
        self.state = self._load_state()

    @staticmethod
    def _is_valid_project(project) -> bool:
        """Return True when ``project`` is safe to embed in ``/home/<project>``.

        The project name is interpolated into filesystem paths, so anything
        that could leave ``/home`` (path separators, ``.``/``..``, NUL) or is
        not a non-empty string is rejected.
        """
        return (
            isinstance(project, str)
            and bool(project)
            and "/" not in project
            and "\0" not in project
            and project not in {".", ".."}
        )

    def _load_state(self) -> Dict:
        """Load the running-services state from disk.

        Returns:
            A dict that always contains a ``"services"`` mapping. A missing,
            unreadable or malformed state file yields an empty state instead
            of raising, so a damaged file never blocks the manager.
        """
        if not self.state_file.exists():
            return {"services": {}}
        try:
            data = json.loads(self.state_file.read_text())
        except (OSError, ValueError) as exc:
            # ValueError covers both invalid JSON and undecodable bytes.
            logger.warning("Ignoring unreadable state file %s: %s", self.state_file, exc)
            return {"services": {}}
        if not isinstance(data, dict):
            logger.warning("Ignoring state file %s: top level is not an object", self.state_file)
            return {"services": {}}
        # Every other method indexes state["services"] directly, so make sure
        # the mapping is present even if the file was written without it.
        if not isinstance(data.get("services"), dict):
            data["services"] = {}
        return data

    def _save_state(self):
        """Persist the current state to the state file as indented JSON."""
        self.state_file.write_text(json.dumps(self.state, indent=2))

    def _load_project_services(self, project: str) -> Dict:
        """Return the ``services`` mapping from ``/home/<project>/services.yml``.

        Returns an empty dict when the project name is unsafe, the file does
        not exist, cannot be read or parsed, or has no ``services`` mapping.
        """
        if not self._is_valid_project(project):
            return {}
        services_file = Path(f"/home/{project}/services.yml")
        try:
            with open(services_file) as handle:
                config = yaml.safe_load(handle)
        except FileNotFoundError:
            return {}
        except (OSError, ValueError, yaml.YAMLError) as exc:
            logger.warning("Error reading services.yml: %s", exc)
            return {}
        services = config.get("services") if isinstance(config, dict) else None
        return services if isinstance(services, dict) else {}

    def get_service_config(self, project: str, service: str) -> Optional[Dict]:
        """Get a service's configuration from services.yml or the defaults.

        The project's ``services.yml`` takes precedence. A service that is
        not defined there falls back to ``DEFAULT_SERVICES``, which matches
        what ``list_services`` reports as available.

        Returns:
            The service's settings dict, or ``None`` when the service is
            unknown or the project name is not a safe path component.
        """
        if not self._is_valid_project(project):
            return None
        from_file = self._load_project_services(project).get(service)
        if isinstance(from_file, dict):
            return from_file
        return DEFAULT_SERVICES.get(project, {}).get(service)

    def list_services(self, project: str) -> List[Dict]:
        """List the services available for a project.

        Returns:
            One dict per service with ``name``, ``port``, ``description`` and
            ``source`` (``"services.yml"`` or ``"default"``). Entries from
            services.yml come first; defaults are added only for names that
            services.yml does not define.
        """
        if not self._is_valid_project(project):
            return []
        services = []
        for name, svc in self._load_project_services(project).items():
            if not isinstance(svc, dict):
                # Skip just the malformed entry instead of dropping the rest.
                logger.warning("Ignoring malformed service entry %r for %s", name, project)
                continue
            services.append({
                "name": name,
                "port": svc.get("port"),
                "description": svc.get("description", ""),
                "source": "services.yml",
            })
        listed = {entry["name"] for entry in services}
        for name, svc in DEFAULT_SERVICES.get(project, {}).items():
            if name not in listed:
                services.append({
                    "name": name,
                    "port": svc.get("port"),
                    "description": svc.get("description", ""),
                    "source": "default",
                })
        return services

    def start_service(self, project: str, service: str) -> Dict:
        """Start a service for a project as the project's system user.

        The command runs detached under ``nohup`` with its output redirected
        to ``/tmp/<project>-<service>.log``; the PID echoed by the launching
        shell is recorded in the state file.

        Returns:
            ``{"success": True, "service", "project", "pid", "port", "log"}``
            on success, otherwise ``{"success": False, "error": <message>}``.
            Failures are reported through the result dict, not raised.
        """
        config = self.get_service_config(project, service)
        if not config:
            return {"success": False, "error": f"Service '{service}' not found for project '{project}'"}

        # Refuse to start a second copy while the recorded PID is still alive.
        key = f"{project}/{service}"
        existing = self.state["services"].get(key)
        if existing:
            pid = existing.get("pid")
            if pid and self._is_process_running(pid):
                return {"success": False, "error": f"Service already running (PID {pid})"}

        command = config.get("command")
        if not isinstance(command, str) or not command.strip():
            return {"success": False, "error": f"Service '{service}' has no command configured"}
        workdir = str(config.get("workdir") or f"/home/{project}")
        port = config.get("port")

        if port and self._is_port_in_use(port):
            return {"success": False, "error": f"Port {port} already in use"}

        log_path = f"/tmp/{project}-{service}.log"
        # Every interpolated value is shell-quoted so quotes or metacharacters
        # in the configuration cannot break out of the command line. The cd
        # runs in the foreground so a bad workdir makes the launch fail with a
        # non-zero status instead of being hidden inside the background job.
        full_cmd = (
            f"cd {shlex.quote(workdir)} || exit 1; "
            f"nohup bash -c {shlex.quote(command)} > {shlex.quote(log_path)} 2>&1 & echo $!"
        )
        try:
            result = subprocess.run(
                ["sudo", "-u", project, "bash", "-c", full_cmd],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode != 0:
                return {"success": False, "error": f"Failed to start: {result.stderr}"}

            echoed = result.stdout.strip()
            pid = int(echoed) if echoed.isdigit() else None

            self.state["services"][key] = {
                "pid": pid,
                "port": port,
                "started_at": datetime.now().isoformat(),
                "command": command,
                "workdir": workdir,
            }
            self._save_state()
            return {
                "success": True,
                "service": service,
                "project": project,
                "pid": pid,
                "port": port,
                "log": log_path,
            }
        except Exception as e:
            # Boundary of the "result dict" contract: callers expect an error
            # entry (timeout, missing sudo, unwritable state) and never a raise.
            return {"success": False, "error": str(e)}

    def stop_service(self, project: str, service: str) -> Dict:
        """Stop a service that was started through this manager.

        Sends SIGTERM to the recorded PID, waits up to one second for it to
        exit and then sends SIGKILL if it is still alive. The state entry is
        removed once the process is gone or was already dead.

        NOTE(review): only the recorded PID is signalled; child processes
        spawned by a compound command may outlive it - confirm whether
        process-group termination is wanted.

        Returns:
            ``{"success": True, "service", "project", "stopped_pid"}`` or
            ``{"success": False, "error": <message>}``.
        """
        key = f"{project}/{service}"
        svc = self.state["services"].get(key)
        if svc is None:
            return {"success": False, "error": f"Service '{service}' not running for '{project}'"}

        pid = svc.get("pid")
        if pid:
            try:
                os.kill(pid, signal.SIGTERM)
                # Poll instead of sleeping the full second so a process that
                # exits promptly does not delay the caller.
                deadline = time.monotonic() + 1.0
                while self._is_process_running(pid) and time.monotonic() < deadline:
                    time.sleep(0.05)
                if self._is_process_running(pid):
                    os.kill(pid, signal.SIGKILL)
            except ProcessLookupError:
                pass  # Already dead
            except Exception as e:
                return {"success": False, "error": str(e)}

        del self.state["services"][key]
        self._save_state()
        return {"success": True, "service": service, "project": project, "stopped_pid": pid}

    def status(self, project: Optional[str] = None) -> Dict:
        """Report every tracked service, optionally limited to one project.

        Returns:
            ``{"services": [...]}`` where each entry holds ``project``,
            ``service``, ``pid``, ``port``, ``running``, ``port_responding``
            (``None`` when no port is recorded), ``started_at`` and ``log``.
        """
        entries = []
        for key, svc in self.state["services"].items():
            # partition() tolerates a key without "/" (name becomes "").
            proj, _, name = key.partition("/")
            if project and proj != project:
                continue
            pid = svc.get("pid")
            port = svc.get("port")
            entries.append({
                "project": proj,
                "service": name,
                "pid": pid,
                "port": port,
                "running": self._is_process_running(pid) if pid else False,
                "port_responding": self._is_port_in_use(port) if port else None,
                "started_at": svc.get("started_at"),
                "log": f"/tmp/{proj}-{name}.log",
            })
        return {"services": entries}

    def _is_process_running(self, pid: int) -> bool:
        """Check whether a process with the given PID exists.

        Anything that is not a positive integer is reported as not running;
        this keeps values such as ``"self"`` from matching ``/proc/self``.
        """
        if isinstance(pid, bool) or not isinstance(pid, int) or pid <= 0:
            return False
        if Path("/proc/self").exists():
            # Use /proc check instead of kill to avoid permission issues
            return Path(f"/proc/{pid}").exists()
        # No procfs available: probe with signal 0 instead.
        try:
            os.kill(pid, 0)
        except ProcessLookupError:
            return False
        except PermissionError:
            return True  # Exists, but belongs to another user
        return True

    def _is_port_in_use(self, port: int) -> bool:
        """Check whether something accepts TCP connections on localhost:port.

        Invalid port values and lookup/connection errors are reported as
        "not in use"; the attempt is bounded by a one second timeout.
        """
        try:
            port_number = int(port)
        except (TypeError, ValueError):
            return False
        if not 0 < port_number < 65536:
            return False
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(1.0)
                return s.connect_ex(("localhost", port_number)) == 0
        except OSError:
            return False
# CLI functions
def cmd_start(project: str, service: str) -> str:
    """Start ``service`` for ``project`` and return a one-line outcome message."""
    outcome = ServiceManager().start_service(project, service)
    if not outcome["success"]:
        return f"❌ Failed: {outcome['error']}"
    pid = outcome.get("pid")
    port = outcome.get("port")
    return f"✅ Started {service} for {project} (PID: {pid}, port: {port})"
def cmd_stop(project: str, service: str) -> str:
    """Stop ``service`` for ``project`` and return a one-line outcome message."""
    outcome = ServiceManager().stop_service(project, service)
    if not outcome["success"]:
        return f"❌ Failed: {outcome['error']}"
    return f"✅ Stopped {service} for {project}"
def cmd_status(project: str = None) -> str:
    """Render the status of tracked services as text.

    With ``project`` given, only that project's services are shown. Returns a
    "No services running" message when there is nothing to report, otherwise a
    header followed by one line per service.
    """
    entries = ServiceManager().status(project)["services"]
    if not entries:
        suffix = f" for {project}" if project else ""
        return "No services running" + suffix

    def describe(entry):
        # One report line: run state, PID and (when a port is recorded)
        # whether that port currently accepts connections.
        state = "✅ RUNNING" if entry["running"] else "❌ STOPPED"
        port_note = ""
        if entry.get("port"):
            reachability = "open" if entry["port_responding"] else "closed"
            port_note = f", port {entry['port']} {reachability}"
        return f"{entry['project']}/{entry['service']}: {state} (PID {entry['pid']}{port_note})"

    report = ["SERVICE STATUS", "=" * 50]
    report.extend(describe(entry) for entry in entries)
    return "\n".join(report)
def cmd_list(project: str) -> str:
    """List the services available for ``project`` as text.

    Returns a "No services defined" message when the project has none,
    otherwise a header followed by one line per service showing its name,
    port, description and where the definition came from.
    """
    services = ServiceManager().list_services(project)
    if not services:
        return f"No services defined for {project}"
    lines = [f"SERVICES FOR {project.upper()}", "=" * 50]
    for svc in services:
        # list_services always sets the "port" key (possibly to None), so a
        # dict.get default would never apply; test for None explicitly to get
        # "N/A" instead of the literal text "None".
        port = svc.get("port")
        port_label = "N/A" if port is None else port
        description = svc.get("description") or ""
        lines.append(f" {svc['name']}: port {port_label} - {description} [{svc['source']}]")
    return "\n".join(lines)
if __name__ == "__main__":
    # Command-line entry point: dispatch "start", "stop", "status" and "list"
    # to the matching cmd_* function and print its message. Exits with
    # status 1 on a usage error.
    import sys

    def _print_usage() -> None:
        """Print the command synopsis."""
        print("Usage:")
        print(" service_manager.py start <project> <service>")
        print(" service_manager.py stop <project> <service>")
        print(" service_manager.py status [project]")
        print(" service_manager.py list <project>")

    cli_args = sys.argv[1:]
    if not cli_args:
        _print_usage()
        sys.exit(1)

    cmd, params = cli_args[0], cli_args[1:]
    if cmd == "start" and len(params) >= 2:
        print(cmd_start(params[0], params[1]))
    elif cmd == "stop" and len(params) >= 2:
        print(cmd_stop(params[0], params[1]))
    elif cmd == "status":
        print(cmd_status(params[0] if params else None))
    elif cmd == "list" and params:
        print(cmd_list(params[0]))
    elif cmd in ("start", "stop", "list"):
        # A known command with too few arguments is a usage error, not an
        # unknown command; say so and show the synopsis.
        print(f"Missing arguments for '{cmd}'")
        _print_usage()
        sys.exit(1)
    else:
        print(f"Unknown command: {cmd}")
        sys.exit(1)