Files
luzia/lib/port_manager.py
admin 45ec8828a1 Add port management skill and CLI commands
- Add lib/port_manager.py for server port allocation
- Rules: dedicated ports, no +1 increment, kill same service on conflict
- Add 'luzia port' CLI commands (list/check/allocate/release/suggest)
- Add .gitignore for __pycache__

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 12:33:33 -03:00

449 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Port Manager - Server Port Allocation and Conflict Resolution
Rules:
1. Applications should have their own dedicated port (not generic ports)
2. If using generic port, set up custom port for luzia
3. If app port is in use, check who is using it - if same service, kill and restart
4. NEVER run multiple servers by adding +1 to port number
5. Maintain port registry to track which ports belong to which services
"""
import json
import subprocess
import os
import signal
import sys
from pathlib import Path
from typing import Dict, Optional, Tuple, List
from datetime import datetime
# Location of the JSON file that persists runtime allocations and the
# allocation history between invocations (see load_registry/save_registry).
REGISTRY_PATH = Path("/var/lib/luz/port-registry.json")
# Reserved port ranges for different service types.
# Each value is an inclusive (start, end) pair: suggest_port() scans it with
# range(start, end + 1). The keys are also the valid choices for the
# `suggest` CLI sub-command.
PORT_RANGES = {
    "livekit-agents": (8081, 8099),  # LiveKit voice agents
    "web-apps": (3000, 3999),  # Web applications
    "api-services": (8000, 8080),  # API services
    "databases": (5432, 5499),  # Database services
    "mcp-servers": (8100, 8199),  # MCP servers
    "monitoring": (9000, 9099),  # Prometheus, metrics
    "internal": (8200, 8299),  # Internal services
}
# Known services and their dedicated ports.
# Maps service name -> fixed port. allocate_port() always uses the port listed
# here for a known service (a preferred port passed by the caller is ignored),
# and suggest_port() never proposes any of these ports.
DEDICATED_PORTS = {
    # LiveKit ecosystem
    "livekit-server": 7880,
    "livekit-agent-main": 8081,
    "livekit-web": 8082,
    "luzia-agent-elevenlabs": 8083,
    "luzia-voice-chat": 8084,
    # Web services
    "ia-luzia-web": 8091,
    "auth-luz": 8090,
    "luzuy-admin": 8092,
    # LLM/AI services
    "litellm-proxy": 8005,
    "ollama-api": 11434,
    # MCP servers
    "hybrid-memory-mcp": 8100,
    "sarlo-admin-mcp": 8101,
    "assistant-channel-mcp": 8102,
    "task-queue-mcp": 8103,
    "backup-mcp": 8104,
    "git-mcp": 8105,
    # Orchestrator
    "luzia-orchestrator": 8200,
    "luzia-status-publisher": 8201,
    # Docker services (via proxy)
    "librechat": 8000,
    "grafana": 3000,
    "prometheus": 9090,
    # Other services
    "whatsapp-webhook": 8281,
    "agentic-server": 8321,
    "voice-server": 8765,
}
def load_registry() -> Dict:
    """Load the port registry from disk.

    Returns:
        A dict that always contains the keys ``"allocations"`` (a dict of
        service name -> allocation record) and ``"history"`` (a list of
        event records). An empty registry is returned when the file does
        not exist, cannot be decoded, is not valid JSON, or does not hold
        a JSON object. A missing or wrongly-typed ``"allocations"`` /
        ``"history"`` entry is replaced by an empty one so callers can index
        both keys unconditionally.

    Raises:
        OSError: If the file exists but cannot be read (e.g. permissions).
    """
    try:
        # Read directly instead of checking exists() first: avoids the race
        # where the file disappears between the check and the read.
        registry = json.loads(REGISTRY_PATH.read_text())
    except FileNotFoundError:
        return {"allocations": {}, "history": []}
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors;
        # either way the file content is unusable.
        return {"allocations": {}, "history": []}

    if not isinstance(registry, dict):
        return {"allocations": {}, "history": []}

    # Repair partially-formed registries instead of letting callers hit
    # KeyError / AttributeError later on.
    if not isinstance(registry.get("allocations"), dict):
        registry["allocations"] = {}
    if not isinstance(registry.get("history"), list):
        registry["history"] = []
    return registry
def save_registry(registry: Dict) -> None:
    """Save the port registry to disk.

    The registry is serialised first and then written to a temporary file in
    the same directory, which is atomically renamed over the real file. A
    crash or full disk in the middle of the write therefore cannot leave a
    truncated registry behind (which would later be read as "empty").

    Args:
        registry: The registry dict to persist; must be JSON-serialisable.

    Raises:
        TypeError, ValueError: If ``registry`` cannot be serialised to JSON
            (nothing is written in that case).
        OSError: If the registry cannot be written.
    """
    REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(registry, indent=2)

    # The PID in the name keeps concurrent writers from sharing a temp file.
    tmp_path = REGISTRY_PATH.with_name(f".{REGISTRY_PATH.name}.{os.getpid()}.tmp")
    try:
        tmp_path.write_text(payload)
        if REGISTRY_PATH.exists():
            # Keep the permission bits of the file being replaced.
            os.chmod(tmp_path, REGISTRY_PATH.stat().st_mode & 0o7777)
        os.replace(tmp_path, REGISTRY_PATH)
    except PermissionError:
        # The directory may be read-only for us while the registry file
        # itself is writable; fall back to the plain in-place write.
        REGISTRY_PATH.write_text(payload)
    finally:
        # No-op after a successful replace; cleans up after a failure.
        try:
            tmp_path.unlink()
        except OSError:
            pass
def get_port_user(port: int, ss_output: Optional[str] = None) -> Optional[Dict]:
    """Check what process is listening on a TCP port.

    Args:
        port: The TCP port number to look up.
        ss_output: Optional pre-captured output of ``ss -tlnp`` (including
            its header line). When omitted, ``sudo ss -tlnp`` is executed.
            Supplying it lets callers look up several ports from one capture.

    Returns:
        ``None`` if no listener on ``port`` was found (or the lookup failed),
        otherwise a dict with the keys ``"pid"``, ``"process"``,
        ``"cmdline"`` and ``"port"``. When the process details cannot be read
        from ``/proc``, ``"process"`` is ``"unknown"`` and ``"cmdline"`` is
        empty. When ``ss`` lists a listener but reports no owning PID,
        ``"pid"`` is ``None``: the port is still reported as in use rather
        than as free.
    """
    import re

    try:
        if ss_output is None:
            # Use ss with sudo to get process info
            result = subprocess.run(
                ["sudo", "ss", "-tlnp"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode != 0:
                # Do not fail silently: an empty listing would otherwise
                # make every port look free.
                print(
                    f"Error checking port {port}: ss exited with status "
                    f"{result.returncode}: {result.stderr.strip()}",
                    file=sys.stderr,
                )
            ss_output = result.stdout

        # ":<port>" followed by whitespace, so 8081 does not match 18081/80810.
        port_re = re.compile(rf":{port}\s")
        listener_seen = False
        for line in ss_output.strip().split("\n")[1:]:  # Skip header
            if not port_re.search(line):
                continue
            listener_seen = True
            # Format: LISTEN 0 100 0.0.0.0:8081 0.0.0.0:* users:(("python3",pid=12345,fd=5))
            pid_match = re.search(r"pid=(\d+)", line)
            if not pid_match:
                # Keep looking: another line for the same port (e.g. the
                # IPv6 listener) may carry the process information.
                continue
            pid = int(pid_match.group(1))
            try:
                # errors="replace": command lines are raw bytes and are not
                # guaranteed to be valid UTF-8.
                cmdline = (
                    Path(f"/proc/{pid}/cmdline")
                    .read_text(errors="replace")
                    .replace("\x00", " ")
                    .strip()
                )
                comm = Path(f"/proc/{pid}/comm").read_text(errors="replace").strip()
            except OSError:
                # Process vanished or /proc is not readable for it.
                return {"pid": pid, "process": "unknown", "cmdline": "", "port": port}
            return {
                "pid": pid,
                "process": comm,
                "cmdline": cmdline,
                "port": port,
            }

        if listener_seen:
            # Something is listening but ss did not tell us who.
            return {"pid": None, "process": "unknown", "cmdline": "", "port": port}
        return None
    except subprocess.TimeoutExpired:
        return None
    except Exception as e:
        # Best-effort lookup: report the problem and treat as "not found".
        print(f"Error checking port {port}: {e}", file=sys.stderr)
        return None
def is_same_service(service_name: str, port_user: Dict) -> bool:
    """Check if the process using the port is the same service.

    The comparison is a case-insensitive substring match that ignores ``-``
    and ``_``, so ``"voice-server"`` matches a command line containing
    ``voice_server`` or ``VoiceServer``.

    Args:
        service_name: Name of the service requesting the port.
        port_user: Process record as returned by ``get_port_user`` (uses the
            ``"cmdline"`` and ``"process"`` entries); may be empty or None.

    Returns:
        True if the normalised service name occurs in the process command
        line or process name. Always False when there is no process record
        or when the service name is empty after normalisation -- an empty
        needle would otherwise match *every* process and cause an unrelated
        process to be treated (and killed) as "the same service".
    """
    if not port_user:
        return False

    def normalize(text: str) -> str:
        return text.lower().replace("-", "").replace("_", "")

    wanted = normalize(service_name)
    if not wanted:
        return False

    # `or ""` tolerates records whose fields are present but None.
    cmdline = normalize(port_user.get("cmdline") or "")
    process = normalize(port_user.get("process") or "")
    return wanted in cmdline or wanted in process
def kill_process(pid: int, service_name: str, force: bool = False) -> bool:
    """Kill a process, optionally with force.

    Args:
        pid: PID of the process to signal; must be a positive integer.
        service_name: Service name, used only in the progress message.
        force: Send SIGKILL instead of SIGTERM.

    Returns:
        True if the signal was sent or the process no longer exists; False if
        the PID is invalid, permission was denied, or another error occurred.
    """
    # os.kill() gives 0 and negative PIDs a special meaning (the caller's own
    # process group / every process we may signal), so never pass them on.
    if not isinstance(pid, int) or isinstance(pid, bool) or pid <= 0:
        print(f"Refusing to signal invalid PID {pid!r}")
        return False
    try:
        sig = signal.SIGKILL if force else signal.SIGTERM
        os.kill(pid, sig)
        print(f"Sent {sig.name} to PID {pid} ({service_name})")
        return True
    except ProcessLookupError:
        # Nothing left to kill counts as success.
        print(f"Process {pid} already terminated")
        return True
    except PermissionError:
        print(f"Permission denied to kill PID {pid}")
        return False
    except Exception as e:
        print(f"Error killing PID {pid}: {e}")
        return False
def _conflicting_owner(port: int, service_name: str, registry: Dict) -> Optional[str]:
    """Return the name of a *different* service that already owns ``port``, or None."""
    for owner, dedicated_port in DEDICATED_PORTS.items():
        if dedicated_port == port and owner != service_name:
            return owner
    for owner, info in registry["allocations"].items():
        if owner != service_name and info.get("port") == port:
            return owner
    return None


def allocate_port(service_name: str, preferred_port: Optional[int] = None) -> Tuple[int, str]:
    """
    Allocate a port for a service.

    Args:
        service_name: Name of the service that needs a port.
        preferred_port: Port to use when the service has no dedicated port.
            Ignored for services listed in DEDICATED_PORTS.

    Returns:
        (port, status_message). On failure ``port`` is -1 and the message
        starts with "ERROR:".

    Logic:
    1. Pick the port: dedicated port, else preferred_port, else the port
       already registered for the service; fail if none is available.
    2. Reject ports outside 1-65535 and, for services without a dedicated
       port, ports that already belong to another service (dedicated table
       or registry) even if nothing is listening on them right now.
    3. If port is in use by same service, kill and reuse
    4. If port is in use by different service, FAIL (don't increment!)
    5. Register the allocation
    """
    import time

    registry = load_registry()
    allocations = registry["allocations"]
    is_dedicated = service_name in DEDICATED_PORTS

    # Step 1: Choose the port
    if is_dedicated:
        port = DEDICATED_PORTS[service_name]
        print(f"Service '{service_name}' has dedicated port: {port}")
    elif preferred_port:
        port = preferred_port
        print(f"Using preferred port: {port}")
    elif service_name in allocations:
        port = allocations[service_name]["port"]
        print(f"Found registered port for '{service_name}': {port}")
    else:
        return -1, f"ERROR: No dedicated port for '{service_name}'. Add to DEDICATED_PORTS or specify --port"

    # Step 2: Validate the port and its ownership
    if not isinstance(port, int) or not 1 <= port <= 65535:
        return -1, f"ERROR: Port {port} is outside the valid range 1-65535"
    if not is_dedicated:
        # A port that is idle right now may still belong to another service;
        # handing it out would defeat the purpose of the registry.
        owner = _conflicting_owner(port, service_name, registry)
        if owner is not None:
            return -1, (
                f"ERROR: Port {port} is already reserved for '{owner}'. "
                f"Choose a different port for '{service_name}'"
            )

    # Step 3: Check if port is in use
    port_user = get_port_user(port)
    if port_user:
        print(f"Port {port} is in use by: {port_user}")
        if not is_same_service(service_name, port_user):
            # Different service - DO NOT INCREMENT, FAIL!
            return -1, (
                f"ERROR: Port {port} is in use by DIFFERENT service!\n"
                f" Current user: {port_user['process']} (PID {port_user['pid']})\n"
                f" Cmdline: {port_user['cmdline'][:100]}...\n"
                f" Requested: {service_name}\n"
                f"\nRESOLUTION OPTIONS:\n"
                f" 1. Stop the conflicting service first\n"
                f" 2. Assign a different dedicated port to '{service_name}'\n"
                f" 3. Check if both services should share the port (unlikely)\n"
                f"\nDO NOT use port {port + 1} - that creates port sprawl!"
            )

        print(f"Same service detected. Killing PID {port_user['pid']} to restart...")
        if not kill_process(port_user["pid"], service_name):
            return -1, f"ERROR: Could not kill process on port {port}"
        # Wait a moment for port to be released
        time.sleep(1)
        # Verify port is now free; escalate to SIGKILL if it is not
        if get_port_user(port):
            kill_process(port_user["pid"], service_name, force=True)
            time.sleep(1)
            if get_port_user(port):
                return -1, f"ERROR: Could not free port {port} after killing process"
        # Record in history (persisted by save_registry below)
        registry["history"].append({
            "action": "kill_restart",
            "service": service_name,
            "port": port,
            "killed_pid": port_user["pid"],
            "timestamp": datetime.now().isoformat()
        })

    # Step 4: Register allocation
    allocations[service_name] = {
        "port": port,
        "allocated_at": datetime.now().isoformat(),
        "dedicated": is_dedicated
    }
    save_registry(registry)
    return port, f"OK: Port {port} allocated to '{service_name}'"
def _listening_tcp_ports() -> Optional[set]:
    """Return the set of TCP ports that currently have a listener, or None if unknown."""
    try:
        # No sudo and no -p: listing sockets (without owners) is unprivileged.
        result = subprocess.run(
            ["ss", "-tln"],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    if result.returncode != 0:
        return None
    ports = set()
    for line in result.stdout.splitlines()[1:]:  # Skip header
        for field in line.split():
            # Local addresses look like "0.0.0.0:8081" / "[::]:8081"; the
            # peer column of a listener ends in ":*" and is skipped here.
            _, sep, port_text = field.rpartition(":")
            if sep and port_text.isdigit():
                ports.add(int(port_text))
    return ports


def list_allocations() -> str:
    """List all port allocations.

    Returns:
        A multi-line report with three sections: the dedicated ports, the
        runtime allocations from the registry that are not dedicated
        services (section omitted when there are none), and listeners in
        the 8000-9000 range that are neither dedicated nor registered.
        Ports with a listener are marked ACTIVE, the others idle.
    """
    registry = load_registry()
    lines = ["PORT REGISTRY", "=" * 60]

    # Show dedicated ports
    lines.append("\nDEDICATED PORTS:")
    lines.append("-" * 40)
    for service, port in sorted(DEDICATED_PORTS.items(), key=lambda x: x[1]):
        status = "ACTIVE" if get_port_user(port) else "idle"
        lines.append(f" {port:5d} {service:<30} [{status}]")

    # Show runtime allocations. Dedicated services are also recorded in the
    # registry when allocated, so filter them out *before* deciding whether
    # the section has anything to show.
    runtime = {
        service: info
        for service, info in registry["allocations"].items()
        if service not in DEDICATED_PORTS
    }
    if runtime:
        lines.append("\nRUNTIME ALLOCATIONS:")
        lines.append("-" * 40)
        for service, info in sorted(runtime.items(), key=lambda x: x[1]["port"]):
            status = "ACTIVE" if get_port_user(info["port"]) else "idle"
            lines.append(f" {info['port']:5d} {service:<30} [{status}]")

    # Show active but unregistered
    lines.append("\nACTIVE PORTS (8000-9000):")
    lines.append("-" * 40)
    known_ports = set(DEDICATED_PORTS.values())
    known_ports.update(info["port"] for info in registry["allocations"].values())

    # One socket listing tells us which ports are worth a detailed lookup,
    # instead of spawning a privileged lookup for each of the 1001 ports.
    listening = _listening_tcp_ports()
    if listening is None:
        # Could not get the listing: fall back to probing the whole range.
        candidates = range(8000, 9001)
    else:
        candidates = sorted(p for p in listening if 8000 <= p <= 9000)

    for port in candidates:
        if port in known_ports:
            continue
        user = get_port_user(port)
        if user:
            lines.append(f" {port:5d} {user['process']:<20} PID={user['pid']} [UNREGISTERED!]")
    return "\n".join(lines)
def check_port(port: int) -> str:
    """Describe the current state of a single port.

    Args:
        port: The port number to inspect.

    Returns:
        ``"Port <n>: FREE"`` when nothing is found on the port, otherwise a
        multi-line report with the PID, process name, command line
        (truncated to 100 characters) and the service the port is registered
        to. A match in DEDICATED_PORTS takes precedence over a match in the
        runtime registry.
    """
    holder = get_port_user(port)
    if not holder:
        return f"Port {port}: FREE"

    # Look the port up in the runtime registry first, then in the dedicated
    # table; the dedicated entry wins when both know the port.
    allocations = load_registry()["allocations"]
    runtime_owner = next(
        (name for name, entry in allocations.items() if entry["port"] == port),
        None,
    )
    dedicated_owner = next(
        (name for name, fixed in DEDICATED_PORTS.items() if fixed == port),
        None,
    )
    owner = runtime_owner if dedicated_owner is None else dedicated_owner
    registered_label = owner or 'NO (unregistered!)'

    report = (
        f"Port {port}: IN USE",
        f" PID: {holder['pid']}",
        f" Process: {holder['process']}",
        f" Cmdline: {holder['cmdline'][:100]}",
        f" Registered: {registered_label}",
    )
    return "\n".join(report)
def release_port(service_name: str) -> str:
    """Remove a service's allocation from the registry.

    Only the registry entry is dropped; the process that may be listening on
    the port is left running. A ``"release"`` event is appended to the
    registry history and the registry is saved.

    Args:
        service_name: Service whose allocation should be removed.

    Returns:
        A human-readable message stating what was released, or that the
        service had no allocation in the registry.
    """
    registry = load_registry()
    allocations = registry["allocations"]

    # Guard clause: nothing registered for this service.
    if service_name not in allocations:
        return f"Service '{service_name}' has no runtime allocation"

    freed = allocations.pop(service_name)
    event = {
        "action": "release",
        "service": service_name,
        "port": freed["port"],
        "timestamp": datetime.now().isoformat(),
    }
    registry["history"].append(event)
    save_registry(registry)
    return f"Released port {freed['port']} from '{service_name}'"
def suggest_port(service_type: str) -> int:
    """Find the lowest usable port in the range reserved for a service type.

    A port is usable when it is not a dedicated port, not recorded in the
    registry, and nothing is currently found listening on it.

    Args:
        service_type: One of the keys of PORT_RANGES.

    Returns:
        The suggested port, or -1 when the service type is unknown (the known
        types are printed in that case) or the whole range is taken.
    """
    bounds = PORT_RANGES.get(service_type)
    if bounds is None:
        print(f"Unknown service type: {service_type}")
        print(f"Known types: {', '.join(PORT_RANGES)}")
        return -1

    first, last = bounds
    allocations = load_registry()["allocations"]
    reserved = {
        *DEDICATED_PORTS.values(),
        *(entry["port"] for entry in allocations.values()),
    }

    # Lazy generator: the (expensive) liveness probe only runs for ports that
    # are not reserved, and stops at the first hit.
    usable = (
        candidate
        for candidate in range(first, last + 1)
        if candidate not in reserved and not get_port_user(candidate)
    )
    return next(usable, -1)
# CLI interface
def main():
    """Command-line entry point.

    Parses ``sys.argv`` and dispatches to one of the sub-commands
    ``allocate``, ``check``, ``list``, ``suggest`` or ``release``. Exits with
    status 1 when ``allocate`` fails or ``suggest`` finds no port; ``allocate``
    exits with status 0 on success.
    """
    import argparse

    usage_examples = (
        "\n"
        "Examples:\n"
        "port_manager.py allocate my-service --port 8085\n"
        "port_manager.py check 8081\n"
        "port_manager.py list\n"
        "port_manager.py suggest web-apps\n"
        "port_manager.py release my-service\n"
    )
    cli = argparse.ArgumentParser(
        description="Port Manager - Server Port Allocation and Conflict Resolution",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    commands = cli.add_subparsers(dest="command", required=True)

    # allocate <service> [--port N]
    allocate_cmd = commands.add_parser("allocate", help="Allocate a port to a service")
    allocate_cmd.add_argument("service", help="Service name")
    allocate_cmd.add_argument("--port", "-p", type=int, help="Preferred port")

    # check <port>
    check_cmd = commands.add_parser("check", help="Check what's using a port")
    check_cmd.add_argument("port", type=int, help="Port number")

    # list
    commands.add_parser("list", help="List all port allocations")

    # suggest <type>
    suggest_cmd = commands.add_parser("suggest", help="Suggest an available port")
    suggest_cmd.add_argument("type", choices=PORT_RANGES.keys(), help="Service type")

    # release <service>
    release_cmd = commands.add_parser("release", help="Release a port allocation")
    release_cmd.add_argument("service", help="Service name")

    args = cli.parse_args()
    command = args.command

    if command == "allocate":
        port, message = allocate_port(args.service, args.port)
        print(message)
        # allocate_port signals failure with a port of -1.
        sys.exit(0 if port > 0 else 1)

    if command == "check":
        print(check_port(args.port))
        return

    if command == "list":
        print(list_allocations())
        return

    if command == "suggest":
        port = suggest_port(args.type)
        if port <= 0:
            print(f"No available ports in {args.type} range")
            sys.exit(1)
        print(f"Suggested port for {args.type}: {port}")
        return

    if command == "release":
        print(release_port(args.service))
# Run the CLI only when the file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()