Source code for minitrino.core.modules

"""Module management for Minitrino CLI."""

from __future__ import annotations

import json
import os
from typing import TYPE_CHECKING

import jsonschema
import yaml

from minitrino import utils
from minitrino.core.errors import MinitrinoError, UserError
from minitrino.settings import (
    COMPOSE_LABEL_KEY,
    LIC_MOUNT_PATH,
    LIC_VOLUME_MOUNT,
    MODULE_ADMIN,
    MODULE_CATALOG,
    MODULE_LABEL_KEY,
    MODULE_ROOT,
    MODULE_SECURITY,
)

if TYPE_CHECKING:
    from minitrino.core.context import MinitrinoContext

MODULE_METADATA_SPEC = {
    "type": "object",
    "properties": {
        "description": {"type": "string"},
        "incompatibleModules": {"type": "array", "items": {"type": "string"}},
        "enterprise": {"type": "boolean"},
        "versions": {
            "type": "array",
            "items": {"type": "string"},
            "minItems": 0,
            "maxItems": 2,
        },
        "dependentModules": {"type": "array", "items": {"type": "string"}},
        "dependentClusters": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "modules": {"type": "array", "items": {"type": "string"}},
                    "workers": {"type": "number"},
                    "env": {"type": "object"},
                },
                "required": ["name", "modules", "workers", "env"],
            },
        },
    },
    "required": ["description"],
}


