#!/usr/bin/env python3 """ Port Manager - Server Port Allocation and Conflict Resolution Rules: 1. Applications should have their own dedicated port (not generic ports) 2. If using generic port, set up custom port for luzia 3. If app port is in use, check who is using it - if same service, kill and restart 4. NEVER run multiple servers by adding +1 to port number 5. Maintain port registry to track which ports belong to which services """ import json import subprocess import os import signal import sys from pathlib import Path from typing import Dict, Optional, Tuple, List from datetime import datetime REGISTRY_PATH = Path("/var/lib/luz/port-registry.json") # Reserved port ranges for different service types PORT_RANGES = { "livekit-agents": (8081, 8099), # LiveKit voice agents "web-apps": (3000, 3999), # Web applications "api-services": (8000, 8080), # API services "databases": (5432, 5499), # Database services "mcp-servers": (8100, 8199), # MCP servers "monitoring": (9000, 9099), # Prometheus, metrics "internal": (8200, 8299), # Internal services } # Known services and their dedicated ports DEDICATED_PORTS = { # LiveKit ecosystem "livekit-server": 7880, "livekit-agent-main": 8081, "livekit-web": 8082, "luzia-agent-elevenlabs": 8083, "luzia-voice-chat": 8084, # Web services "ia-luzia-web": 8091, "auth-luz": 8090, "luzuy-admin": 8092, # LLM/AI services "litellm-proxy": 8005, "ollama-api": 11434, # MCP servers "hybrid-memory-mcp": 8100, "sarlo-admin-mcp": 8101, "assistant-channel-mcp": 8102, "task-queue-mcp": 8103, "backup-mcp": 8104, "git-mcp": 8105, # Orchestrator "luzia-orchestrator": 8200, "luzia-status-publisher": 8201, # Docker services (via proxy) "librechat": 8000, "grafana": 3000, "prometheus": 9090, # Other services "whatsapp-webhook": 8281, "agentic-server": 8321, "voice-server": 8765, } def load_registry() -> Dict: """Load the port registry from disk""" if REGISTRY_PATH.exists(): try: return json.loads(REGISTRY_PATH.read_text()) except json.JSONDecodeError: return {"allocations": {}, "history": []} return {"allocations": {}, "history": []} def save_registry(registry: Dict) -> None: """Save the port registry to disk""" REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True) REGISTRY_PATH.write_text(json.dumps(registry, indent=2)) def get_port_user(port: int) -> Optional[Dict]: """ Check what process is using a port. Returns dict with pid, process name, cmdline, or None if port is free. """ import re try: # Use ss with sudo to get process info result = subprocess.run( ["sudo", "ss", "-tlnp"], capture_output=True, text=True, timeout=5 ) port_pattern = f":{port}\\s" for line in result.stdout.strip().split('\n')[1:]: # Skip header if not line.strip(): continue # Check if this line contains our port if not re.search(port_pattern, line): continue # Parse ss output to find PID # Format: LISTEN 0 100 0.0.0.0:8081 0.0.0.0:* users:(("python3",pid=12345,fd=5)) if 'pid=' in line: pid_match = re.search(r'pid=(\d+)', line) if pid_match: pid = int(pid_match.group(1)) # Get process info try: cmdline = Path(f"/proc/{pid}/cmdline").read_text().replace('\x00', ' ').strip() comm = Path(f"/proc/{pid}/comm").read_text().strip() return { "pid": pid, "process": comm, "cmdline": cmdline, "port": port } except (FileNotFoundError, PermissionError): return {"pid": pid, "process": "unknown", "cmdline": "", "port": port} return None except subprocess.TimeoutExpired: return None except Exception as e: print(f"Error checking port {port}: {e}", file=sys.stderr) return None def is_same_service(service_name: str, port_user: Dict) -> bool: """Check if the process using the port is the same service""" if not port_user: return False cmdline = port_user.get("cmdline", "").lower() process = port_user.get("process", "").lower() service_lower = service_name.lower().replace("-", "").replace("_", "") # Check if service name appears in cmdline or process name checks = [ service_lower in cmdline.replace("-", "").replace("_", ""), service_lower in process.replace("-", "").replace("_", ""), service_name.replace("-", "_") in cmdline, service_name.replace("_", "-") in cmdline, ] return any(checks) def kill_process(pid: int, service_name: str, force: bool = False) -> bool: """Kill a process, optionally with force""" try: sig = signal.SIGKILL if force else signal.SIGTERM os.kill(pid, sig) print(f"Sent {sig.name} to PID {pid} ({service_name})") return True except ProcessLookupError: print(f"Process {pid} already terminated") return True except PermissionError: print(f"Permission denied to kill PID {pid}") return False except Exception as e: print(f"Error killing PID {pid}: {e}") return False def allocate_port(service_name: str, preferred_port: Optional[int] = None) -> Tuple[int, str]: """ Allocate a port for a service. Returns: (port, status_message) Logic: 1. Check if service has a dedicated port 2. If preferred_port specified, try to use it 3. If port is in use by same service, kill and reuse 4. If port is in use by different service, FAIL (don't increment!) 5. Register the allocation """ registry = load_registry() # Step 1: Check for dedicated port if service_name in DEDICATED_PORTS: port = DEDICATED_PORTS[service_name] print(f"Service '{service_name}' has dedicated port: {port}") elif preferred_port: port = preferred_port print(f"Using preferred port: {port}") else: # Find from registry or allocate new if service_name in registry["allocations"]: port = registry["allocations"][service_name]["port"] print(f"Found registered port for '{service_name}': {port}") else: return -1, f"ERROR: No dedicated port for '{service_name}'. Add to DEDICATED_PORTS or specify --port" # Step 2: Check if port is in use port_user = get_port_user(port) if port_user: print(f"Port {port} is in use by: {port_user}") # Step 3: Check if same service if is_same_service(service_name, port_user): print(f"Same service detected. Killing PID {port_user['pid']} to restart...") if kill_process(port_user["pid"], service_name): # Wait a moment for port to be released import time time.sleep(1) # Verify port is now free if get_port_user(port): # Try SIGKILL kill_process(port_user["pid"], service_name, force=True) time.sleep(1) if get_port_user(port): return -1, f"ERROR: Could not free port {port} after killing process" # Record in history registry["history"].append({ "action": "kill_restart", "service": service_name, "port": port, "killed_pid": port_user["pid"], "timestamp": datetime.now().isoformat() }) else: return -1, f"ERROR: Could not kill process on port {port}" else: # Different service - DO NOT INCREMENT, FAIL! return -1, ( f"ERROR: Port {port} is in use by DIFFERENT service!\n" f" Current user: {port_user['process']} (PID {port_user['pid']})\n" f" Cmdline: {port_user['cmdline'][:100]}...\n" f" Requested: {service_name}\n" f"\nRESOLUTION OPTIONS:\n" f" 1. Stop the conflicting service first\n" f" 2. Assign a different dedicated port to '{service_name}'\n" f" 3. Check if both services should share the port (unlikely)\n" f"\nDO NOT use port {port + 1} - that creates port sprawl!" ) # Step 4: Register allocation registry["allocations"][service_name] = { "port": port, "allocated_at": datetime.now().isoformat(), "dedicated": service_name in DEDICATED_PORTS } save_registry(registry) return port, f"OK: Port {port} allocated to '{service_name}'" def list_allocations() -> str: """List all port allocations""" registry = load_registry() lines = ["PORT REGISTRY", "=" * 60] # Show dedicated ports lines.append("\nDEDICATED PORTS:") lines.append("-" * 40) for service, port in sorted(DEDICATED_PORTS.items(), key=lambda x: x[1]): user = get_port_user(port) status = "ACTIVE" if user else "idle" lines.append(f" {port:5d} {service:<30} [{status}]") # Show runtime allocations if registry["allocations"]: lines.append("\nRUNTIME ALLOCATIONS:") lines.append("-" * 40) for service, info in sorted(registry["allocations"].items(), key=lambda x: x[1]["port"]): if service not in DEDICATED_PORTS: user = get_port_user(info["port"]) status = "ACTIVE" if user else "idle" lines.append(f" {info['port']:5d} {service:<30} [{status}]") # Show active but unregistered lines.append("\nACTIVE PORTS (8000-9000):") lines.append("-" * 40) known_ports = set(DEDICATED_PORTS.values()) known_ports.update(info["port"] for info in registry["allocations"].values()) for port in range(8000, 9001): if port in known_ports: continue user = get_port_user(port) if user: lines.append(f" {port:5d} {user['process']:<20} PID={user['pid']} [UNREGISTERED!]") return "\n".join(lines) def check_port(port: int) -> str: """Check what's using a specific port""" user = get_port_user(port) if not user: return f"Port {port}: FREE" # Check if registered registry = load_registry() registered_service = None for service, info in registry["allocations"].items(): if info["port"] == port: registered_service = service break for service, p in DEDICATED_PORTS.items(): if p == port: registered_service = service break lines = [ f"Port {port}: IN USE", f" PID: {user['pid']}", f" Process: {user['process']}", f" Cmdline: {user['cmdline'][:100]}", f" Registered: {registered_service or 'NO (unregistered!)'}", ] return "\n".join(lines) def release_port(service_name: str) -> str: """Release a port allocation (does not kill the process)""" registry = load_registry() if service_name in registry["allocations"]: info = registry["allocations"].pop(service_name) registry["history"].append({ "action": "release", "service": service_name, "port": info["port"], "timestamp": datetime.now().isoformat() }) save_registry(registry) return f"Released port {info['port']} from '{service_name}'" return f"Service '{service_name}' has no runtime allocation" def suggest_port(service_type: str) -> int: """Suggest an available port for a service type""" if service_type not in PORT_RANGES: print(f"Unknown service type: {service_type}") print(f"Known types: {', '.join(PORT_RANGES.keys())}") return -1 start, end = PORT_RANGES[service_type] registry = load_registry() used_ports = set(DEDICATED_PORTS.values()) used_ports.update(info["port"] for info in registry["allocations"].values()) for port in range(start, end + 1): if port not in used_ports and not get_port_user(port): return port return -1 # CLI interface def main(): import argparse parser = argparse.ArgumentParser( description="Port Manager - Server Port Allocation and Conflict Resolution", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: port_manager.py allocate my-service --port 8085 port_manager.py check 8081 port_manager.py list port_manager.py suggest web-apps port_manager.py release my-service """ ) subparsers = parser.add_subparsers(dest="command", required=True) # allocate alloc = subparsers.add_parser("allocate", help="Allocate a port to a service") alloc.add_argument("service", help="Service name") alloc.add_argument("--port", "-p", type=int, help="Preferred port") # check check = subparsers.add_parser("check", help="Check what's using a port") check.add_argument("port", type=int, help="Port number") # list subparsers.add_parser("list", help="List all port allocations") # suggest suggest = subparsers.add_parser("suggest", help="Suggest an available port") suggest.add_argument("type", choices=PORT_RANGES.keys(), help="Service type") # release release = subparsers.add_parser("release", help="Release a port allocation") release.add_argument("service", help="Service name") args = parser.parse_args() if args.command == "allocate": port, msg = allocate_port(args.service, args.port) print(msg) sys.exit(0 if port > 0 else 1) elif args.command == "check": print(check_port(args.port)) elif args.command == "list": print(list_allocations()) elif args.command == "suggest": port = suggest_port(args.type) if port > 0: print(f"Suggested port for {args.type}: {port}") else: print(f"No available ports in {args.type} range") sys.exit(1) elif args.command == "release": print(release_port(args.service)) if __name__ == "__main__": main()