# Source code for minitrino.core.cluster.validator

"""Cluster validation utilities for Minitrino CLI."""

from __future__ import annotations

import re
from typing import TYPE_CHECKING

from minitrino import utils
from minitrino.core.docker.wrappers import MinitrinoContainer
from minitrino.core.errors import UserError
from minitrino.settings import (
    CLUSTER_CONFIG,
    CLUSTER_JVM_CONFIG,
    ETC_DIR,
    MIN_CLUSTER_VER,
)

if TYPE_CHECKING:
    from minitrino.core.cluster.cluster import Cluster
    from minitrino.core.context import MinitrinoContext


class ClusterValidator:
    """Validate cluster configuration and environment variables.

    Parameters
    ----------
    ctx : MinitrinoContext
        An instantiated MinitrinoContext object with user input and context.
    cluster : Cluster
        An instantiated `Cluster` object.

    Methods
    -------
    check_cluster_name()
        Validate that the cluster name is valid.
    check_cluster_ver()
        Validate that the current `CLUSTER_VER` and `CLUSTER_DIST`
        environment variables meet minimum requirements for either Trino or
        Starburst distributions.
    check_dependent_clusters(modules: list[str] | None = None)
        Identify dependent clusters for the specified modules.
    check_dup_config(cluster_cfgs=None, jvm_cfgs=None)
        Check for duplicate entries in `config.properties` and `jvm.config`
        and log warnings if duplicates are found.
    """

    def __init__(self, ctx: MinitrinoContext, cluster: Cluster):
        self._ctx = ctx
        self._cluster = cluster

    def check_cluster_name(self) -> None:
        """Validate that the cluster name is valid.

        Raises
        ------
        UserError
            If the cluster name is reserved ("images" or "system") or
            contains characters other than alphanumerics, underscores,
            dashes, or asterisks.
        """
        cluster_name = self._ctx.cluster_name
        if cluster_name in ("images", "system"):
            raise UserError(
                f"Cluster name '{cluster_name}' is reserved for "
                "internal use. Please use a different cluster name."
            )
        if not re.fullmatch(r"[A-Za-z0-9_\-\*]+", cluster_name):
            raise UserError(
                f"Invalid cluster name '{cluster_name}'. Cluster names can "
                "only contain alphanumeric characters, underscores, dashes, or "
                "asterisks (asterisks are for filtering operations only and will "
                "not work with the `provision` command)."
            )

    def check_cluster_ver(self) -> None:
        """Validate that the cluster version meets minimum requirements.

        Only the `starburst` and `trino` values of `CLUSTER_DIST` are
        validated; any other value is accepted without checks.

        Raises
        ------
        UserError
            If the provided version is too low or formatted incorrectly.
        """
        cluster_dist = self._ctx.env.get("CLUSTER_DIST", "")
        cluster_ver = self._ctx.env.get("CLUSTER_VER", "")

        if cluster_dist == "starburst":
            error_msg = (
                f"Provided Starburst version '{cluster_ver}' is invalid. "
                f"The version must be {MIN_CLUSTER_VER}-e or higher."
            )
            major = self._major_version(cluster_ver)
            # Starburst Enterprise versions carry an '-e' suffix (e.g. 443-e.1).
            if major is None or major < MIN_CLUSTER_VER or "-e" not in cluster_ver:
                raise UserError(error_msg)
        elif cluster_dist == "trino":
            error_msg = (
                f"Provided Trino version '{cluster_ver}' is invalid. "
                f"The version must be {MIN_CLUSTER_VER} or higher."
            )
            if "-e" in cluster_ver:
                raise UserError(
                    f"The provided Trino version '{cluster_ver}' cannot contain '-e'. "
                    "Did you mean to use Starburst via the --image option?"
                )
            major = self._major_version(cluster_ver)
            if major is None or major < MIN_CLUSTER_VER:
                raise UserError(error_msg)

    @staticmethod
    def _major_version(version) -> int | None:
        """Return the leading integer of a version string, or None if absent.

        All leading digits are used, so versions are not truncated to a fixed
        width (e.g. "1000" -> 1000, "443-e.1" -> 443).
        """
        match = re.match(r"\d+", str(version or ""))
        return int(match.group()) if match else None

    def check_dependent_clusters(self, modules: list[str] | None = None) -> list[dict]:
        """Identify dependent clusters for the specified modules.

        Each dependent cluster definition is deep-copied and renamed to
        ``<cluster_name>-dep-<name>``. The module metadata (and the test
        override, when set) is never modified, so repeated calls return the
        same names.

        Parameters
        ----------
        modules : list[str] | None
            A list of module names to check for dependencies.

        Returns
        -------
        list[dict]
            A list of cluster definitions that should be treated as
            dependencies.

        Raises
        ------
        UserError
            If a module's dependent cluster (per module metadata) lists that
            same module in its own ``modules``.
        """
        import copy

        self._ctx.logger.debug("Checking for dependent clusters...")
        modules = modules or []

        # Checked once upfront; when present it replaces every module's metadata.
        override_clusters = self._dep_override()

        dependent_clusters: list[dict] = []
        for module in modules:
            if override_clusters is not None:
                module_dependent_clusters = override_clusters
            else:
                module_data: dict = self._ctx.modules.data.get(module, {})
                module_dependent_clusters = module_data.get("dependentClusters", [])
            for cluster in module_dependent_clusters or []:
                # Copy before renaming: renaming in place would alter the shared
                # metadata/override and double-prefix names on later use.
                dependent = copy.deepcopy(cluster)
                dependent["name"] = f"{self._ctx.cluster_name}-dep-{cluster['name']}"
                dependent_clusters.append(dependent)

        # Circular dependency check: dependent clusters cannot list their
        # parent module as a dependency (would cause infinite recursion).
        for parent_module in modules:
            parent_module_data: dict = self._ctx.modules.data.get(parent_module, {})
            for cluster in parent_module_data.get("dependentClusters", []) or []:
                if parent_module in (cluster.get("modules", []) or []):
                    raise UserError(
                        f"Circular dependency detected: Dependent cluster "
                        f"'{cluster.get('name', 'unknown')}' of module "
                        f"'{parent_module}' cannot list '{parent_module}' "
                        "as one of its modules. This would cause infinite "
                        "recursion during provisioning."
                    )
        return dependent_clusters

    def _dep_override(self) -> list | None:
        """Return the dependent-cluster list from MINITRINO_TEST_DEP_OVERRIDE.

        Returns None when the variable is unset or empty, is not valid JSON,
        or does not decode to a JSON list; the last two cases log a warning.
        """
        import json

        override_env = self._ctx.env.get("MINITRINO_TEST_DEP_OVERRIDE")
        if not override_env:
            return None
        try:
            override_clusters = json.loads(override_env)
        except json.JSONDecodeError as e:
            self._ctx.logger.warn(f"Invalid JSON in MINITRINO_TEST_DEP_OVERRIDE: {e}")
            return None
        if not isinstance(override_clusters, list):
            # A dict/scalar would otherwise crash when indexed as cluster defs.
            self._ctx.logger.warn(
                "MINITRINO_TEST_DEP_OVERRIDE must be a JSON list of cluster "
                "definitions; ignoring it."
            )
            return None
        self._ctx.logger.debug(
            f"Using test override for dependent clusters: {override_clusters}"
        )
        return override_clusters

    def check_dup_config(self, cluster_cfgs=None, jvm_cfgs=None) -> None:
        """Check for duplicate entries in cluster config files.

        Parameters
        ----------
        cluster_cfgs : list[tuple] | None
            Parsed `config.properties` entries (see `_split_config`). When
            None, each cluster container's own file is fetched and checked.
        jvm_cfgs : list[tuple] | None
            Parsed `jvm.config` entries. When None, each cluster container's
            own file is fetched and checked.
        """
        # Supplied configs are checked exactly once.
        if cluster_cfgs is not None:
            self._log_duplicates(cluster_cfgs, CLUSTER_CONFIG)
        if jvm_cfgs is not None:
            self._log_duplicates(jvm_cfgs, CLUSTER_JVM_CONFIG)
        if cluster_cfgs is not None and jvm_cfgs is not None:
            return

        # Otherwise inspect every container's own files rather than reusing
        # the first container's configs for all of them.
        for container in self._cluster.resource.cluster_containers():
            container_cluster_cfgs, container_jvm_cfgs = self._current_config(
                container
            )
            if cluster_cfgs is None:
                self._log_duplicates(container_cluster_cfgs, CLUSTER_CONFIG)
            if jvm_cfgs is None:
                self._log_duplicates(container_jvm_cfgs, CLUSTER_JVM_CONFIG)

    @staticmethod
    def _find_duplicates(cfgs) -> dict[str, list[tuple]]:
        """Group parsed config entries by key; keep keys seen more than once.

        Key-value entries are keyed by property name and unified entries by
        the whole line. Blank lines and comment lines (starting with ``#`` or
        ``!``) are ignored, since repeating them is harmless.
        """
        grouped: dict[str, list[tuple]] = {}
        for cfg in cfgs:
            if cfg[0] in ("key_value", "unified"):
                key = str(cfg[1]).strip()
            else:
                key = str(cfg)
            if not key or key.startswith(("#", "!")):
                continue
            grouped.setdefault(key, []).append(cfg)
        return {k: v for k, v in grouped.items() if len(v) > 1}

    def _log_duplicates(self, cfgs, filename: str) -> None:
        """Log a warning listing duplicate entries found in ``cfgs``."""
        self._ctx.logger.debug(
            f"Checking '{filename}' file for duplicate configs...",
        )
        duplicates = self._find_duplicates(cfgs)
        if not duplicates:
            return
        msg = [f"Duplicate configuration properties detected in '{filename}' file:"]
        for key, entries in duplicates.items():
            msg.append(f" {key}:")
            for entry in entries:
                if entry[0] == "key_value":
                    msg.append(f" - {entry[1]}={entry[2]}")
                elif entry[0] == "unified":
                    msg.append(f" - {entry[1]}")
                else:
                    msg.append(f" - {entry}")
        self._ctx.logger.warn("\n".join(msg))

    def _current_config(
        self, container: MinitrinoContainer
    ) -> tuple[list[tuple], list[tuple]]:
        """Fetch current cluster configs from a cluster container.

        Parameters
        ----------
        container : MinitrinoContainer
            The container to fetch configs from.

        Returns
        -------
        tuple[list[tuple], list[tuple]]
            Parsed `config.properties` and `jvm.config` entries, in that
            order.
        """
        _, uid = utils.container_user_and_id(self._ctx, container)
        current_cfgs = self._ctx.cmd_executor.execute(
            [f"cat {ETC_DIR}/{CLUSTER_CONFIG}"],
            [f"cat {ETC_DIR}/{CLUSTER_JVM_CONFIG}"],
            container=container,
            suppress_output=True,
            user=uid,
        )
        current_cluster_cfgs = self._split_config(current_cfgs[0].output)
        current_jvm_cfg = self._split_config(current_cfgs[1].output)
        return current_cluster_cfgs, current_jvm_cfg

    def _split_config(self, cfgs: str = "") -> list[tuple]:
        """Split raw config text into an ordered list of tuples.

        Each line becomes ``("key_value", key, value)`` when it contains an
        ``=`` (split on the first one, with whitespace around that ``=``
        removed), otherwise ``("unified", line, "")``. Ordering, blank lines,
        and comments are preserved; any further ``=`` inside the value is
        kept verbatim.
        """
        parsed: list[tuple] = []
        for raw_line in cfgs.strip().split("\n"):
            line = raw_line.rstrip("\r")  # tolerate CRLF output
            key, sep, value = line.partition("=")
            if sep:
                parsed.append(("key_value", key.rstrip(), value.lstrip()))
            else:
                parsed.append(("unified", line, ""))
        return parsed