#!/usr/bin/env python3
"""
DockerBridge - Manages lazy-loaded Docker containers for Project Agents.

Executes tools inside containers while preserving user ownership.
Containers spin up on-demand and auto-stop after idle timeout.
"""

import subprocess
import time
import os
import json
import logging
import shlex
from typing import Optional, Dict, Any
from pathlib import Path
from datetime import datetime, timedelta, timezone

logger = logging.getLogger("luzia-docker")

# Global registry of active containers and their last activity.
# Values are naive host-local datetimes (they come from datetime.now()).
_container_activity: Dict[str, datetime] = {}

IDLE_TIMEOUT_MINUTES = 10
DEFAULT_IMAGE = "luzia-sandbox:latest"

# grep() output is capped at this many lines (via `head` inside the container).
_GREP_MAX_LINES = 50


def _parse_docker_timestamp(value: str) -> datetime:
    """Parse a Docker timestamp such as ``2024-01-15T10:30:00.123456789Z``.

    The fractional part may have anywhere from 0 to 9 digits (trailing zeros
    can be trimmed), which ``datetime.fromisoformat`` rejects on older Pythons,
    so it is truncated/padded to microseconds here. A trailing ``Z`` or
    ``+00:00`` is accepted.

    Returns:
        A timezone-aware datetime in UTC.

    Raises:
        ValueError: if ``value`` is not in the expected format.
    """
    text = value.strip()
    for suffix in ("Z", "+00:00"):
        if text.endswith(suffix):
            text = text[: -len(suffix)]
            break
    whole, _, fraction = text.partition(".")
    parsed = datetime.strptime(whole, "%Y-%m-%dT%H:%M:%S")
    if fraction:
        if not fraction.isdigit():
            raise ValueError(f"Unrecognised Docker timestamp: {value!r}")
        parsed = parsed.replace(microsecond=int(fraction[:6].ljust(6, "0")))
    return parsed.replace(tzinfo=timezone.utc)


