Source code for minitrino.core.cluster.provisioner

"""Cluster operations and resource management for Minitrino clusters."""

from __future__ import annotations

import hashlib
import os
import shutil
import threading
import time
from collections.abc import Callable
from typing import TYPE_CHECKING

from docker.errors import NotFound

from minitrino import utils
from minitrino.core.errors import MinitrinoError, UserError
from minitrino.shutdown import shutdown_event

if TYPE_CHECKING:
    from minitrino.core.cluster.cluster import Cluster
    from minitrino.core.context import MinitrinoContext


[docs] class ClusterProvisioner: """Provision the cluster and provided modules. Parameters ---------- ctx : MinitrinoContext An instantiated MinitrinoContext object with user input and context. cluster : Cluster An instantiated `Cluster` object. Methods ------- provision() Provision the cluster and provided modules. """ def __init__(self, ctx: MinitrinoContext, cluster: Cluster): self._ctx = ctx self._cluster = cluster self.modules: list[str] = [] self.image: str = "trino" self.workers: int = 0 self.no_rollback: bool = False self.build: bool = False self._captured_container_logs: str = "" # Store container logs before rollback self._worker_safe_event: threading.Event = threading.Event() self._dep_cluster_env: dict[str, str] = {}
[docs] def provision( self, modules: list[str], image: str, workers: int, no_rollback: bool, ) -> None: """Provision the cluster and provided modules. Notes ----- - Should always be invoked from `ClusterOperations.provision()`. - Writes a crashdump log to the user directory if an exception is raised. """ def _orchestrate(): if self._ctx.all_clusters: raise UserError( "The `provision` command cannot interact with multiple/all" " clusters. Please specify a valid cluster name containing only" " alphanumeric, hyphen, and underscore characters." ) self.modules = modules self.image = image self.workers = workers self.no_rollback = no_rollback self._set_license() self._set_distribution() self.build = self._determine_build() for module in self.modules: self._ctx.modules.validate_module_name(module) if not self.modules: self._ctx.logger.info( f"No modules specified. Provisioning standalone " f"{self._ctx.env.get('CLUSTER_DIST').title()} cluster..." ) utils.check_daemon(self._ctx.docker_client) utils.check_lib(self._ctx) self._ctx.cluster.validator.check_cluster_ver() self._ctx.modules.check_module_version_requirements(self.modules) self.modules = self._append_running_modules(self.modules) self.modules = self._ctx.modules.check_dep_modules(self.modules) dependent_clusters = self._ctx.cluster.validator.check_dependent_clusters( self.modules ) clusters_to_provision = [None] # None represents main cluster clusters_to_provision.extend(dependent_clusters) try: self._ensure_shared_network() self._runner() # Provisions main cluster for cluster in dependent_clusters: self._runner(cluster=cluster) self._record_image_src_checksum() self._ctx.logger.info("Environment provisioning complete.") except Exception as e: self._ctx.logger.error( "Provisioning failed. Rolling back all provisioned clusters..." ) # Capture container logs before rollback destroys them self._capture_container_logs_for_crashdump() self._rollback() raise e try: _orchestrate() except UserError as e: raise e except Exception as e: crashdump = os.path.join(self._ctx.minitrino_user_dir, "crashdump.log") self._ctx.logger.error( f"{str(e)}\nFull provision log written to {crashdump}" ) with open(crashdump, "w") as f: # Write Minitrino logs f.write("=" * 80 + "\n") f.write("MINITRINO LOGS\n") f.write("=" * 80 + "\n\n") for msg, _, is_spinner in self._ctx.logger._log_sink.buffer: if not is_spinner: f.write(msg + "\n") # Write captured container logs f.write("\n\n") if self._captured_container_logs: f.write(self._captured_container_logs) else: f.write("=" * 80 + "\n") f.write("CONTAINER LOGS\n") f.write("=" * 80 + "\n\n") f.write("No container logs were captured.\n") raise MinitrinoError( f"{str(e)}\nFull provision log written to {crashdump}" ) from e
def _runner(self, cluster: dict | None = None) -> None: """Execute the provisioning sequence for a cluster and modules. If provisioning a dependent cluster, update the cluster context, instance attributes, and environment variables before executing the provisioning sequence. Parameters ---------- cluster : dict, optional Optional dictionary representing a dependent cluster configuration. Defaults to None. """ if cluster is None: cluster = {} self._worker_safe_event.clear() log_dist = self._ctx.env.get("CLUSTER_DIST") self._ctx.logger.info(f"Starting {log_dist.title()} cluster provisioning...") # If a dependent cluster is being provisioned, we need to update # the context to the dependent cluster's values, then update the # environment variables so that the Compose YAMLs use the # correct values. if cluster: self._ctx.logger.info( f"Provisioning dependent cluster: {cluster['name']}..." ) self.modules = self._ctx.modules.check_dep_modules( cluster.get("modules", []) ) self.workers = cluster.get("workers", 0) self._ctx.cluster_name = cluster.get("name", "") if cluster and cluster.get("env"): self._dep_cluster_env.clear() self._ctx.logger.debug( f"Loading dependent cluster env vars: {cluster['env']}" ) self._dep_cluster_env.update(cluster.get("env", {})) self._set_env_vars() self._ctx.provisioned_clusters.append(self._ctx.cluster_name) self._ctx.modules.check_enterprise(self.modules) self._ctx.modules.check_compatibility(self.modules) self._ctx.modules.check_volumes() self._ctx.cluster.ports.set_external_ports(self.modules) try: module_yaml_paths = self._module_yaml_paths() compose_cmd = self._build_compose_command(module_yaml_paths) worker_thread = None if self.workers > 0: worker_thread = threading.Thread( target=self._provision_workers_when_safe, name="ProvisionWorkersThread", daemon=True, ) worker_thread.start() self._run_compose_and_wait(compose_cmd) if worker_thread: worker_thread.join() self._ctx.cluster.validator.check_dup_config() except Exception as e: self._rollback() raise MinitrinoError("Failed to provision cluster.") from e def _capture_container_logs_for_crashdump(self) -> None: """Capture container logs before rollback destroys them. Stores logs in self._captured_container_logs for later use in crashdump. """ logs_buffer = [] logs_buffer.append("=" * 80 + "\n") logs_buffer.append("CONTAINER LOGS\n") logs_buffer.append("=" * 80 + "\n\n") # Save current cluster name to restore later original_cluster_name = self._ctx.cluster_name total_containers = 0 # Debug: Log provisioned clusters self._ctx.logger.debug( f"Capturing logs for provisioned clusters: {self._ctx.provisioned_clusters}" ) logs_buffer.append( f"Provisioned clusters: {self._ctx.provisioned_clusters}\n\n" ) try: # Iterate through all provisioned clusters for cluster_name in self._ctx.provisioned_clusters: self._ctx.cluster_name = cluster_name # Activate cluster in context logs_buffer.append(f"\n{'=' * 80}\n") logs_buffer.append(f"Cluster: {cluster_name}\n") logs_buffer.append(f"{'=' * 80}\n\n") try: # Get all containers for this cluster resources = self._ctx.cluster.resource.resources() containers = list(resources.containers()) if containers: total_containers += len(containers) for container in containers: logs_buffer.append( f"\n--- Container: {container.name} ---\n" ) logs_buffer.append(f"Status: {container.status}\n") logs_buffer.append(f"ID: {container.id[:12]}\n") try: logs = container.logs().decode( "utf-8", errors="replace" ) logs_buffer.append(f"\nLogs:\n{logs}\n") except Exception as log_err: logs_buffer.append( f"\nFailed to retrieve logs: {log_err}\n" ) logs_buffer.append("-" * 80 + "\n") else: logs_buffer.append("No containers found for this cluster.\n") except Exception as cluster_err: logs_buffer.append( f"\nFailed to retrieve containers for cluster " f"'{cluster_name}': {cluster_err}\n" ) if total_containers == 0: logs_buffer.append( "\nNo containers found across all provisioned clusters.\n" ) except Exception as container_err: logs_buffer.append( f"\nFailed to retrieve container information: {container_err}\n" ) finally: # Restore original cluster name self._ctx.cluster_name = original_cluster_name self._captured_container_logs = "".join(logs_buffer) def _provision_workers_when_safe(self) -> None: """Wait for the worker-safe event, then provision workers. Notes ----- This method is intended to be run in a background thread. It waits until the coordinator container signals that workers can be safely provisioned, then provisions the requested number of workers. """ self._ctx.logger.debug( "Waiting for worker-safe signal before provisioning workers..." ) self._worker_safe_event.wait() self._ctx.logger.debug( "Worker-safe signal received. Proceeding to provision workers." ) with self._ctx.logger.spinner(f"Provisioning {self.workers} workers..."): self._ctx.cluster.ops.reconcile_workers(self.workers) self._ctx.logger.info(f"{self.workers} workers provisioned successfully.") def _set_distribution(self) -> None: """Determine the cluster distribution. Set the distribution for the cluster based on the configuration. """ if not self.image: self.image = self._ctx.env.get("IMAGE", "trino") if self.image != "trino" and self.image != "starburst": raise UserError( f"Invalid image type '{self.image}'. Please specify either 'trino' " "or 'starburst'.", "Example: `minitrino provision -i trino`. This can also be set " "permanently via `minitrino config`.", ) self._ctx.env.update({"CLUSTER_DIST": self.image}) self._ctx.env.update({"SERVICE_USER": self.image}) self._ctx.env.update({"ETC": f"/etc/{self.image}"}) def _ensure_shared_network(self) -> None: """Ensure the shared network exists.""" try: self._ctx.docker_client.networks.get("cluster_shared") self._ctx.logger.debug("Shared network already exists, skipping creation.") except NotFound: self._ctx.logger.debug("Creating shared network...") self._ctx.docker_client.networks.create( name="cluster_shared", driver="bridge", labels={ "org.minitrino.root": "true", "org.minitrino.module.minitrino": "true", "com.docker.compose.project": "minitrino-system", }, ) def _append_running_modules(self, modules: list[str] | None = None) -> list[str]: """Add running modules to the modules list. Parameters ---------- modules : list[str] The list of modules to provision. Returns ------- list[str] The list of modules to provision, including any running modules. """ self._ctx.logger.debug("Checking for running modules...") running_modules = self._ctx.modules.running_modules() if running_modules: self._ctx.logger.debug( f"Identified the following running modules: " f"{list(running_modules.keys())}. Appending " "the running module list to the list " "of modules to provision.", ) modules = modules if modules is not None else [] modules.extend(running_modules.keys()) return list(set(modules)) def _module_yaml_paths(self) -> list[str]: """Return a list of YAML file paths for enabled modules. Returns ------- list[str] List of Compose YAML file paths for enabled modules. """ root_compose = os.path.join(self._ctx.lib_dir, "docker-compose.yaml") paths = [root_compose] for module in self.modules: yaml_file = self._ctx.modules.data.get(module, {}).get("yaml_file", "") paths.append(yaml_file) return paths def _resolve_compose_bin(self) -> tuple[str, list[str]]: """Resolve the Docker Compose executable and base command. Returns ------- tuple[str, list[str]] Tuple of (compose_bin, base_args). compose_bin is the full path to the executable, base_args is the argument list (e.g., ['compose']) if using the plugin, or [] if using the legacy binary. Raises ------ RuntimeError If neither docker nor docker-compose is found in PATH. """ docker_bin = shutil.which("docker") docker_compose_bin = shutil.which("docker-compose") if docker_bin is not None: return docker_bin, ["compose"] elif docker_compose_bin is not None: return docker_compose_bin, [] else: raise MinitrinoError( "Neither 'docker' nor 'docker-compose' was found in PATH." ) def _build_compose_command( self, module_yaml_paths: list[str] | None = None ) -> list[str]: """Build the Docker Compose command as a list of arguments. Parameters ---------- module_yaml_paths : Optional[list[str]], optional List of module YAML file paths to include with -f flags. Returns ------- list[str] The Docker Compose command as a list of arguments. """ compose_bin, base_args = self._resolve_compose_bin() cmd = [compose_bin] + base_args if module_yaml_paths: for yaml_path in module_yaml_paths: cmd += ["-f", yaml_path] cmd += ["up", "-d", "--force-recreate"] if self.build: cmd.append("--build") return cmd def _module_string(self) -> str: """Return a comma-separated string of modules.""" return ",".join(self.modules) def _set_env_vars(self) -> None: """Set environment variables for the cluster.""" self._ctx.env.update({"WORKERS": str(self.workers)}) self._ctx.env.update({"CLUSTER_NAME": self._ctx.cluster_name}) self._ctx.env.update({"MINITRINO_MODULES": self._module_string()}) compose_project_name = self._ctx.cluster.resource.compose_project_name() self._ctx.env.update({"COMPOSE_PROJECT_NAME": compose_project_name}) def _run_compose_and_wait(self, compose_cmd: list[str]) -> None: """Run the compose command asynchronously. Parameters ---------- compose_cmd : list[str] The docker compose command to execute (as a list of arguments). """ if "COMPOSE_BAKE" not in self._ctx.env: self._ctx.env["COMPOSE_BAKE"] = "true" env = self._ctx.env.copy() env.update(self._dep_cluster_env) # Use the new stream_execute_with_result API for fast failure detection output_iterator, completion_event, get_result = ( self._ctx.cmd_executor.stream_execute_with_result( compose_cmd, environment=env, suppress_output=True ) ) self._compose_failed = threading.Event() self._compose_error: BaseException | None = None self._compose_output_lines: list[str] = [] def _run_compose() -> None: """Stream compose output and capture errors.""" try: for line in output_iterator: self._ctx.logger.debug(line) self._compose_output_lines.append(line) # Check if the process failed quickly (validation errors, etc.) if completion_event.is_set(): result = get_result() if result.exit_code != 0: self._compose_failed.set() self._compose_error = MinitrinoError( f"Docker Compose command failed with exit code " f"{result.exit_code}.\n" f"Command: {' '.join(compose_cmd)}\n" f"Output:\n{''.join(self._compose_output_lines)}" ) return except Exception as exc: self._compose_failed.set() self._compose_error = exc fq_container_name = self._ctx.cluster.resource.fq_container_name("minitrino") try: orig_container = self._ctx.cluster.resource.container(fq_container_name) orig_container_id = orig_container.id self._ctx.logger.debug( f"Original coordinator container ID: {orig_container_id}" ) except NotFound: orig_container_id = None compose_thread = threading.Thread(target=_run_compose) compose_thread.start() self._ctx.logger.debug("Compose command started asynchronously.") spinner_msg = ( "Building Minitrino image..." if self.build else "Starting Minitrino environment..." ) with self._ctx.logger.spinner(spinner_msg): try: self._wait_for_coordinator_container( orig_container_id, compose_thread, completion_event, get_result, ) finally: compose_thread.join() # Final check after thread completion if self._compose_failed.is_set(): raise MinitrinoError( "Docker Compose command failed." ) from self._compose_error # Check final result if not already failed if completion_event.is_set(): result = get_result() if result.exit_code != 0: raise MinitrinoError( f"Docker Compose command failed with exit code " f"{result.exit_code}.\n" f"Command: {' '.join(compose_cmd)}\n" f"Output:\n{''.join(self._compose_output_lines)}" ) def _wait_for_coordinator_container( self, orig_container_id: str | None, compose_thread: threading.Thread, completion_event: threading.Event, get_result: Callable, timeout: int = 180, ) -> None: """Wait for the coordinator container to be running. Parameters ---------- orig_container_id : str | None ID of the original coordinator container. compose_thread : threading.Thread Thread running the compose command. completion_event : threading.Event Event signaling compose command completion. get_result : Callable Function to get the final command result. """ timeout = ( int(self._ctx.env.get("PROVISION_BUILD_TIMEOUT", 1200)) if self.build else 120 ) # Use longer timeout for builds even after container creation default_timeout = 300 if self.build else 120 reset_timeout = False poll_start = time.time() while True: # Check for early compose failure (e.g., validation errors) if completion_event.is_set() and not compose_thread.is_alive(): result = get_result() if result.exit_code != 0: # Docker Compose failed quickly - likely validation error raise MinitrinoError( f"Docker Compose failed with exit code {result.exit_code}.\n" f"This often indicates a configuration or validation error.\n" f"Output:\n{''.join(self._compose_output_lines)}" ) if self._compose_failed.is_set(): raise MinitrinoError( "Docker Compose command failed." ) from self._compose_error if shutdown_event.is_set(): self._ctx.logger.warn("Shutdown event detected, aborting compose wait.") # Check if coordinator container failed before returning try: fqcn_shutdown = self._ctx.cluster.resource.fq_container_name( "minitrino" ) container_shutdown = self._ctx.cluster.resource.container( fqcn_shutdown ) if container_shutdown.status == "exited": exit_code_shutdown = int( container_shutdown.attrs.get("State", {}).get("ExitCode", 0) ) if exit_code_shutdown != 0: raise MinitrinoError( "Failed to provision cluster. " f"Coordinator container exited with code " f"{exit_code_shutdown}." ) except NotFound: pass return try: fqcn = self._ctx.cluster.resource.fq_container_name("minitrino") container = self._ctx.cluster.resource.container(fqcn) # Refresh container state to avoid checking stale/old containers container.reload() self._ctx.logger.debug( f"Polling coordinator container: " f"id={container.id[:12]}, status={container.status}" ) # If we're expecting a container replacement (build/recreate), # skip checks on the old container if orig_container_id and container.id == orig_container_id: self._ctx.logger.debug( f"Still seeing old container (id={container.id[:12]}), " "waiting for replacement..." ) # Don't check logs on old container, wait for new one time.sleep(0.5) continue # If pre-start bootstraps complete, signal to workers # that they can safely provision if ( container.status == "running" and b"- PRE START BOOTSTRAPS COMPLETED -" in container.logs() ): self._worker_safe_event.set() # If any running container for fqcn, treat as success; # Wait for coordinator to actually be ready. if ( container.status == "running" and b"- CLUSTER IS READY -" in container.logs() ): if orig_container_id and container.id != orig_container_id: self._ctx.logger.debug( f"Coordinator container replaced: " f"old id={orig_container_id[:12]}, " f"new id={container.id[:12]}" ) break # If current container is exited with nonzero exit code, # check if any newer running container exists if container.status == "exited": exit_code = int(container.attrs.get("State", {}).get("ExitCode", 0)) if exit_code != 0: # Check if a newer running container exists for fqcn try: container = self._ctx.cluster.resource.container(fqcn) container.reload() running_found = False if container.status == "running": self._ctx.logger.debug( f"Found newer running coordinator container: " f"id={container.id[:12]}" ) running_found = True if not running_found: raise MinitrinoError( f"Coordinator container exited with code " f"{exit_code}." ) except NotFound: raise MinitrinoError( f"Coordinator container exited with code {exit_code}." ) from None except NotFound: pass if not compose_thread.is_alive() and not reset_timeout: try: fqcn = self._ctx.cluster.resource.fq_container_name("minitrino") container = self._ctx.cluster.resource.container(fqcn) container.reload() # If this is still the old container, wait for replacement if orig_container_id and container.id == orig_container_id: self._ctx.logger.debug( f"Compose finished but still seeing old container " f"(id={container.id[:12]}), waiting for replacement..." ) else: # New container exists, safe to reduce timeout timeout = default_timeout reset_timeout = True self._ctx.logger.debug( f"Compose thread finished and new container exists " f"(id={container.id[:12]}), reducing coordinator wait " f"timeout to {default_timeout} seconds." ) self._ctx.logger.info( "Waiting for coordinator container to start..." ) except NotFound: # Container doesn't exist yet, likely still pulling # image. Keep the original timeout and check again # next iteration self._ctx.logger.debug( "Compose thread finished but container not found yet, " "likely still pulling image. Maintaining original timeout." ) if time.time() - poll_start > timeout: raise MinitrinoError( f"Timed out after {timeout} seconds waiting for " "coordinator container to start." ) time.sleep(1) def _set_license(self) -> None: """Set the license for the cluster.""" if self._ctx.env.get("LIC_PATH"): user_provided_path = self._ctx.env["LIC_PATH"] self._ctx.logger.debug("License path provided. Ensuring absolute path.") try: self._ctx.env["LIC_PATH"] = os.path.abspath( os.path.expanduser(user_provided_path) ) assert os.path.isfile(self._ctx.env["LIC_PATH"]) except Exception as e: raise UserError( f"Failed to resolve valid license path: {e}", f"Please provide a valid license path. " f"Path provided: {user_provided_path}", ) from e def _determine_build(self) -> bool: """Determine if the image should be built.""" # Check if image source has changed if self._image_src_changed(): self._ctx.logger.debug( "Image source has changed. " "--build flag will be appended to compose command." ) return True # Check if the image exists at all ver = self._ctx.env.get("CLUSTER_VER") dist = self._ctx.env.get("CLUSTER_DIST") image_tag = f"minitrino/cluster:{ver}-{dist}" try: self._ctx.docker_client.images.get(image_tag) self._ctx.logger.debug(f"Image {image_tag} exists, no build required.") return False except Exception: self._ctx.logger.debug( f"Image {image_tag} does not exist. " "--build flag will be appended to compose command." ) return True @property def _image_src_checksum(self) -> str: """The checksum of the Minitrino image source directory.""" if not hasattr(self, "_checksum"): self._checksum = self._get_image_src_checksum() return self._checksum def _get_image_src_checksum(self) -> str: """Return the checksum of the image source directory.""" hashobj = hashlib.new("sha256") directory = os.path.join(self._ctx.lib_dir, "image") for root, _, files in os.walk(directory): for fname in sorted(files): fpath = os.path.join(root, fname) if not os.path.isfile(fpath) or os.path.islink(fpath): continue # Include relative path in hash for uniqueness relpath = os.path.relpath(fpath, directory) hashobj.update(relpath.encode()) with open(fpath, "rb") as f: while chunk := f.read(8192): hashobj.update(chunk) self._ctx.logger.debug( f"Minitrino image source current checksum: {hashobj.hexdigest()}" ) return hashobj.hexdigest() def _image_src_changed(self) -> bool: """Compare current image source checksum to recorded checksum. If the image source checksum has not been recorded, always return True to force a build. Returns ------- bool True if the image source has changed, False otherwise. """ checksum = self._image_src_checksum ver = self._ctx.env.get("CLUSTER_VER") dist = self._ctx.env.get("CLUSTER_DIST") image_name = f"{ver}-{dist}" self._ctx.logger.debug( f"Checking if image source has changed for {image_name}..." ) self.checksum_dir = os.path.join( self._ctx.minitrino_user_dir, ".imagechecksums" ) if not os.path.isdir(self.checksum_dir): os.makedirs(self.checksum_dir) self.checksum_file = os.path.join(self.checksum_dir, f"{image_name}") if os.path.isfile(self.checksum_file): with open(self.checksum_file) as f: recorded_checksum = f.read().strip() self._ctx.logger.debug( f"Minitrino image source last recorded checksum " f"for image {image_name}: {recorded_checksum}" ) return checksum != recorded_checksum else: return True def _record_image_src_checksum(self) -> None: """Record the image source checksum.""" checksum = self._image_src_checksum self._ctx.logger.debug( f"Recording Minitrino image source checksum: " f"{checksum} to {self.checksum_file}" ) with open(self.checksum_file, "w") as f: f.write(checksum) def _rollback(self) -> None: """Perform a cluster rollback.""" if self.no_rollback: self._ctx.logger.warn( f"Errors occurred during cluster '{self._ctx.cluster_name}' " "provisioning and rollback has been disabled. " "Provisioned resources will remain in an unaltered state.", ) return for cluster in self._ctx.provisioned_clusters: self._ctx.cluster_name = cluster # Activate the cluster in the context self._ctx.cluster.ops.rollback()