From 45ec8828a1aa1e66e71e8605ee6aef36f993e912 Mon Sep 17 00:00:00 2001 From: admin Date: Wed, 14 Jan 2026 12:33:33 -0300 Subject: [PATCH] Add port management skill and CLI commands - Add lib/port_manager.py for server port allocation - Rules: dedicated ports, no +1 increment, kill same service on conflict - Add 'luzia port' CLI commands (list/check/allocate/release/suggest) - Add .gitignore for __pycache__ Co-Authored-By: Claude Opus 4.5 --- .gitignore | 1 + bin/luzia | 87 +++++++++ lib/port_manager.py | 448 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 536 insertions(+) create mode 100644 .gitignore create mode 100755 lib/port_manager.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c18dd8d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/bin/luzia b/bin/luzia index 839550d..a68b48f 100755 --- a/bin/luzia +++ b/bin/luzia @@ -4970,6 +4970,8 @@ class Router: (self._match_telegram, self._route_telegram, "Telegram notifications"), # Service management (for cockpits) (self._match_service, self._route_service, "Service management"), + # Port management + (self._match_port, self._route_port, "Port allocation"), # Watchdog (Task monitoring) (self._match_watchdog, self._route_watchdog, "Task watchdog"), (self._match_project_task, route_project_task, "Project task"), @@ -5394,6 +5396,91 @@ class Router: print(f"Unknown service command: {cmd}") return 1 + def _match_port(self, args: list) -> Optional[list]: + if args and args[0] == "port": + return args[1:] + return None + + def _route_port(self, config: dict, args: list, kwargs: dict) -> int: + """Handler: luzia port [list|check|allocate|release|suggest]""" + try: + sys.path.insert(0, str(Path(__file__).parent.parent / "lib")) + from port_manager import ( + list_allocations, check_port, allocate_port, + release_port, suggest_port, PORT_RANGES + ) + except ImportError as e: + print(f"Error: Port manager not available: {e}") + return 1 + + if not args: + print("Port Management Commands:") + print(" luzia port list - List all port allocations") + print(" luzia port check - Check what's using a port") + print(" luzia port allocate [-p PORT] - Allocate port to service") + print(" luzia port release - Release port allocation") + print(" luzia port suggest - Suggest available port") + print(f" Types: {', '.join(PORT_RANGES.keys())}") + return 0 + + cmd = args[0] + + if cmd == "list": + print(list_allocations()) + return 0 + + elif cmd == "check": + if len(args) < 2: + print("Usage: luzia port check ") + return 1 + try: + port = int(args[1]) + print(check_port(port)) + return 0 + except ValueError: + print(f"Invalid port number: {args[1]}") + return 1 + + elif cmd == "allocate": + if len(args) < 2: + print("Usage: luzia port allocate [--port PORT]") + return 1 + service = args[1] + preferred_port = None + if len(args) >= 4 and args[2] in ("-p", "--port"): + try: + preferred_port = int(args[3]) + except ValueError: + print(f"Invalid port number: {args[3]}") + return 1 + port, msg = allocate_port(service, preferred_port) + print(msg) + return 0 if port > 0 else 1 + + elif cmd == "release": + if len(args) < 2: + print("Usage: luzia port release ") + return 1 + print(release_port(args[1])) + return 0 + + elif cmd == "suggest": + if len(args) < 2: + print(f"Usage: luzia port suggest ") + print(f"Types: {', '.join(PORT_RANGES.keys())}") + return 1 + port = suggest_port(args[1]) + if port > 0: + print(f"Suggested port for {args[1]}: {port}") + return 0 + else: + print(f"No available ports in {args[1]} range") + return 1 + + else: + print(f"Unknown port command: {cmd}") + return 1 + def _route_exec(self, config: dict, args: list, kwargs: dict) -> int: """Handler: luzia --exec """ if len(args) < 2: diff --git a/lib/port_manager.py b/lib/port_manager.py new file mode 100755 index 0000000..7c6c2ae --- /dev/null +++ b/lib/port_manager.py @@ -0,0 +1,448 @@ +#!/usr/bin/env python3 +""" +Port Manager - Server Port Allocation and Conflict Resolution + +Rules: +1. Applications should have their own dedicated port (not generic ports) +2. If using generic port, set up custom port for luzia +3. If app port is in use, check who is using it - if same service, kill and restart +4. NEVER run multiple servers by adding +1 to port number +5. Maintain port registry to track which ports belong to which services +""" +import json +import subprocess +import os +import signal +import sys +from pathlib import Path +from typing import Dict, Optional, Tuple, List +from datetime import datetime + +REGISTRY_PATH = Path("/var/lib/luz/port-registry.json") + +# Reserved port ranges for different service types +PORT_RANGES = { + "livekit-agents": (8081, 8099), # LiveKit voice agents + "web-apps": (3000, 3999), # Web applications + "api-services": (8000, 8080), # API services + "databases": (5432, 5499), # Database services + "mcp-servers": (8100, 8199), # MCP servers + "monitoring": (9000, 9099), # Prometheus, metrics + "internal": (8200, 8299), # Internal services +} + +# Known services and their dedicated ports +DEDICATED_PORTS = { + # LiveKit ecosystem + "livekit-server": 7880, + "livekit-agent-main": 8081, + "livekit-web": 8082, + "luzia-agent-elevenlabs": 8083, + "luzia-voice-chat": 8084, + + # Web services + "ia-luzia-web": 8091, + "auth-luz": 8090, + "luzuy-admin": 8092, + + # LLM/AI services + "litellm-proxy": 8005, + "ollama-api": 11434, + + # MCP servers + "hybrid-memory-mcp": 8100, + "sarlo-admin-mcp": 8101, + "assistant-channel-mcp": 8102, + "task-queue-mcp": 8103, + "backup-mcp": 8104, + "git-mcp": 8105, + + # Orchestrator + "luzia-orchestrator": 8200, + "luzia-status-publisher": 8201, + + # Docker services (via proxy) + "librechat": 8000, + "grafana": 3000, + "prometheus": 9090, + + # Other services + "whatsapp-webhook": 8281, + "agentic-server": 8321, + "voice-server": 8765, +} + + +def load_registry() -> Dict: + """Load the port registry from disk""" + if REGISTRY_PATH.exists(): + try: + return json.loads(REGISTRY_PATH.read_text()) + except json.JSONDecodeError: + return {"allocations": {}, "history": []} + return {"allocations": {}, "history": []} + + +def save_registry(registry: Dict) -> None: + """Save the port registry to disk""" + REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True) + REGISTRY_PATH.write_text(json.dumps(registry, indent=2)) + + +def get_port_user(port: int) -> Optional[Dict]: + """ + Check what process is using a port. + Returns dict with pid, process name, cmdline, or None if port is free. + """ + import re + try: + # Use ss with sudo to get process info + result = subprocess.run( + ["sudo", "ss", "-tlnp"], + capture_output=True, + text=True, + timeout=5 + ) + + port_pattern = f":{port}\\s" + for line in result.stdout.strip().split('\n')[1:]: # Skip header + if not line.strip(): + continue + + # Check if this line contains our port + if not re.search(port_pattern, line): + continue + + # Parse ss output to find PID + # Format: LISTEN 0 100 0.0.0.0:8081 0.0.0.0:* users:(("python3",pid=12345,fd=5)) + if 'pid=' in line: + pid_match = re.search(r'pid=(\d+)', line) + if pid_match: + pid = int(pid_match.group(1)) + + # Get process info + try: + cmdline = Path(f"/proc/{pid}/cmdline").read_text().replace('\x00', ' ').strip() + comm = Path(f"/proc/{pid}/comm").read_text().strip() + return { + "pid": pid, + "process": comm, + "cmdline": cmdline, + "port": port + } + except (FileNotFoundError, PermissionError): + return {"pid": pid, "process": "unknown", "cmdline": "", "port": port} + + return None + + except subprocess.TimeoutExpired: + return None + except Exception as e: + print(f"Error checking port {port}: {e}", file=sys.stderr) + return None + + +def is_same_service(service_name: str, port_user: Dict) -> bool: + """Check if the process using the port is the same service""" + if not port_user: + return False + + cmdline = port_user.get("cmdline", "").lower() + process = port_user.get("process", "").lower() + service_lower = service_name.lower().replace("-", "").replace("_", "") + + # Check if service name appears in cmdline or process name + checks = [ + service_lower in cmdline.replace("-", "").replace("_", ""), + service_lower in process.replace("-", "").replace("_", ""), + service_name.replace("-", "_") in cmdline, + service_name.replace("_", "-") in cmdline, + ] + + return any(checks) + + +def kill_process(pid: int, service_name: str, force: bool = False) -> bool: + """Kill a process, optionally with force""" + try: + sig = signal.SIGKILL if force else signal.SIGTERM + os.kill(pid, sig) + print(f"Sent {sig.name} to PID {pid} ({service_name})") + return True + except ProcessLookupError: + print(f"Process {pid} already terminated") + return True + except PermissionError: + print(f"Permission denied to kill PID {pid}") + return False + except Exception as e: + print(f"Error killing PID {pid}: {e}") + return False + + +def allocate_port(service_name: str, preferred_port: Optional[int] = None) -> Tuple[int, str]: + """ + Allocate a port for a service. + + Returns: (port, status_message) + + Logic: + 1. Check if service has a dedicated port + 2. If preferred_port specified, try to use it + 3. If port is in use by same service, kill and reuse + 4. If port is in use by different service, FAIL (don't increment!) + 5. Register the allocation + """ + registry = load_registry() + + # Step 1: Check for dedicated port + if service_name in DEDICATED_PORTS: + port = DEDICATED_PORTS[service_name] + print(f"Service '{service_name}' has dedicated port: {port}") + elif preferred_port: + port = preferred_port + print(f"Using preferred port: {port}") + else: + # Find from registry or allocate new + if service_name in registry["allocations"]: + port = registry["allocations"][service_name]["port"] + print(f"Found registered port for '{service_name}': {port}") + else: + return -1, f"ERROR: No dedicated port for '{service_name}'. Add to DEDICATED_PORTS or specify --port" + + # Step 2: Check if port is in use + port_user = get_port_user(port) + + if port_user: + print(f"Port {port} is in use by: {port_user}") + + # Step 3: Check if same service + if is_same_service(service_name, port_user): + print(f"Same service detected. Killing PID {port_user['pid']} to restart...") + + if kill_process(port_user["pid"], service_name): + # Wait a moment for port to be released + import time + time.sleep(1) + + # Verify port is now free + if get_port_user(port): + # Try SIGKILL + kill_process(port_user["pid"], service_name, force=True) + time.sleep(1) + + if get_port_user(port): + return -1, f"ERROR: Could not free port {port} after killing process" + + # Record in history + registry["history"].append({ + "action": "kill_restart", + "service": service_name, + "port": port, + "killed_pid": port_user["pid"], + "timestamp": datetime.now().isoformat() + }) + else: + return -1, f"ERROR: Could not kill process on port {port}" + else: + # Different service - DO NOT INCREMENT, FAIL! + return -1, ( + f"ERROR: Port {port} is in use by DIFFERENT service!\n" + f" Current user: {port_user['process']} (PID {port_user['pid']})\n" + f" Cmdline: {port_user['cmdline'][:100]}...\n" + f" Requested: {service_name}\n" + f"\nRESOLUTION OPTIONS:\n" + f" 1. Stop the conflicting service first\n" + f" 2. Assign a different dedicated port to '{service_name}'\n" + f" 3. Check if both services should share the port (unlikely)\n" + f"\nDO NOT use port {port + 1} - that creates port sprawl!" + ) + + # Step 4: Register allocation + registry["allocations"][service_name] = { + "port": port, + "allocated_at": datetime.now().isoformat(), + "dedicated": service_name in DEDICATED_PORTS + } + save_registry(registry) + + return port, f"OK: Port {port} allocated to '{service_name}'" + + +def list_allocations() -> str: + """List all port allocations""" + registry = load_registry() + + lines = ["PORT REGISTRY", "=" * 60] + + # Show dedicated ports + lines.append("\nDEDICATED PORTS:") + lines.append("-" * 40) + for service, port in sorted(DEDICATED_PORTS.items(), key=lambda x: x[1]): + user = get_port_user(port) + status = "ACTIVE" if user else "idle" + lines.append(f" {port:5d} {service:<30} [{status}]") + + # Show runtime allocations + if registry["allocations"]: + lines.append("\nRUNTIME ALLOCATIONS:") + lines.append("-" * 40) + for service, info in sorted(registry["allocations"].items(), key=lambda x: x[1]["port"]): + if service not in DEDICATED_PORTS: + user = get_port_user(info["port"]) + status = "ACTIVE" if user else "idle" + lines.append(f" {info['port']:5d} {service:<30} [{status}]") + + # Show active but unregistered + lines.append("\nACTIVE PORTS (8000-9000):") + lines.append("-" * 40) + + known_ports = set(DEDICATED_PORTS.values()) + known_ports.update(info["port"] for info in registry["allocations"].values()) + + for port in range(8000, 9001): + if port in known_ports: + continue + user = get_port_user(port) + if user: + lines.append(f" {port:5d} {user['process']:<20} PID={user['pid']} [UNREGISTERED!]") + + return "\n".join(lines) + + +def check_port(port: int) -> str: + """Check what's using a specific port""" + user = get_port_user(port) + + if not user: + return f"Port {port}: FREE" + + # Check if registered + registry = load_registry() + registered_service = None + + for service, info in registry["allocations"].items(): + if info["port"] == port: + registered_service = service + break + + for service, p in DEDICATED_PORTS.items(): + if p == port: + registered_service = service + break + + lines = [ + f"Port {port}: IN USE", + f" PID: {user['pid']}", + f" Process: {user['process']}", + f" Cmdline: {user['cmdline'][:100]}", + f" Registered: {registered_service or 'NO (unregistered!)'}", + ] + + return "\n".join(lines) + + +def release_port(service_name: str) -> str: + """Release a port allocation (does not kill the process)""" + registry = load_registry() + + if service_name in registry["allocations"]: + info = registry["allocations"].pop(service_name) + registry["history"].append({ + "action": "release", + "service": service_name, + "port": info["port"], + "timestamp": datetime.now().isoformat() + }) + save_registry(registry) + return f"Released port {info['port']} from '{service_name}'" + + return f"Service '{service_name}' has no runtime allocation" + + +def suggest_port(service_type: str) -> int: + """Suggest an available port for a service type""" + if service_type not in PORT_RANGES: + print(f"Unknown service type: {service_type}") + print(f"Known types: {', '.join(PORT_RANGES.keys())}") + return -1 + + start, end = PORT_RANGES[service_type] + registry = load_registry() + + used_ports = set(DEDICATED_PORTS.values()) + used_ports.update(info["port"] for info in registry["allocations"].values()) + + for port in range(start, end + 1): + if port not in used_ports and not get_port_user(port): + return port + + return -1 + + +# CLI interface +def main(): + import argparse + + parser = argparse.ArgumentParser( + description="Port Manager - Server Port Allocation and Conflict Resolution", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + port_manager.py allocate my-service --port 8085 + port_manager.py check 8081 + port_manager.py list + port_manager.py suggest web-apps + port_manager.py release my-service +""" + ) + + subparsers = parser.add_subparsers(dest="command", required=True) + + # allocate + alloc = subparsers.add_parser("allocate", help="Allocate a port to a service") + alloc.add_argument("service", help="Service name") + alloc.add_argument("--port", "-p", type=int, help="Preferred port") + + # check + check = subparsers.add_parser("check", help="Check what's using a port") + check.add_argument("port", type=int, help="Port number") + + # list + subparsers.add_parser("list", help="List all port allocations") + + # suggest + suggest = subparsers.add_parser("suggest", help="Suggest an available port") + suggest.add_argument("type", choices=PORT_RANGES.keys(), help="Service type") + + # release + release = subparsers.add_parser("release", help="Release a port allocation") + release.add_argument("service", help="Service name") + + args = parser.parse_args() + + if args.command == "allocate": + port, msg = allocate_port(args.service, args.port) + print(msg) + sys.exit(0 if port > 0 else 1) + + elif args.command == "check": + print(check_port(args.port)) + + elif args.command == "list": + print(list_allocations()) + + elif args.command == "suggest": + port = suggest_port(args.type) + if port > 0: + print(f"Suggested port for {args.type}: {port}") + else: + print(f"No available ports in {args.type} range") + sys.exit(1) + + elif args.command == "release": + print(release_port(args.service)) + + +if __name__ == "__main__": + main()