class DockerBridge:
    """
    Manages lazy-loaded Docker containers for Project Agents.
    Executes tools inside containers while preserving user ownership.

    The project directory ``host_path`` is bind-mounted at ``/workspace`` and
    every command runs as the project's UID:GID.
    """

    def __init__(
        self,
        project: str,
        host_path: str,
        image: str = DEFAULT_IMAGE,
        timeout_seconds: int = 300,
        extra_mounts: Optional[list] = None,
    ):
        """
        Args:
            project: Project name; also the host user whose UID/GID is used.
            host_path: Host directory mounted at /workspace.
            image: Docker image to run.
            timeout_seconds: Default timeout for execute().
            extra_mounts: Additional ``-v`` specs (e.g. "/opt/dss:/opt/dss").
        """
        self.project = project
        self.host_path = host_path
        self.container_name = f"luzia-{project}"
        self.image = image
        self.timeout_seconds = timeout_seconds
        self.extra_mounts = extra_mounts or []
        self._uid = self._get_uid()
        self._gid = self._get_gid()

    def _lookup_id(self, flag: str, label: str) -> str:
        """Run ``id <flag> <project>``; fall back to "1000" if the user is unknown."""
        try:
            result = subprocess.run(
                ["id", flag, self.project],
                capture_output=True,
                text=True,
                check=True,
            )
            return result.stdout.strip()
        except subprocess.CalledProcessError:
            logger.warning(f"Could not get {label} for {self.project}, using 1000")
            return "1000"

    def _get_uid(self) -> str:
        """Get UID for the project user to ensure correct file ownership"""
        return self._lookup_id("-u", "UID")

    def _get_gid(self) -> str:
        """Get GID for the project user"""
        return self._lookup_id("-g", "GID")

    def _is_running(self) -> bool:
        """Check if the container is currently running"""
        result = subprocess.run(
            ["docker", "inspect", "-f", "{{.State.Running}}", self.container_name],
            capture_output=True,
            text=True,
        )
        return result.returncode == 0 and "true" in result.stdout.strip().lower()

    def _update_activity(self):
        """Update last activity timestamp for idle tracking"""
        _container_activity[self.container_name] = datetime.now()

    def ensure_running(self) -> bool:
        """Start the container if it is not already running (lazy loading).

        Returns:
            True if a new container was started, False if it was already running.

        Raises:
            RuntimeError: if ``docker run`` fails.
        """
        if self._is_running():
            self._update_activity()
            return False  # Already running

        logger.info(f"Starting container {self.container_name} for {self.project}")

        # Remove a stopped container with the same name, if one exists.
        subprocess.run(
            ["docker", "rm", "-f", self.container_name],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        cmd = [
            "docker", "run", "-d",
            "--name", self.container_name,
            # Run as the project user so files written to the mount keep its ownership.
            "--user", f"{self._uid}:{self._gid}",
            "-e", "HOME=/workspace",
            "-e", "npm_config_cache=/workspace/.npm",
            # Use user-specific temp dir to avoid /tmp collisions
            "-e", "TMPDIR=/workspace/.tmp",
            "-e", "TEMP=/workspace/.tmp",
            "-e", "TMP=/workspace/.tmp",
            "-v", f"{self.host_path}:/workspace",
            "-w", "/workspace",
            "--network", "host",  # Allow access to local services
            "--restart", "unless-stopped",
            # Resource limits
            "--memory", "2g",
            "--cpus", "2",
            # Labels for management
            "--label", "luzia.project=" + self.project,
            "--label", "luzia.created=" + datetime.now().isoformat(),
        ]

        # Add extra mounts (e.g., /opt/dss for DSS project)
        for mount in self.extra_mounts:
            cmd.extend(["-v", mount])

        cmd.extend([self.image, "tail", "-f", "/dev/null"])  # Keep alive

        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error(f"Failed to start container: {result.stderr}")
            raise RuntimeError(f"Failed to start container: {result.stderr}")

        # Give it a moment to stabilize
        time.sleep(0.5)

        # Ensure user-specific temp directory exists inside container
        subprocess.run(
            ["docker", "exec", self.container_name, "mkdir", "-p", "/workspace/.tmp"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        self._update_activity()
        return True

    def execute(self, command: str, timeout: Optional[int] = None) -> Dict[str, Any]:
        """
        Run a bash command inside the container.

        ``command`` is interpreted by ``bash -c``; callers must quote any
        untrusted values themselves (e.g. with shlex.quote).

        Returns dict with:
            - success: bool
            - output: str (stdout)
            - error: str (stderr if any)
            - exit_code: int (-1 on timeout)
        """
        self.ensure_running()

        cmd = ["docker", "exec", self.container_name, "bash", "-c", command]
        timeout = timeout or self.timeout_seconds

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "output": "",
                "error": f"Command timed out after {timeout}s",
                "exit_code": -1,
            }

        self._update_activity()
        return {
            "success": result.returncode == 0,
            "output": result.stdout,
            "error": result.stderr,
            "exit_code": result.returncode,
        }

    def write_file(self, path: str, content: str) -> Dict[str, Any]:
        """
        Write file inside container using 'tee'.
        File is owned by the container user (project user).

        Args:
            path: Relative path from /workspace (project home)
            content: File content to write

        Returns:
            dict with ``success``, ``message`` and, on success, ``bytes_written``.
        """
        self.ensure_running()

        # Ensure parent directory exists (quoted: the path may contain spaces/quotes).
        parent_dir = os.path.dirname(path)
        if parent_dir:
            self.execute(f"mkdir -p -- {shlex.quote(parent_dir)}")

        payload = content.encode("utf-8")
        # No shell involved here; "--" stops tee treating a leading "-" as an option.
        cmd = ["docker", "exec", "-i", self.container_name, "tee", "--", path]

        try:
            # tee echoes its input to stdout; discard it instead of buffering a copy.
            result = subprocess.run(
                cmd,
                input=payload,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                timeout=30,
            )
        except subprocess.TimeoutExpired:
            return {"success": False, "message": "Write operation timed out"}

        self._update_activity()
        if result.returncode == 0:
            return {
                "success": True,
                "message": f"Successfully wrote to {path}",
                "bytes_written": len(payload),
            }
        stderr_text = result.stderr.decode("utf-8", errors="replace")
        return {"success": False, "message": f"Failed to write file: {stderr_text}"}

    def read_file(self, path: str) -> Dict[str, Any]:
        """Read file from container.

        Returns:
            ``{"success": True, "content": str}`` or ``{"success": False, "error": str}``.
        """
        result = self.execute(f"cat -- {shlex.quote(path)}")
        if result["success"]:
            return {"success": True, "content": result["output"]}
        return {
            "success": False,
            "error": result["error"] or "File not found or not readable",
        }

    def list_files(self, path: str = ".", pattern: str = "*") -> Dict[str, Any]:
        """List up to 100 regular files under ``path`` whose name matches ``pattern``."""
        result = self.execute(
            f"find {shlex.quote(path)} -name {shlex.quote(pattern)} -type f 2>/dev/null | head -100"
        )
        if result["success"]:
            files = [f for f in result["output"].strip().split("\n") if f]
            return {"success": True, "files": files}
        return {"success": False, "error": result["error"]}

    def grep(self, pattern: str, path: str = ".") -> Dict[str, Any]:
        """Search recursively for ``pattern`` under ``path`` (at most 50 lines).

        Returns:
            dict with ``success`` (always True), ``matches`` (raw grep output) and
            ``truncated`` (True when the line cap was reached).
        """
        # -e / -- keep patterns and paths that start with "-" from being read as options.
        result = self.execute(
            f"grep -rn -e {shlex.quote(pattern)} -- {shlex.quote(path)} 2>/dev/null"
            f" | head -{_GREP_MAX_LINES}"
        )
        output = result["output"]
        return {
            "success": True,
            "matches": output,
            # splitlines() ignores the trailing newline, so 49 matches is not "truncated".
            "truncated": len(output.splitlines()) >= _GREP_MAX_LINES,
        }

    def stop(self):
        """Stop the container"""
        logger.info(f"Stopping container {self.container_name}")
        subprocess.run(["docker", "stop", self.container_name], capture_output=True)
        _container_activity.pop(self.container_name, None)

    def remove(self):
        """Stop and remove the container"""
        logger.info(f"Removing container {self.container_name}")
        subprocess.run(["docker", "rm", "-f", self.container_name], capture_output=True)
        _container_activity.pop(self.container_name, None)

    def status(self) -> Dict[str, Any]:
        """Get container status (``{"running": False}`` when it is not running)."""
        if not self._is_running():
            return {"running": False}

        # Get container info
        result = subprocess.run(
            ["docker", "inspect", self.container_name],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            return {"running": False, "error": result.stderr}

        info = json.loads(result.stdout)[0]
        return {
            "running": True,
            "container_id": info["Id"][:12],
            "started_at": info["State"]["StartedAt"],
            "user": f"{self._uid}:{self._gid}",
            "image": self.image,
            "last_activity": _container_activity.get(
                self.container_name, datetime.now()
            ).isoformat(),
        }


def _container_start_time(container_name: str) -> Optional[datetime]:
    """Return when ``container_name`` started as a naive host-local datetime, or None."""
    inspect = subprocess.run(
        ["docker", "inspect", "-f", "{{.State.StartedAt}}", container_name],
        capture_output=True,
        text=True,
    )
    if inspect.returncode != 0:
        return None
    try:
        started = _parse_docker_timestamp(inspect.stdout)
        # Docker reports UTC; convert to naive local time so it is comparable with
        # the datetime.now() values stored in _container_activity.
        return started.astimezone().replace(tzinfo=None)
    except (ValueError, OverflowError, OSError):
        logger.debug(f"Could not parse start time for {container_name}: {inspect.stdout!r}")
        return None


def cleanup_idle_containers(timeout_minutes: int = IDLE_TIMEOUT_MINUTES):
    """Stop containers that have been idle for too long.

    Containers without a recorded activity time are judged by their start time.
    """
    now = datetime.now()
    timeout = timedelta(minutes=timeout_minutes)

    # Get all luzia containers
    result = subprocess.run(
        ["docker", "ps", "--filter", "name=luzia-", "--format", "{{.Names}}"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        return

    containers = [c.strip() for c in result.stdout.splitlines() if c.strip()]

    for container_name in containers:
        last_activity = _container_activity.get(container_name)

        if last_activity is None:
            # No activity tracked, fall back to the container start time
            last_activity = _container_start_time(container_name)
            if last_activity is None:
                continue
            _container_activity[container_name] = last_activity

        if now - last_activity > timeout:
            logger.info(f"Stopping idle container: {container_name}")
            subprocess.run(["docker", "stop", container_name], capture_output=True)
            _container_activity.pop(container_name, None)


def list_project_containers() -> list:
    """List all luzia project containers as dicts with name/status/created."""
    result = subprocess.run(
        [
            "docker", "ps", "-a",
            "--filter", "name=luzia-",
            "--format", "{{.Names}}\t{{.Status}}\t{{.CreatedAt}}",
        ],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        return []

    containers = []
    for line in result.stdout.strip().split("\n"):
        if not line:
            continue
        parts = line.split("\t")
        if len(parts) >= 2:
            containers.append({
                "name": parts[0],
                "status": parts[1],
                "created": parts[2] if len(parts) > 2 else "unknown",
            })

    return containers