[docs] class Modules: """Module validation and management. Parameters ---------- ctx : MinitrinoContext An instantiated MinitrinoContext object with user input and context. Attributes ---------- data : dict A dictionary of loaded module metadata keyed by module name. Methods ------- running_modules() : Returns a dictionary of modules that are currently running. validate_module_name(name: str) : Return the module name or raise UserError with a suggestion. check_dep_modules(modules: Optional[list[str]] = None) : Check if provided modules have dependencies and include them. check_module_version_requirements(modules: Optional[list[str]] = None) : Check module version compatibility against the provided cluster version. check_compatibility(modules: Optional[list[str]] = None) : Check for mutually exclusive modules among the provided modules. check_enterprise(modules: Optional[list[str]] = None) : Check if any provided modules are Starburst Enterprise features and validate license. module_services(modules: Optional[list[str]] = None) : Get all services defined in the provided modules. check_volumes(modules: Optional[list[str]] = None) : Check if any of the provided modules have persistent volumes and warn the user. """ def __init__(self, ctx: MinitrinoContext) -> None: self.data: dict = {} self._ctx = ctx self._load_modules()
[docs] def running_modules(self) -> dict[str, str]: """Retrieve running modules by inspecting active Docker containers. Returns ------- dict[str, str] A dictionary mapping module names (lowercase strings) to their associated cluster name. Raises ------ UserError If any container lacks expected Minitrino labels or if a running module is not found in the loaded library data. Notes ----- This method inspects active Docker containers for the current cluster and extracts module names and their associated cluster from Docker labels. Only containers with valid `org.minitrino` labels are considered modules. """ utils.check_daemon(self._ctx.docker_client) containers = self._ctx.cluster.resource.resources().containers() if not containers: return {} modules = {} def get_modules_from_labels(labels: dict[str, str]) -> list[str]: module_names = [] for label_key, label_value in labels.items(): if not label_key.startswith(MODULE_LABEL_KEY): continue if label_value != "true": continue parts = label_key[len(f"{MODULE_LABEL_KEY}.") :].split(".") if len(parts) < 2: continue module_name = ".".join(parts[1:]) module_names.append(module_name) return module_names for container in containers: cluster_name = container.cluster_name module_names = get_modules_from_labels(container.labels) if module_names: if cluster_name is None: raise UserError( f"Unable to determine cluster name for container " f"'{container.name}'. Container '{container.name}' " f"is either missing the '{COMPOSE_LABEL_KEY}' label " f"or it is malformed.", ) for module in module_names: modules[module] = cluster_name continue if ( "minitrino-worker" in str(container.name) or container.name == f"minitrino-{container.cluster_name}" ): continue raise UserError( f"Missing Minitrino labels for container '{container.name}'.", ) for module in modules: if not isinstance(self.data.get(module), dict): raise UserError( f"Module '{module}' is running, but it is not found in the " "library. Was it deleted, or are you pointing Minitrino to " "the wrong location?" ) return modules
[docs] def validate_module_name(self, name: str) -> str: """Return the module name or raise UserError with a suggestion. Always use this method to validate any user-provided module name(s). Parameters ---------- name : str User-supplied module name. Returns ------- str Validated module name. Raises ------ UserError If the module name is not found, with a suggestion. """ valid_names = list(self.data.keys()) return utils.closest_match_or_error(name, valid_names, "module")
[docs] def check_dep_modules(self, modules: list[str] | None = None) -> list[str]: """Recursively collect all direct and transitive module deps. Parameters ---------- modules : Optional[list[str]] List of module names to check. Default is `None`. Returns ------- list[str] List of modules dependent to the modules provided. """ def _add_with_deps(module): if module in result: return result.add(module) for dep in self.data.get(module, {}).get("dependentModules", []): self._ctx.logger.debug( f"Module dependency for module '{module}' will be included: '{dep}'" ) _add_with_deps(dep) if modules is None: modules = [] result: set[str] = set() for m in modules: _add_with_deps(m) return list(result)
[docs] def check_module_version_requirements( self, modules: list[str] | None = None ) -> None: """Check module-cluster version compatibility. Parameters ---------- modules : list[str] A list of module names to check version requirements for. Raises ------ UserError If the version constraints are invalid or not satisfied. """ modules = modules or [] for module in modules: versions = self.data.get(module, {}).get("versions", []) if not versions: continue if len(versions) > 2: raise UserError( f"Invalid versions specification for module '{module}' " f"in metadata.json file: {versions}", 'The valid structure is: {{"versions": [min-ver, max-ver]}}. ' "If the versions key is present, the minimum version is required, " "and the maximum version is optional.", ) cluster_ver = int(self._ctx.env.get("CLUSTER_VER", "")[0:3]) min_ver = int(versions.pop(0)) max_ver = None if versions: max_ver = int(versions.pop()) begin_msg = ( f"The supplied cluster version {cluster_ver} " f"is incompatible with module '{module}'. " f"Per the module's metadata.json file, the" ) if cluster_ver < min_ver: raise UserError( f"{begin_msg} minimum required cluster " f"version for the module is: {min_ver}." ) if max_ver and cluster_ver > max_ver: raise UserError( f"{begin_msg} maximum required cluster " f"version for the module is: {max_ver}." )
[docs] def check_compatibility(self, modules: list[str] | None = None) -> None: """Check for mutually exclusive modules among the provided modules. Parameters ---------- modules : Optional[list[str]] List of module names to check. Default is `None`. Raises ------ UserError If incompatible modules are detected. """ if modules is None: modules = [] for module in modules: incompatible = self.data.get(module, {}).get("incompatibleModules", []) if not incompatible: continue for module_inner in modules: if (module_inner in incompatible) or ( incompatible[0] == "*" and len(modules) > 1 ): raise UserError( f"Incompatible modules detected. Tried to provision " f"module '{module_inner}', but found that the module is " f"incompatible with module '{module}'. Incompatible modules " f"listed for module '{module}' are: {incompatible}", f"You can see which modules are incompatible with this module " f"by running 'minitrino modules -m {module}'", )
[docs] def check_enterprise(self, modules: list[str] | None = None) -> None: """Check for Starburst Enterprise modules and validate license. Parameters ---------- modules : Optional[list[str]] List of module names to check. Default is `None`. Raises ------ UserError If a required license is missing. """ if modules is None: modules = [] self._ctx.logger.debug( "Checking for Starburst Enterprise modules...", ) yaml_path = os.path.join(self._ctx.lib_dir, "docker-compose.yaml") with open(yaml_path) as f: yaml_file = yaml.load(f, Loader=yaml.FullLoader) volumes = yaml_file.get("services", {}).get("minitrino", {}).get("volumes", []) if LIC_VOLUME_MOUNT not in volumes: raise UserError( f"The required license volume in the library's root " f"docker-compose.yaml is either commented out or " f"deleted: {yaml_path}.", "The proper volume mount is: '{LIC_VOLUME_MOUNT}'", ) enterprise_modules = [] for module in modules: if self.data.get(module, {}).get("enterprise", False): enterprise_modules.append(module) if enterprise_modules: if not self._ctx.env.get("CLUSTER_DIST") == "starburst": raise UserError( f"Module(s) {enterprise_modules} are only compatible with " f"Starburst Enterprise. Please specify the image type with " f"the '-i' option. ", "minitrino provision -i starburst", ) if not self._ctx.env.get("LIC_PATH", ""): raise UserError( f"Module(s) {enterprise_modules} requires a Starburst license. " f"You must provide a path to a Starburst license via the " f"LIC_PATH environment variable." ) lic_path = os.path.expanduser(self._ctx.env.get("LIC_PATH", "")) if not os.path.isfile(lic_path): raise UserError( f"Module(s) {enterprise_modules} requires a Starburst license. " f"The path provided via the LIC_PATH environment variable does " f"not exist or is not a file: {lic_path}." ) self._ctx.env.update({"LIC_MOUNT_PATH": LIC_MOUNT_PATH}) elif "dummy.license" not in self._ctx.env.get("LIC_PATH", ""): self._ctx.env.update({"LIC_MOUNT_PATH": LIC_MOUNT_PATH})
[docs] def module_services(self, modules: list[str] | None = None) -> list[list]: """Get all services defined in the provided modules. Parameters ---------- modules : Optional[list[str]] List of module names to retrieve services from. Default is `None`. Returns ------- list[list] List of services, each as a list containing the service key (`str`), service dictionary (`dict`), and the YAML file path (`str`). Raises ------ MinitrinoError If a module's Docker Compose YAML file lacks a 'services' section. """ if modules is None: modules = [] services = [] for module in modules: self._ctx.logger.debug(f"Checking for services in module '{module}'...") yaml_file = self.data.get(module, {}).get("yaml_file", "") module_services = ( self.data.get(module, {}).get("yaml_dict", {}).get("services", {}) ) if not module_services: raise MinitrinoError( f"Invalid Docker Compose YAML file for module '{module}' " f"(no 'services' section found): {yaml_file}" ) # Get all services defined in YAML file for service_key, service_dict in module_services.items(): services.append([service_key, service_dict, yaml_file]) return services
[docs] def check_volumes(self, modules: list[str] | None = None) -> None: """Check for persistent volumes and warn the user if any are found. Parameters ---------- modules : Optional[list[str]] List of module names to check. Default is `None`. """ if modules is None: modules = [] self._ctx.logger.debug( "Checking modules for persistent volumes...", ) for module in modules: if self.data.get(module, {}).get("yaml_dict", {}).get("volumes", {}): self._ctx.logger.warn( f"Module '{module}' has persistent volumes associated " f"with it. To delete these volumes, remember to run " f"minitrino remove --volumes --module {module}.", )
def _load_modules(self) -> None: """Load module data during class instantiation. Raises ------ MinitrinoError If the resolved `modules_dir` path is not a directory. UserError If a module is missing its expected `.yaml` file. Notes ----- This method scans the Minitrino library for valid module directories under the `admin`, `catalog`, and `security` sections. Each module must include a matching `.yaml` file, and may optionally include a `metadata.json` file. Parsed module information is stored in the `data` attribute. """ self._ctx.logger.debug("Loading modules...") modules_dir = os.path.join(self._ctx.lib_dir, MODULE_ROOT) if not os.path.isdir(modules_dir): raise MinitrinoError( f"Path is not a directory: {modules_dir}. " f"Are you pointing to a compatible Minitrino library?" ) # Loop through all module types sections = [ os.path.join(modules_dir, MODULE_ADMIN), os.path.join(modules_dir, MODULE_CATALOG), os.path.join(modules_dir, MODULE_SECURITY), ] for section_dir in sections: for _dir in os.listdir(section_dir): module_dir = os.path.join(section_dir, _dir) if not os.path.isdir(module_dir): self._ctx.logger.debug( f"Skipping file (expected a directory, not a file) " f"at path: {module_dir}", ) continue # List inner-module files module_files = os.listdir(module_dir) yaml_basename = f"{os.path.basename(module_dir)}.yaml" if yaml_basename not in module_files: raise UserError( f"Missing Docker Compose file in module directory {_dir}. " f"Expected file to be present: {yaml_basename}", "Check this module in your library to ensure it is " "properly constructed.", ) # Module dir and YAML exist, add to modules module_name = os.path.basename(module_dir) self.data[module_name] = {} self.data[module_name]["type"] = os.path.basename(section_dir) self.data[module_name]["module_dir"] = module_dir # Add YAML file path yaml_file = os.path.join(module_dir, yaml_basename) self.data[module_name]["yaml_file"] = yaml_file # Add YAML dict with open(yaml_file) as f: self.data[module_name]["yaml_dict"] = yaml.load( f, Loader=yaml.FullLoader ) json_basename = "metadata.json" json_file = os.path.join(module_dir, json_basename) metadata = {} if not os.path.isfile(json_file): raise MinitrinoError( f"Missing required metadata.json file for " f"module '{module_name}'." ) with open(json_file) as f: metadata = json.load(f) try: jsonschema.validate(metadata, MODULE_METADATA_SPEC) except jsonschema.ValidationError as e: raise UserError( f"Invalid metadata.json in module '{module_name}': {e.message}", f"File: {json_file}", ) from e for k, v in metadata.items(): self.data[module_name][k] = v # Add module label self.data[module_name]["label"] = ( f"{MODULE_LABEL_KEY}." f"{self.data[module_name]['type']}." f"{module_name}=true" )