# Source code for minitrino.core.context

"""Core context and controls for Minitrino CLI."""

import contextlib
import logging
import os
from pathlib import Path
from typing import cast

import docker

from minitrino import utils
from minitrino.core.cluster.cluster import Cluster
from minitrino.core.docker.socket import resolve_docker_socket
from minitrino.core.envvars import EnvironmentVariables
from minitrino.core.errors import MinitrinoError, UserError
from minitrino.core.exec.cmd import CommandExecutor
from minitrino.core.library import LibraryManager
from minitrino.core.logging.levels import LogLevel
from minitrino.core.logging.logger import MinitrinoLogger
from minitrino.core.modules import Modules


class MinitrinoContext:
    """Expose context and core controls to CLI scripts.

    Attributes
    ----------
    cluster : Cluster
        Cluster interface.
    logger : MinitrinoLogger
        Logs CLI activity.
    env : EnvironmentVariables
        CLI environment variables, subdivided by sections.
    modules : Modules
        Metadata about Minitrino modules.
    cmd_executor : CommandExecutor
        Executes shell commands in host and containers.
    docker_client : docker.DockerClient
        Docker client for high-level API access.
    api_client : docker.APIClient
        Docker API client for low-level access.
    lib_manager : LibraryManager
        Library manager constructed with this context.
    all_clusters : bool
        If True, operations are applied to all clusters.
    provisioned_clusters : list[str]
        Names of provisioned clusters; starts out empty.
    cluster_name : str
        Name of the active cluster. Defaults to "default".
    user_home_dir : str
        Home directory of the current user.
    minitrino_user_dir : str
        Path to ~/.minitrino/ directory.
    config_file : str
        Path to user's minitrino.cfg file.
    snapshot_dir : str
        Path to snapshot directory for temporary tarballs.
    lib_dir : str
        Path to Minitrino's library directory.

    Methods
    -------
    initialize()
        Hydrate the context with user-provided inputs.

    Notes
    -----
    The `lib_dir` property cannot be accessed prior to `_lib_safe` being
    set to `True`, which occurs early during `initialize()`. The idea is
    to force any user-provided env vars (since one of them may be
    `LIB_PATH`) to load before we attempt to do anything with the
    library.
    """

    cluster: Cluster
    logger: MinitrinoLogger
    env: EnvironmentVariables
    modules: Modules
    cmd_executor: CommandExecutor
    docker_client: docker.DockerClient | None
    api_client: docker.APIClient | None
    # Matches the attribute actually assigned in __init__ (`self.lib_manager`).
    lib_manager: LibraryManager
    all_clusters: bool
    provisioned_clusters: list[str]
    user_home_dir: str
    minitrino_user_dir: str
    config_file: str
    snapshot_dir: str

    def __init__(self):
        # ------------------------------
        # ---- User-provided inputs ----
        self.cluster_name = "default"
        self._user_env_args = []
        self._user_log_level = LogLevel.INFO
        # ------------------------------

        self.all_clusters = False
        self.provisioned_clusters = []

        self.logger: MinitrinoLogger = logging.getLogger("minitrino")
        # The attributes below stay None until initialize() hydrates them.
        self.cluster: Cluster | None = None
        self.env: EnvironmentVariables | None = None
        self.modules: Modules | None = None
        self.cmd_executor: CommandExecutor | None = None
        self.docker_client: docker.DockerClient | None = None
        self.api_client: docker.APIClient | None = None
        self.lib_manager = LibraryManager(self)

        self.user_home_dir = os.path.expanduser("~")
        self.minitrino_user_dir = self._handle_minitrino_user_dir()
        self.config_file = self._config_file()
        self.snapshot_dir = os.path.join(self.minitrino_user_dir, "snapshots")

        self._lib_dir = None

        # State
        self._initialized = False
        self._lib_safe = False
        self._logged_config_file_missing = False

    @utils.exception_handler
    def initialize(
        self,
        cluster_name: str = "",
        version_only: bool = False,
        log_level: LogLevel | None = None,
        minimal: bool = False,
    ) -> None:
        """Initialize core CLI context attributes.

        This method sets up logging, environment variables, and
        context-specific resources like the cluster and Docker clients.
        If `version_only` is True, initialization is limited to what is
        required to resolve the CLI and library versions.

        Parameters
        ----------
        cluster_name : str, optional
            The cluster name to scope operations to. When empty, the
            context's current `cluster_name` (initially "default") is
            used.
        version_only : bool, optional
            If True, initializes only the attributes required for
            version fetching (e.g. `minitrino --version`). The context
            is not marked as initialized in this mode.
        log_level : LogLevel, optional
            The log level to set for the logger.
        minimal : bool, optional
            If True, initializes only the attributes required for
            commands that don't need the library (e.g. `lib-install`,
            `config`). This skips module loading, cluster setup, and
            Docker client initialization.

        Raises
        ------
        MinitrinoError
            If the context has already been initialized.
        """
        if self._initialized:
            raise MinitrinoError("Context has already been initialized.")

        # From here on `lib_dir` may be resolved; env vars load first
        # because one of them may be `LIB_PATH`.
        self._lib_safe = True
        self.env = EnvironmentVariables(self)
        if version_only:
            return

        self._validate_config_file()

        if not minimal:
            self._try_parse_library_env()
            self._try_compare_versions()
            self.modules = Modules(self)
            self.cmd_executor = CommandExecutor(self)
            self._set_cluster_attrs(cluster_name or self.cluster_name)
            self._set_docker_clients(env=self.env.copy())
            self.env._log_env_vars()

        if log_level:
            self.logger.set_level(log_level)
        self._initialized = True

    @property
    def user_log_level(self) -> LogLevel:
        """The user-configured log level for this context.

        Returns
        -------
        LogLevel
            The log level set by the user, or the default (INFO).
        """
        return self._user_log_level

    @user_log_level.setter
    def user_log_level(self, value: LogLevel) -> None:
        """Set the user log level once. Further attempts to set will raise.

        Parameters
        ----------
        value : LogLevel
            The log level to set.

        Raises
        ------
        MinitrinoError
            If the log level has already been set to a non-default
            (non-INFO) value.
        """
        if self._user_log_level != LogLevel.INFO:
            raise MinitrinoError("user_log_level is immutable once set.")
        self._user_log_level = value

    @property
    def lib_dir(self) -> str:
        """Get the library directory.

        Returns
        -------
        str
            Path to the resolved library directory.

        Raises
        ------
        MinitrinoError
            If accessed before `initialize()` has marked the library as
            safe to resolve.

        Notes
        -----
        The directory is determined using the following order of
        precedence:

        1. Use `LIB_PATH` if provided via environment.
        2. Use `<minitrino_user_dir>/lib` if that directory exists.
        3. Check if the library exists relative to the location of this
           file, assuming the project is running in a repository
           context.

        The resolved value is cached after the first successful lookup.
        """
        if not self._lib_safe:
            raise MinitrinoError("lib_dir accessed before initialization")
        if not self._lib_dir:
            self._lib_dir = self._get_lib_dir()
        return self._lib_dir

    def _get_lib_dir(self) -> str:
        """Determine and validate the path to the library directory.

        Returns
        -------
        str
            Resolved library directory path.

        Raises
        ------
        UserError
            If a valid library cannot be found.
        """
        lib_dir = ""
        # Best effort: any failure reading the env falls through to the
        # default locations below.
        with contextlib.suppress(Exception):
            # Coerce a None value to "" so os.path.isdir never sees None.
            lib_dir = self.env.get("LIB_PATH", "") or ""

        user_lib_dir = os.path.join(self.minitrino_user_dir, "lib")
        if not lib_dir and os.path.isdir(user_lib_dir):
            lib_dir = user_lib_dir
        elif not lib_dir:
            # Use repo root, fail if this doesn't exist
            for parent in Path(__file__).resolve().parents:
                candidate = parent / "src" / "lib"
                if candidate.is_dir():
                    lib_dir = str(candidate)
                    break

        # A usable library is a directory that contains minitrino.env.
        has_env_file = os.path.isfile(os.path.join(lib_dir, "minitrino.env"))
        if not os.path.isdir(lib_dir) or not has_env_file:
            raise UserError(
                "This operation requires a library to be installed.",
                "The library can be installed in the default location "
                "(~/.minitrino/lib) via the lib-install command, or it "
                "can be pointed to with the LIB_PATH environment variable.",
            )

        self.logger.debug(
            f"Library path set to: {lib_dir}",
        )
        return lib_dir

    def _handle_minitrino_user_dir(self) -> str:
        """Create the `~/.minitrino` directory if not exists.

        Returns
        -------
        str
            Path to the user's `~/.minitrino` directory.

        Raises
        ------
        UserError
            If the directory cannot be created.
        """
        minitrino_user_dir = os.path.abspath(
            os.path.join(self.user_home_dir, ".minitrino")
        )
        try:
            # exist_ok avoids the check-then-create race when several CLI
            # processes start at the same time.
            os.makedirs(minitrino_user_dir, exist_ok=True)
        except OSError as e:
            raise UserError(
                "Failed to create the minitrino user directory.",
                str(e),
            ) from e
        return minitrino_user_dir

    def _validate_config_file(self) -> None:
        """Warn (once) if the user's `minitrino.cfg` file is missing."""
        config_file = self._config_file()
        if os.path.isfile(config_file):
            return

        # Only emit the warning the first time per context.
        if not self._logged_config_file_missing:
            self.logger.warn(
                f"No minitrino.cfg file found at {config_file}. Run "
                "'minitrino config' to reconfigure this file and directory."
            )
            self._logged_config_file_missing = True

    def _config_file(self) -> str:
        """Return the path to the user's `minitrino.cfg` file.

        Returns
        -------
        str
            Path to the user's `minitrino.cfg` file.
        """
        return os.path.join(self.minitrino_user_dir, "minitrino.cfg")

    def _try_parse_library_env(self) -> None:
        """Attempt to parse the library environment file if present.

        This is best-effort: any failure is logged at debug level and
        otherwise ignored so that commands can run without a library.
        """
        try:
            lib_env = self.env._parse_library_env()
            # Also check for port variables in OS environment
            for k, _v in lib_env.items():
                if not k.startswith("__PORT"):
                    continue
                # Check if this port var exists in OS env and isn't
                # already set
                os_val = os.environ.get(k)
                if os_val and not self.env.get(k):
                    self.env[k] = str(os_val)
        except Exception as e:
            # Skip lib-related procedures if the lib is not found
            self.logger.debug(f"Skipping library env parsing: {e}")

    def _try_compare_versions(self) -> None:
        """Attempt to compare CLI and library versions for compatibility.

        This is a best-effort check that gracefully handles missing
        libraries. Commands like `lib-install` and `config` need to run
        even when no library is installed yet.
        """
        try:
            cli_ver = utils.cli_ver()
            lib_ver = utils.lib_ver(lib_path=self.lib_dir)
            if cli_ver != lib_ver:
                self.logger.warn(
                    f"CLI version {cli_ver} and library version {lib_ver} "
                    f"do not match. You can update the Minitrino library "
                    f"version to match the CLI version by running 'minitrino "
                    f"lib-install'.",
                )
        except Exception as e:
            # Skip version comparison if lib is not found
            self.logger.debug(f"Skipping CLI/library version comparison: {e}")

    def _set_cluster_attrs(self, cluster_name: str) -> None:
        """Set cluster-related attributes for the context.

        Parameters
        ----------
        cluster_name : str
            The name of the cluster to set.
        """
        self.cluster = Cluster(self)
        self._set_cluster_name(cluster_name)
        compose_project_name = self.cluster.resource.compose_project_name(
            self.cluster_name
        )
        self.env.update({"COMPOSE_PROJECT_NAME": compose_project_name})

    def _set_cluster_name(self, cluster_name: str) -> None:
        """Determine and validate the active cluster name.

        Precedence: the `cluster_name` argument, then the `CLUSTER_NAME`
        environment variable, then "default".

        Parameters
        ----------
        cluster_name : str
            The user-specified or default cluster name.
        """
        if cluster_name:
            self.cluster_name = cluster_name
        elif env_cluster_name := self.env.get("CLUSTER_NAME"):
            self.cluster_name = env_cluster_name
        else:
            self.cluster_name = "default"

        self.cluster.validator.check_cluster_name()
        # The reserved name "all" scopes operations to every cluster.
        if self.cluster_name == "all":
            self.all_clusters = True

        self.env.update({"CLUSTER_NAME": self.cluster_name})
        self.logger.debug(f"Cluster name set to: {self.cluster_name}")

    def _set_docker_clients(self, env: dict | None = None) -> None:
        """Initialize Docker clients.

        This never raises. If the socket cannot be resolved or a client
        cannot be constructed, both clients are replaced with bare
        placeholder objects and the failure is logged at debug level.

        Parameters
        ----------
        env : dict, optional
            Dictionary of environment variables used when resolving the
            current Docker context. Passed through unchanged (including
            `None`) to `resolve_docker_socket`.
        """
        self.logger.debug(
            "Attempting to locate Docker socket file for current Docker context..."
        )
        try:
            socket = resolve_docker_socket(self, env)
            self.logger.debug(f"Docker socket path: {socket}")
            docker_client = docker.DockerClient(base_url=socket)
            api_client = docker.APIClient(base_url=socket)
            # Assign together so a failure above never leaves only one
            # of the two clients set.
            self.docker_client, self.api_client = docker_client, api_client
        except Exception as e:
            self.logger.debug(f"Unable to initialize Docker clients: {e}")
            # Placeholders keep the attributes non-None for callers.
            self.docker_client = cast(docker.DockerClient, object())
            self.api_client = cast(docker.APIClient, object())