Source code for minitrino.core.cluster.ports
"""Port management for Minitrino clusters.
This module provides classes and functions to manage port assignments for Minitrino
clusters, including dynamic port assignment and handling user overrides.
"""
from __future__ import annotations
import re
import socket
from typing import TYPE_CHECKING
from minitrino.core.errors import UserError
if TYPE_CHECKING:
from minitrino.core.cluster.cluster import Cluster
from minitrino.core.context import MinitrinoContext
[docs]
class ClusterPortManager:
    """Manage cluster ports for the current cluster.

    Host ports are handed to Docker Compose through environment variables
    stored in ``ctx.env``. For every port mapping that references a
    ``__PORT`` variable, the manager picks the first host port (starting at
    the mapping's container port) that is free on the local machine, is not
    published by a Docker container, and has not already been handed out
    to another ``__PORT_*`` variable in ``ctx.env``.

    Parameters
    ----------
    ctx : MinitrinoContext
        An instantiated MinitrinoContext object with user input and
        context.
    cluster : Cluster
        An instantiated `Cluster` object.
    """

    # Highest valid TCP port; the search for a free port never goes past it.
    _MAX_PORT = 65535
    # Captures the variable name out of `${NAME}`, `${NAME:-default}`, etc.,
    # restricted to names that contain the `__PORT` marker.
    _PORT_VAR_RE = re.compile(r"\$\{(\w*__PORT\w*)")

    def __init__(self, ctx: MinitrinoContext, cluster: Cluster):
        self._ctx = ctx
        self._cluster = cluster

    def set_external_ports(self, modules: list[str] | None = None) -> None:
        """Dynamically assign host ports to containers.

        The ``minitrino`` container is always assigned first (variable
        ``__PORT_MINITRINO``, default ``8080``). Afterwards every string
        port mapping containing ``__PORT`` in the services of `modules` is
        parsed and assigned.

        Parameters
        ----------
        modules : list[str], optional
            A list of module names to scan for port mappings.

        Raises
        ------
        UserError
            If a port mapping is malformed or no free host port is left.
        """
        self._assign_port("minitrino", "__PORT_MINITRINO", 8080)
        modules = modules or []
        services = self._ctx.modules.module_services(modules)
        for service in services:
            service_name, service_config = service[0], service[1]
            port_mappings = service_config.get("ports") or []
            container_name = service_config.get("container_name", "undefined")
            if container_name == "undefined":
                # If the container name is undefined, use the service
                # name
                container_name = service_name
            for port_mapping in port_mappings:
                # YAML may yield integers (`- 8080`) or long-syntax dicts;
                # only string mappings can reference a `__PORT` variable.
                if not isinstance(port_mapping, str):
                    continue
                if "__PORT" not in port_mapping:
                    continue
                host_port_var_name, default_port = self._parse_port_mapping(
                    port_mapping
                )
                self._assign_port(container_name, host_port_var_name, default_port)

    def _parse_port_mapping(self, port_mapping: str) -> tuple[str, int]:
        """Split a Compose port mapping into (variable name, default port).

        The container port is whatever follows the last ``:`` so that
        mappings with a host IP (``127.0.0.1:${__PORT_X}:8080``) or a
        default-value expression (``${__PORT_X:-8080}:8080``) still parse.

        Parameters
        ----------
        port_mapping : str
            A short-syntax port mapping containing a ``__PORT`` variable.

        Returns
        -------
        tuple[str, int]
            The environment variable name and the default host port.

        Raises
        ------
        UserError
            If the mapping has no host part or the container port is not
            an integer.
        """
        host_part, separator, container_port = port_mapping.rpartition(":")
        if not separator:
            raise UserError(
                f"Port mapping '{port_mapping}' does not specify a host port. "
                f"Expected a mapping of the form "
                f"'${{__PORT_NAME}}:<container_port>'.",
            )
        try:
            default_port = int(container_port)
        except ValueError as e:
            raise UserError(
                f"Default port '{container_port}' is not a valid integer. "
                f"Please check the module's Docker Compose YAML file for the "
                f"correct variable name and ensure a default value is "
                f"set as an environment variable. See the wiki for more "
                f"information: TODO: link\n{e}",
            ) from e
        match = self._PORT_VAR_RE.search(host_part)
        if match:
            return match.group(1), default_port
        # No `${...__PORT...}` expression in the host part: fall back to
        # stripping any `${}` wrapper and using the remainder as the name.
        return re.sub(r"\$\{([^}]+)\}", r"\1", host_part), default_port

    def _is_port_in_use(self, port: int) -> bool:
        """Check if a port is in use on the local machine.

        The check tries to bind a TCP socket to ``127.0.0.1:port``; a
        failing bind (`OSError`) is reported as "in use".
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("127.0.0.1", port))
            except OSError:
                return True
            return False

    def _is_docker_port_in_use(self, port: int) -> bool:
        """Check if a port is in use by any running Docker container.

        Compares `port` against the ``HostPort`` of every binding listed
        under ``NetworkSettings.Ports`` of the containers returned by the
        Docker client.
        """
        wanted = str(port)
        for container in self._ctx.docker_client.containers.list():
            # Either level may be present but null, so `.get(key, {})` is
            # not enough to guarantee a dict.
            network_settings = container.attrs.get("NetworkSettings") or {}
            published = network_settings.get("Ports") or {}
            for bindings in published.values():
                # Ports without a host binding map to None.
                if any(b.get("HostPort") == wanted for b in bindings or ()):
                    return True
        return False

    def _find_next_available_port(
        self, default_port: int, exclude_var: str | None = None
    ) -> int:
        """Find the next available port on the host.

        Parameters
        ----------
        default_port : int
            First port to try; the search counts upwards from here.
        exclude_var : str, optional
            A ``__PORT_*`` variable whose current value is not treated as
            a conflict (the variable being (re)assigned).

        Returns
        -------
        int
            The first port that passes all availability checks.

        Raises
        ------
        UserError
            If no port up to 65535 is available.
        """
        candidate_port = default_port
        # Stop at the last valid port instead of letting `socket.bind`
        # fail with an OverflowError on an out-of-range number.
        while candidate_port <= self._MAX_PORT:
            taken = (
                self._is_port_in_use(candidate_port)
                or self._is_docker_port_in_use(candidate_port)
                or self._is_port_assigned_in_session(candidate_port, exclude_var)
            )
            if not taken:
                return candidate_port
            self._ctx.logger.debug(
                f"Port {candidate_port} is already in use. "
                "Finding the next available port..."
            )
            candidate_port += 1
        raise UserError(
            f"No available host port found in the range "
            f"{default_port}-{self._MAX_PORT}.",
        )

    def _is_port_assigned_in_session(
        self, port: int, exclude_var: str | None = None
    ) -> bool:
        """Check if a port has been assigned in the current session.

        A port counts as assigned when any ``__PORT_*`` entry of
        ``ctx.env`` other than `exclude_var` holds it as its value.
        """
        wanted = str(port)
        return any(
            env_var.startswith("__PORT_")
            and env_value == wanted
            # Skip the variable we're currently assigning
            and not (exclude_var and env_var == exclude_var)
            for env_var, env_value in self._ctx.env.items()
        )

    def _assign_port(
        self, container_name: str, host_port_var: str, default_port: int
    ) -> None:
        """Assign an available host port to a container.

        The chosen port is logged and stored as a string in ``ctx.env``
        under `host_port_var`.

        Parameters
        ----------
        container_name : str
            Container name used (fully qualified) in the log message.
        host_port_var : str
            Name of the environment variable that receives the port.
        default_port : int
            Preferred port; the next free one is used if it is taken.
        """
        candidate_port = self._find_next_available_port(default_port, host_port_var)
        fq_container_name = self._cluster.resource.fq_container_name(container_name)
        self._ctx.logger.info(
            f"Found available port {candidate_port} for container '"
            f"{fq_container_name}'. The service can be reached at "
            f"localhost:{candidate_port}.",
        )
        self._ctx.logger.debug(
            f"Setting environment variable {host_port_var} to {candidate_port}"
        )
        self._ctx.env.update({host_port_var: str(candidate_port)})