Source code for minitrino.core.cluster.ops

"""Cluster operations and resource management for Minitrino clusters."""

from __future__ import annotations

import concurrent.futures
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import TYPE_CHECKING

from docker.errors import APIError, NotFound

from minitrino import utils
from minitrino.core.cluster.provisioner import ClusterProvisioner
from minitrino.core.docker.wrappers import (
    MinitrinoContainer,
    MinitrinoDockerObject,
    MinitrinoImage,
    MinitrinoNetwork,
)
from minitrino.core.errors import MinitrinoError, UserError
from minitrino.settings import ETC_DIR
from minitrino.shutdown import shutdown_event

if TYPE_CHECKING:
    from minitrino.core.cluster.cluster import Cluster
    from minitrino.core.context import MinitrinoContext


[docs] class ClusterOperations: """Cluster operations manager for the current cluster. Parameters ---------- ctx : MinitrinoContext An instantiated MinitrinoContext object with user input and context. cluster : Cluster An instantiated `Cluster` object. Methods ------- provision() Provisions the cluster. reconcile_workers(workers: int) Provisions or adjusts worker containers based on the specified number. down(sig_kill: bool = False, keep: bool = False) Stops and optionally removes all containers for the current cluster. restart() Restarts all cluster containers (coordinator and workers). restart_containers(c_restart: Optional[list[str]] = None, log_level: LogLevel = LogLevel.DEBUG) Restarts all the containers in the provided list. Can apply to any container in the environment, not just the coordinator and workers. remove(obj_type: str, force: bool, labels: Optional[list[str]] = None) Removes Docker objects (images, volumes, or networks) associated with the current cluster. rollback() Terminates the provision operations and removes the cluster. """ def __init__(self, ctx: MinitrinoContext, cluster: Cluster): self._ctx = ctx self._cluster = cluster self._provisioner = ClusterProvisioner(ctx, cluster)
[docs] def provision( self, modules: list[str], image: str, workers: int, no_rollback: bool, ) -> None: """Provision the cluster and environment dependencies. Dependencies include any service/configuration that is defined in the module(s) that are to be provisioned. Parameters ---------- modules : list[str] One or more modules to provision in the cluster. image : str Cluster image type (trino or starburst). workers : int Number of cluster workers to provision. no_rollback : bool If True, disables rollback on failure. Notes ----- - If no options are provided, a standalone coordinator is provisioned. - Supports Trino or Starburst distributions. - Dependent modules are automatically added to the environment. - Dependent clusters are automatically provisioned after the primary cluster is launched. """ self._provisioner.provision(modules, image, workers, no_rollback)
[docs] def reconcile_workers(self, workers: int = 0) -> None: """Reconcile the number of workers in the cluster. Notes ----- Handles five scenarios: 1. No `workers` value is provided and no workers are currently running — does nothing. 2. A positive `workers` value is provided and no workers exist — provisions new workers. 3. No `workers` value is provided but some are already running — uses current count. 4. Provided `workers` value is greater than running workers — provisions more workers. 5. Provided `workers` value is less than running workers — removes excess workers. Parallelism is set to 4 for worker provisioning. """ pattern = rf"minitrino-worker-\d+-{self._ctx.cluster_name}" worker_containers = [ c.name for c in self._cluster.resource.resources().containers() if c.name and re.match(pattern, c.name) and c.name.startswith("minitrino-worker-") and c.labels.get("org.minitrino.root") == "true" ] running_workers = len(worker_containers) # Scenario 1 if workers == 0 and running_workers == 0: return # Scenario 3 if workers == 0 and running_workers > 0: workers = running_workers # Scenario 4 if workers > running_workers: self._ctx.logger.info(f"Adding {workers} workers...") # Scenario 5 if workers < running_workers: worker_containers.sort(reverse=True) excess = running_workers - workers remove = [name for name in worker_containers[:excess] if name] for name in remove: container_obj = self._cluster.resource.container(name) container_obj.kill() container_obj.remove() identifier = utils.generate_identifier( {"ID": container_obj.short_id, "Name": container_obj.name} ) self._ctx.logger.warn(f"Removed excess worker: {identifier}") ver = self._ctx.env.get("CLUSTER_VER") dist = self._ctx.env.get("CLUSTER_DIST") worker_img = f"minitrino/cluster:{ver}-{dist}" compose_project_name = self._cluster.resource.compose_project_name() network_name = f"minitrino_{self._ctx.cluster_name}" fq_container_name = self._cluster.resource.fq_container_name("minitrino") coordinator = self._cluster.resource.container(fq_container_name) # Create tar archive of coordinator's /etc/${CLUSTER_DIST}; user = self._ctx.env.get("SERVICE_USER") tar_path = "/tmp/${CLUSTER_DIST}.tar.gz" self._ctx.cmd_executor.execute( ["rm -rf /tmp/${CLUSTER_DIST}_copy"], ["rm /tmp/${CLUSTER_DIST}.tar.gz"], ["cp -a /etc/${CLUSTER_DIST} /tmp/${CLUSTER_DIST}_copy"], ["rm /tmp/${CLUSTER_DIST}_copy/config.properties"], ["rm /tmp/${CLUSTER_DIST}_copy/jvm.config"], [f"tar czf {tar_path} -C /tmp/${{CLUSTER_DIST}}_copy ."], ["rm -rf /tmp/${CLUSTER_DIST}_copy"], container=coordinator, user=user, ) def _provision_worker(i: int) -> None: if shutdown_event.is_set(): self._ctx.logger.warn(f"Shutdown signal detected. Skipping worker {i}.") return fq_worker_name = self._cluster.resource.fq_container_name( f"minitrino-worker-{i}", ) try: worker = self._cluster.resource.container(fq_worker_name) except NotFound: env_list = coordinator.attrs["Config"]["Env"] env_dict = dict(item.split("=", 1) for item in env_list if "=" in item) env_dict["WORKER"] = "true" env_dict["COORDINATOR"] = "false" # Extract extra_hosts from coordinator to ensure workers have same # network capabilities (e.g., host.docker.internal resolution) extra_hosts_list = coordinator.attrs.get("HostConfig", {}).get( "ExtraHosts", [] ) extra_hosts_dict = {} if extra_hosts_list: for entry in extra_hosts_list: if ":" in entry: hostname, ip = entry.split(":", 1) extra_hosts_dict[hostname] = ip worker_base = self._ctx.docker_client.containers.run( worker_img, name=fq_worker_name, environment=env_dict, detach=True, hostname=fq_worker_name, network=network_name, extra_hosts=extra_hosts_dict if extra_hosts_dict else None, labels={ "org.minitrino.root": "true", "org.minitrino.module.minitrino": "true", "com.docker.compose.project": compose_project_name, "com.docker.compose.service": f"minitrino-worker-{i}", }, ) shared_network = self._ctx.docker_client.networks.get("cluster_shared") shared_network.connect(worker_base) worker = MinitrinoContainer(worker_base, self._ctx.cluster_name) self._ctx.logger.debug( f"Created and started worker container: '{fq_worker_name}' " f"in network '{network_name}'." ) # Copy the tar archive from the coordinator container bits, _ = coordinator.get_archive( f"/tmp/{self._ctx.env.get('CLUSTER_DIST')}.tar.gz" ) tar_stream = b"".join(bits) worker.put_archive("/tmp", tar_stream) # Extract the tar archive into the new worker container self._ctx.cmd_executor.execute( [f"tar xzf {tar_path} -C /etc/${{CLUSTER_DIST}}"], container=worker, user=user, ) self._ctx.logger.debug(f"Copied {ETC_DIR} to '{fq_worker_name}'") with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [ executor.submit(_provision_worker, i) for i in range(1, workers + 1) ] for future in concurrent.futures.as_completed(futures): if shutdown_event.is_set(): self._ctx.logger.warn( "Shutdown detected. Aborting remaining worker provisioning." ) break try: future.result() except Exception as exc: raise MinitrinoError("Worker provisioning failed") from exc # Remove the tar archive self._ctx.cmd_executor.execute( ["rm /tmp/${CLUSTER_DIST}.tar.gz"], container=coordinator, user=user, )
[docs] def down(self, sig_kill: bool = False, keep: bool = False) -> None: """Stop and optionally remove all containers from the cluster. Parameters ---------- sig_kill : bool, optional If True, containers will be stopped using SIGKILL instead of SIGTERM. keep : bool, optional If True, containers will be stopped but not removed. """ resources = self._cluster.resource.resources() containers = resources.containers() if len(containers) == 0: self._ctx.logger.info("No containers to bring down.") return def stop_container(container: MinitrinoContainer): identifier = utils.generate_identifier( {"ID": container.short_id, "Name": container.name} ) if container.status == "running": if sig_kill: container.kill() else: container.stop() self._ctx.logger.info(f"Stopped container: {identifier}") return container def remove_container(container: MinitrinoContainer): identifier = utils.generate_identifier( {"ID": container.short_id, "Name": container.name} ) container.remove() self._ctx.logger.info(f"Removed container: {identifier}") with ThreadPoolExecutor() as executor: stop_futures = { executor.submit(stop_container, container): container for container in containers } for future in as_completed(stop_futures): container = stop_futures[future] try: future.result() except Exception as e: raise MinitrinoError( f"Error stopping container '{container.name}'" ) from e if not keep: with ThreadPoolExecutor() as executor: remove_futures = { executor.submit(remove_container, container): container for container in containers } for future in as_completed(remove_futures): container = remove_futures[future] try: future.result() except Exception as e: raise MinitrinoError( f"Error removing container '{container.name}'" ) from e self._ctx.logger.info("Brought down all Minitrino containers.")
[docs] def restart(self) -> None: """Restart all cluster containers (coordinator and workers).""" cluster_resources = self._cluster.resource.resources() containers = cluster_resources.containers() if len(containers) == 0: self._ctx.logger.info("No cluster containers to restart.") return cluster_containers = [c.name for c in containers if c.name] self.restart_containers(cluster_containers) self._ctx.logger.info( f"Restarted containers in cluster '{self._ctx.cluster_name}'." )
[docs] def restart_containers(self, c_restart: list[str] | None = None) -> None: """Restart all the containers in the provided list. Parameters ---------- c_restart : Optional[list[str]], optional List of fully-qualified container names to restart, by default None. """ if c_restart is None: return c_restart = list(set(c_restart)) def _restart_container(container_name: str) -> None: """Restart a single container by name. Parameters ---------- container_name : str The name of the container to restart. Raises ------ MinitrinoError If the container is not found. """ try: container = self._ctx.docker_client.containers.get(container_name) self._ctx.logger.debug(f"Restarting container '{container.name}'...") container.restart() self._ctx.logger.debug( f"Container '{container.name}' restarted successfully." ) except NotFound as e: raise MinitrinoError( f"Attempting to restart container '{container_name}', " f"but the container was not found." ) from e with ThreadPoolExecutor() as executor: futures = { executor.submit(_restart_container, container): container for container in c_restart } for future in as_completed(futures): container_name = futures[future] try: future.result() except Exception as e: raise MinitrinoError( f"Error while restarting container '{container_name}'" ) from e
[docs] def remove( self, obj_type: str, force: bool, modules: list[str] | None = None ) -> None: """Remove Docker objects associated with the current cluster. Parameters ---------- obj_type : str Type of Docker object to remove. Must be an image, volume, or network. force : bool If True, forces removal even if the resource is in use. modules : list[str], optional Module names to filter which resources should be removed. Raises ------ UserError If attempting to remove images for a specific cluster or module. Notes ----- This method deletes the specified Docker resource type(s) filtered by labels (sourced from provided modules, cluster name, or the project root label). Because images are global project resources (they are not tied to any one cluster or module), they can only be removed as a global operation (using `--cluster all` and omitting `--module`). """ modules = modules or [] if obj_type == "image": if modules: self._ctx.logger.warn( "Cannot remove images for a specific module. " "Skipping image removal." ) return if not self._ctx.all_clusters: self._ctx.logger.warn( "Cannot remove images for a specific cluster. " "Skipping image removal." ) return module_labels = [] for module_name in modules: module: dict | None = self._ctx.modules.data.get(module_name) if module is None: raise UserError(f"Module '{module_name}' not found.") module_labels.append(module["label"]) if not module_labels: self._remove(obj_type, None, force) return for label in module_labels: self._remove(obj_type, label, force)
[docs] def rollback(self) -> None: """Terminate the provision operations and remove the cluster.""" self._ctx.logger.warn( f"Rolling back cluster '{self._ctx.cluster_name}'...", ) resources = self._cluster.resource.resources() containers = resources.containers() for c in containers: try: c.kill() self._ctx.logger.debug(f"Rolled back {repr(c)}") except Exception: pass try: c.remove() self._ctx.logger.debug(f"Rolled back {repr(c)}") except Exception: pass
def _remove(self, obj_type: str, label: str | None, force: bool) -> None: resources = self._cluster.resource.resources([label] if label else None) items: list[MinitrinoDockerObject] if obj_type == "image": items = list(resources.images()) elif obj_type == "volume": items = list(resources.volumes()) elif obj_type == "network": items = list(resources.networks()) else: raise MinitrinoError(f"Invalid object type: {obj_type}") for obj in items: identifier = "<unknown>" try: fields = self._get_identifier_fields(obj_type, obj) identifier = utils.generate_identifier(fields) if obj.kind == "network": assert isinstance(obj, MinitrinoNetwork) obj.remove() else: assert not isinstance(obj, MinitrinoNetwork) obj.remove(force=force) self._ctx.logger.info(f"{obj_type.title()} removed: {identifier}") except APIError as e: self._ctx.logger.info( f"Cannot remove {obj_type}: {identifier}\n" f"Error from Docker: {e.explanation}" ) def _get_identifier_fields( self, obj_type: str, item: MinitrinoDockerObject ) -> dict[str, str]: """Return a dictionary of identifying fields for Docker resources. Parameters ---------- obj_type : str Type of Docker object (container, image, volume, network). item : docker.models object The Docker object to extract metadata from. Returns ------- dict[str, str] Mapping of human-readable keys and values. """ if obj_type == "image": id_val = item.short_id tag_val = "<none>" # Narrow item to MinitrinoImage before calling # _try_get_image_tag if isinstance(item, MinitrinoImage): tag_val = self._try_get_image_tag(item) tag_val = tag_val if tag_val else "<none>" return {"ID": id_val, "Image:Tag": tag_val} else: id_val = ( getattr(item, "name", None) or f"<unnamed-{item.kind}-{item.cluster_name}>" ) return {"ID": id_val} def _try_get_image_tag(self, image: MinitrinoImage) -> str: """Safely fetch the first tag from an image.""" try: return image.tags[0] except Exception: return ""